Kafka Data Flow Error

Hi All,

Pega Version : 8.7.2

Server : Websphere

DB : Oracle

We established the Kerberos connection successfully. Running the data set manually works fine, and we are able to see the messages.

However, when running the real-time data flow, it gets stuck in ‘Initializing’ with the error below:

Failed To Prepare Partitions: org.apache.kafka.common.errors.TimeoutException: Topic <> not present in metadata after 30000 ms.
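The "Topic not present in metadata" timeout usually means the client cannot see the topic at all (wrong topic name, missing ACLs, or the Kerberos principal lacks Describe permission), rather than a data problem. One way to confirm is to describe the topic with the standard Kafka CLI using the same Kerberos settings as the data set. The broker host, topic name, and properties file below are placeholders, not values from this environment:

```shell
# Describe the topic with the same SASL/Kerberos client settings the data set uses.
# broker.example.com:9092, my-topic, and client.properties are illustrative placeholders.
#
# client.properties (sketch):
#   security.protocol=SASL_PLAINTEXT
#   sasl.mechanism=GSSAPI
#   sasl.kerberos.service.name=kafka
kafka-topics.sh --bootstrap-server broker.example.com:9092 \
  --command-config client.properties \
  --describe --topic my-topic
```

If this describe call also fails or times out, the issue is connectivity or authorization between the node and the broker, not the Pega data flow itself.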

Also, when we tried to manually run the data set from an activity, it fails with "Exception executing the dataset". Note that we are able to see the jsonData for the events in Tracer, though.

Error trace when running the data set manually from the activity:
2023-06-28 04:25:29,160 [ WebContainer : 36] [ STANDARD] (set.kafka.KafkaBrowseOperation) ERROR - Error deserializing stream while reading from Kafka topic com.pega.dsm.dnode.api.dataset.kafka.KafkaDataSet$KafkaDataSetBuilder$$Lambda$4850/0x00000000a4bb44f0@30640d06
at com.pega.dsm.kafka.api.serde.JsonSerde.deserialize(JsonSerde.java:59) ~[d-node.jar:?]
at com.pega.dsm.kafka.impl.serde.ConfiguredPegaSerde.deserialize(ConfiguredPegaSerde.java:59) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.deserializeMessage(KafkaBrowseOperation.java:347) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.convertRecordToClipboardPage(KafkaBrowseOperation.java:302) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:210) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:167) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:157) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecordsAndComplete(KafkaBrowseOperation.java:149) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.lambda$browse$0(KafkaBrowseOperation.java:145) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.reportError(KafkaBrowseOperation.java:261) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:217) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:167) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecords(KafkaBrowseOperation.java:157) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.processRecordsAndComplete(KafkaBrowseOperation.java:149) ~[d-node.jar:?]
at com.pega.dsm.dnode.impl.dataset.kafka.KafkaBrowseOperation.lambda$browse$0(KafkaBrowseOperation.java:145) ~[d-node.jar:?]
... (the same frames repeat several more times)

Any help would be appreciated.

@SubhashC3897 we faced this issue as well. We raised an SR, Pegasystems externalized Kafka, and it started working fine.

But in dev, for initial testing, what we did was:

  1. Created a real-time data flow (this hit the partition error and stayed in the Initializing stage).

  2. While keeping that one in place, created another real-time data flow instance; it went to the In Progress state, and then we stopped and removed the first one.

@Anoop Krishna, thank you for your prompt response.

Can you explain what externalizing Kafka means?

Fyi, we are the consumers.

@KiranD01 sure. Earlier, the Kafka services were embedded in the same server. Later, since this is Pega Cloud, Pegasystems offloaded the Kafka services to a separate server, so traffic on the application server is lower and performance is better. This is the externalization of the Kafka services.

Please check the below URL

https://support.pega.com/question/externalisation-kafka-service

@SubhashC3897 ,

This seems like a partition-related issue.

  1. Delete the ‘kafka-data’ folder from the server and restart it.

  2. Before the restart, run the queries below:

truncate table pegadata.pr_data_stream_node_updates;

truncate table pegadata.pr_data_stream_nodes;

truncate table pegadata.pr_data_stream_sessions;

Let me know if this does not work.

Thanks,

Siddhant Suryakant Jivane

@SiddhantJivane Thanks for the steps. We did try them, but the same issue occurs.

@SubhashC3897, it would be better to raise an SR.

@SubhashC3897 if you did raise an INC support ticket, please provide the ID here so that we can help track it.

@SubhashC3897 I’ve checked, and it seems that you have not yet logged an INC on the MSP.

If your issue is resolved, please add a reply and mark as Accept Solution so that we can close this thread.