Kafka data flow fails when parsing JSON with a data transform

Hi, I am trying to parse JSON with a data transform in a Kafka data flow.

It works when auto-mapping is selected and I map only to a simple page.

The data transform was unit tested with the JSON, and it parses correctly.

When we map the data into a complex page list (Page -> Page -> PageList), the data flow execution fails with the error below.

com.pega.dsm.dnode.api.dataflow.StageException: Exception in stage: KafkaDataSet
	at com.pega.dsm.dnode.api.dataflow.StageException.create(StageException.java:54)
	at com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageOutputSubscriber.onError(DataFlowStage.java:633)
	at com.pega.dsm.dnode.api.dataflow.DataFlowStage$StageInputSubscriber.onError(DataFlowStage.java:500)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:320)
	at com.pega.dsm.dnode.api.stream.DataSubscriber.onError(DataSubscriber.java:75)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:320)
	at com.pega.dsm.dnode.impl.dataflow.resilience.retry.RetryableBrowse$RecordErrorHandlingSubscriber.handleRecordError(RetryableBrowse.java:192)
	at com.pega.dsm.dnode.impl.dataflow.resilience.retry.RetryableBrowse$RecordErrorHandlingSubscriber.onError(RetryableBrowse.java:182)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.onError(DataObservableImpl.java:320)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.reportError(KafkaBrowseOperation.java:261)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:217)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:167)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:157)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecordsAndComplete(KafkaBrowseOperation.java:149)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.lambda$browse$0(KafkaBrowseOperation.java:145)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)
	at com.pega.dsm.dnode.impl.dataflow.resilience.retry.RetryableBrowse$RetryableEmitter.produceRecords(RetryableBrowse.java:145)
	at com.pega.dsm.dnode.impl.dataflow.resilience.retry.RetryableBrowse$RetryableEmitter.emit(RetryableBrowse.java:111)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl$3.emit(DataObservableImpl.java:176)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)
	at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)
	at com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:438)
	at com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:431)
	at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:67)
	at com.pega.dsm.dnode.impl.dataflow.DataFlowThreadContext.lambda$submitInput$5(DataFlowThreadContext.java:271)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:108)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:41)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:77)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:59)
	at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:56)
	at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:67)
	at com.pega.dsm.dnode.impl.prpc.PrpcThreadFactory$PrpcThread.run(PrpcThreadFactory.java:124)
Caused by: com.pega.dsm.dnode.api.ExceptionWithInputRecord: com.pega.pegarules.pub.PRRuntimeException: LegacyModelAspectInvokableRuleContainer.invoke-Exception Encountered a :java.lang.UnsupportedOperationException
	… 29 more
Caused by: com.pega.pegarules.pub.PRRuntimeException: LegacyModelAspectInvokableRuleContainer.invoke-Exception Encountered a :java.lang.UnsupportedOperationException
	at com.pega.platform.executionengine.vtable.containers.internal.LegacyModelAspectInvokableRuleContainer.invoke(LegacyModelAspectInvokableRuleContainer.java:76)
	at com.pega.platform.executionengine.vtable.internal.RuleDispatcherImpl.invokeModel(RuleDispatcherImpl.java:165)
	at com.pega.decision.dsm.strategy.clipboard.DSMPegaAPI.applyModel(DSMPegaAPI.java:330)
	at com.pega.decision.dsm.strategy.clipboard.DSMPegaAPI.applyModel(DSMPegaAPI.java:301)
	at com.pega.dsm.kafka.api.serde.DataTransformSerde.deserializeWithUsageOfJsonDataTransform(DataTransformSerde.java:80)
	at com.pega.dsm.kafka.api.serde.DataTransformSerde.deserialize(DataTransformSerde.java:72)
	at com.pega.dsm.kafka.api.serde.LazyExceptionHandlingSerDe.deserialize(LazyExceptionHandlingSerDe.java:77)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.deserializeMessage(KafkaBrowseOperation.java:347)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.convertRecordToClipboardPage(KafkaBrowseOperation.java:302)
	at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:210)
	… 27 more
Caused by: java.lang.UnsupportedOperationException
	at com.pega.decision.dsm.strategy.clipboard.DSMClipboardPropertyAtomicInList.getProperty(DSMClipboardPropertyAtomicInList.java:137)
	at com.pega.pegarules.data.internal.clipboard.mapping.json.ClipboardJSONDeserializer.createChildProperty(ClipboardJSONDeserializer.java:490)
	at com.pega.pegarules.data.internal.clipboard.mapping.json.ClipboardJSONDeserializer.populateClipboardWithJson(ClipboardJSONDeserializer.java:291)
	at com.pega.pegarules.data.internal.clipboard.mapping.json.ClipboardJSONDeserializer.populateClipboardPage(ClipboardJSONDeserializer.java:135)
	at com.pega.pegarules.data.internal.clipboard.mapping.json.ClipboardJSONMapper.deserialize(ClipboardJSONMapper.java:57)
	at com.pegarules.generated.model.ra_model_parsedatajson_dedeafae040d7fe58c3d98bb9b835a9d.perform(ra_model_parsedatajson_dedeafae040d7fe58c3d98bb9b835a9d.java:74)
	at com.pega.platform.executionengine.vtable.containers.internal.LegacyModelAspectInvokableRuleContainer.invoke(LegacyModelAspectInvokableRuleContainer.java:67)
	… 36 more

Data transform

Dataset

JSON

{
	"id": "0001",
	"type": "donut",
	"name": "Cake",
	"ppu": 0.55,
	"event": {
		"batters": {
			"batter": [
				{
					"id": "1001",
					"type": "Regular"
				},
				{
					"id": "1002",
					"type": "Chocolate"
				},
				{
					"id": "1003",
					"type": "Blueberry"
				},
				{
					"id": "1004",
					"type": "Devil's Food"
				}
			]
		}
	},
	"topping": [
		{
			"id": "5001",
			"type": "None"
		},
		{
			"id": "5002",
			"type": "Glazed"
		},
		{
			"id": "5005",
			"type": "Sugar"
		},
		{
			"id": "5007",
			"type": "Powdered Sugar"
		},
		{
			"id": "5006",
			"type": "Chocolate with Sprinkles"
		},
		{
			"id": "5003",
			"type": "Chocolate"
		},
		{
			"id": "5004",
			"type": "Maple"
		}
	]
}
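To make the failing path concrete: the `event.batters.batter` array in the sample above is exactly the Page -> Page -> PageList hierarchy the mapping fails on, while the top-level scalars are the simple-page case that works. A minimal Python sketch (illustrative only, not Pega code) showing the two shapes in the sample message:

```python
import json

# Trimmed copy of the sample Kafka message from this post.
SAMPLE = """
{
  "id": "0001",
  "type": "donut",
  "ppu": 0.55,
  "event": {
    "batters": {
      "batter": [
        {"id": "1001", "type": "Regular"},
        {"id": "1002", "type": "Chocolate"}
      ]
    }
  },
  "topping": [
    {"id": "5001", "type": "None"}
  ]
}
"""

doc = json.loads(SAMPLE)

# Top-level scalars map to simple properties; this is the case that
# works with auto-mapping to a simple page.
top_level = {k: v for k, v in doc.items() if not isinstance(v, (dict, list))}

# event -> batters -> batter is the Page -> Page -> PageList path
# that triggers the UnsupportedOperationException during deserialization.
batter_types = [b["type"] for b in doc["event"]["batters"]["batter"]]
print(top_level)     # {'id': '0001', 'type': 'donut', 'ppu': 0.55}
print(batter_types)  # ['Regular', 'Chocolate']
```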

@MarissaRogers Are you able to tag any dataflow/dataset SMEs?

@JohnPaulRaja We had the same issue, and it was resolved after performing the steps below.

  1. Update/create the DSS below:

Owning Ruleset: Pega-DecisionEngine
Setting Purpose: dsm/clipboard/optimize
Value: false

  2. Perform a cluster restart.

This is only a workaround; Pega is planning to release a hotfix that resolves this permanently.

@MANODASS

Do you know if Pega ever released this as a hotfix? If so, what number did it get?

@MathijsF1060 I think this is the Pega fix mentioned:

<<INC-218145 · ISSUE 715678 - DSS introduced to control DSM clipboard page serialization

RESOLVED IN PEGA VERSION 8.7.3

When using a Kafka dataset to consume a message from an external topic that had an attribute name with a special character contained in a page list structure, using a JSON data transform for the mapping in a realtime dataflow resulted in the error “Exception in stage: KafkaDS; LegacyModelAspectInvokableRuleContainer.invoke-Exception encountered a :java.lang.UnsupportedOperationException.”

To resolve this, a new DSS dataset/CLASS_NAME/DATASET_NAME/JSONDataTransform/deserialization/useDSMPage has been introduced. When the value is set to true, the process will follow the previous behavior of DSM clipboard pages being generated when Kafka records are deserialized using JSON data transform. When the value is set to false, the JSON data transform will generate regular clipboard pages and convert them later to DSM clipboard pages. This would avoid errors when a JSON data transform calls methods from the Clipboard API that are not implemented by DSM pages.

This DSS is set per data set instance. CLASS_NAME and DATASET_NAME are placeholders which should be replaced by data set’s pyClassName and pyPurpose property values. In addition, a similar DSS, dataset/CLASS_NAME/DATASET_NAME/JSONDataTransform/serialization/useDSMPage, has been introduced for serialization.>>
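Per the resolution note, CLASS_NAME and DATASET_NAME in those DSS keys must be replaced by the data set's pyClassName and pyPurpose values. A small sketch of how the two keys are assembled (the example class and dataset names are hypothetical, not from the post):

```python
def build_dss_keys(py_class_name: str, py_purpose: str) -> dict:
    """Assemble the per-data-set DSS keys described in INC-218145.

    CLASS_NAME is the data set's pyClassName and DATASET_NAME is its
    pyPurpose, per the resolution note quoted above.
    """
    base = f"dataset/{py_class_name}/{py_purpose}/JSONDataTransform"
    return {
        "deserialization": f"{base}/deserialization/useDSMPage",
        "serialization": f"{base}/serialization/useDSMPage",
    }

# Hypothetical example values for illustration:
keys = build_dss_keys("MyOrg-Data-Order", "OrderEventsKafkaDS")
print(keys["deserialization"])
# dataset/MyOrg-Data-Order/OrderEventsKafkaDS/JSONDataTransform/deserialization/useDSMPage
```

Setting the deserialization key to false is what avoids the UnsupportedOperationException path, since regular clipboard pages are generated first and converted to DSM pages afterwards.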

@Mano@TCS_Pega This had to be applied on version 8.8.1 as well.

@Avinash.Haridasu If you are on version 8.8.1 and facing a similar issue, check the DSS or check with the Pega support team.

@MANODASS Is there a hotfix number available for this issue?