Unable to clear Ready to Process queue processor records

Pega 8.8.2. Our stream node will not join our cluster which is preventing millions of queue records from being processed, which is causing issues with loading our interaction portal due to the data page executed in the OOTB rule SystemHealthOverview1. We have disable this step for now but we still need to resolve the queue processor issue. Every time we restart the cluster this one goes down immediately with error:

[0.001s][error][logging] Error opening log file ‘/usr/share/tomcat/kafka-1.1.0.10/logs/kafkaServer-gc.log’: No space left on device

We would like to remove all Ready to Process items in our custom queue processors so that the node does not immediately run out of memory, but using the script found in other articles is not working for this version of Pega.

The below java is returning the error:

The method withIdempotentSave(boolean) is undefined for the type StreamDataSetBuilder

java.util.ArrayList<String> partitionKeys = new java.util.ArrayList<>();
partitionKeys.add(".pzMessageContent.pzInsKey");
com.pega.dsm.dnode.api.dataset.DataSet dataset = com.pega.dsm.dnode.api.dataset.stream.StreamDataSet.builder()
.withClassName("System-Message-QueueProcessor")
.withTopicName("RESOLVEAGEDINBOUNDWLCASE")
.withPartitionKeys(partitionKeys)
.withIdempotentSave(true)
.withOperation(com.pega.dsm.dnode.api.dataset.operation.SaveOperation.NAME)
.withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaGetTopicsOperation.NAME)
.withOperation(com.pega.dsm.dnode.api.dataset.operation.BrowseOperation.NAME)
.withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaDropOperation.NAME)
.withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME)
.withSerde(com.pega.dsm.kafka.api.serde.ClipboardPageStreamSerde.create())
.build(tools);
com.pega.dsm.dnode.api.dataset.operation.Operation operation = dataset.getOperationByName(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME);
((com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation)operation).truncate().await(tools);

I have also tried importing the DELAYEDITEMSDATAFLOWSERVICE data instance from a working environment to see if that was the issue but did not have any impact.

Can you suggest how to resolve this error or issue overall?

2 Likes

@AndrewP it sounds like you need to log a support issue for this.

Please use the MSP and provide the INC id here so that we can help track the ticket.

Support case was INC-B35990 -

You can create an activity with a Java step with below code:

java.util.ArrayList<String> partitionKeys = new java.util.ArrayList<>();
partitionKeys.add(".pzMessageContent.pzInsKey");
com.pega.dsm.dnode.api.dataset.DataSet dataset = com.pega.dsm.dnode.api.dataset.stream.StreamDataSet.builder()
.withClassName("System-Message-QueueProcessor")
.withStreamName("<QP name in capital letter>")
.withPartitionKeys(partitionKeys)
.withOperation(com.pega.dsm.dnode.api.dataset.operation.SaveOperation.NAME)
.withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaGetTopicsOperation.NAME)
.withOperation(com.pega.dsm.dnode.api.dataset.operation.BrowseOperation.NAME)
.withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaDropOperation.NAME)
.withOperation(com.pega.dsm.dnode.impl.dataset.kafka.KafkaTruncateOperation.NAME)
.withSerde(com.pega.dsm.kafka.api.serde.ClipboardPageStreamSerde.create()).build(tools);
com.pega.dsm.dnode.api.dataset.operation.Operation operation = dataset.getOperationByName(com.pega.dsm.dnode.impl.dataset.stream.StreamTruncateOperation.NAME);
((com.pega.dsm.dnode.impl.dataset.stream.StreamTruncateOperation)operation).truncate().await(tools);

Update the above code with QP name in Capital letters for which you want to truncate the ready to process queue

@AndrewP

Hi Andrew,

The activity is not saving with this Java code. I have created the activity in @baseclass.

Please let me know if I need to update anything in the code.

Thanks in advance.

@AdilSaleem This seems to change with newer versions of Pega. I wasn’t able to save the java in my original post because one line was removed in 8.x. Not sure if it has changed again in a newer version. Pega support should be able to assist.

@AndrewP We handled it in a different way as it was urgent.