Hi,
I have exactly the same issue. After some days of running fine, I get this exception, the server stops responding properly, and then it restarts (it is hosted on Kubernetes, so it restarts automatically once the probes stop responding).
Looking at the log a few seconds before the stream error, I see this entry:
@timestamp
2020-10-20T08:25:56.059Z
@version
1
app
level
ERROR
logger_name
com.pega.dsm.dnode.api.server.StreamServerService
message
Unexpected broker state NORMAL, current env StreamServiceCluster[
MemberInfo{address=10.240.0.XXX, prpcId=pega-pegademo-xxxxxxxx-7vltj, state=NORMAL, sessionId=xxxxxxxxx-xxxx-xxxx-xxxx-xxxxxd92eb4e}
]
nodeType
Stream,WebUser,BackgroundProcessing,Search,Batch,RealTime,Custom1,Custom2,Custom3,Custom4,Custom5,BIX,ADM,RTDG
pegathread
STANDARD
source_host
pega-pegademo-xxxxxxxxxx-7vltj
stack
tenantid
thread_name
PegaRULES-HeatlthMonitorTaskExecutor
(the format comes from a JSON export of the container logs)
Then comes this exception; the full content is included here for more detail:
@timestamp
2020-10-20T08:26:04.978Z
@version
1
CorrelationId
pyProcessCaseEvents
app
exception
{“stacktrace”:“java.util.concurrent.CompletionException: com.pega.fnx.stream.spi.StreamServiceException: Invalid configuration. Undefined stream provider end point.\n\tat java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)\n\tat java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)\n\tat java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:870)\n\tat java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)\n\tat java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)\n\tat com.pega.fnx.stream.spi.impl.metric.StreamSPIMetricsProxy.execute(StreamSPIMetricsProxy.java:41)\n\tat com.pega.fnx.stream.spi.impl.StreamSP.execute(StreamSP.java:68)\n\tat com.pega.fnx.stream.spi.loader.SPIProxy.execute(SPIProxy.java:48)\n\tat com.pega.platform.stream.StreamSPIStateValidator.execute(StreamSPIStateValidator.java:67)\n\tat com.pega.platform.stream.StreamSPIRequestValidator.execute(StreamSPIRequestValidator.java:57)\n\tat com.pega.fnx.stream.api.PollingSubscriber.poll(PollingSubscriber.java:80)\n\tat com.pega.dsm.dnode.impl.dataset.stream.StreamBrowseOperation.processRecords(StreamBrowseOperation.java:138)\n\tat com.pega.dsm.dnode.impl.dataset.stream.StreamBrowseOperation.access$000(StreamBrowseOperation.java:61)\n\tat com.pega.dsm.dnode.impl.dataset.stream.StreamBrowseOperation$1.emit(StreamBrowseOperation.java:107)\n\tat com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)\n\tat com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)\n\tat com.pega.dsm.dnode.impl.dataflow.resilience.retry.RetryableBrowse$RetryableEmitter.produceRecords(RetryableBrowse.java:145)\n\tat com.pega.dsm.dnode.impl.dataflow.resilience.retry.RetryableBrowse$RetryableEmitter.emit(RetryableBrowse.java:111)\n\tat 
com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)\n\tat com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)\n\tat com.pega.dsm.dnode.impl.stream.DataObservableImpl$3.emit(DataObservableImpl.java:176)\n\tat com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)\n\tat com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)\n\tat com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:432)\n\tat com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:425)\n\tat com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:67)\n\tat com.pega.dsm.dnode.impl.dataflow.DataFlowThreadContext.lambda$submitInput$3(DataFlowThreadContext.java:228)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:108)\n\tat com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:41)\n\tat com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:77)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:59)\n\tat com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:56)\n\tat com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:67)\n\tat com.pega.dsm.dnode.impl.prpc.PrpcThreadFactory$PrpcThread.run(PrpcThreadFactory.java:124)\nCaused by: com.pega.fnx.stream.spi.StreamServiceException: Invalid configuration. 
Undefined stream provider end point.\n\tat com.pega.fnx.stream.spi.impl.kafka.KafkaSettingsProvider.getBootstrapServers(KafkaSettingsProvider.java:157)\n\tat com.pega.fnx.stream.spi.impl.kafka.KafkaSettingsProvider.defaultConsumerProperties(KafkaSettingsProvider.java:80)\n\tat com.pega.fnx.stream.spi.impl.kafka.KafkaSettingsProvider.getConsumerProperties(KafkaSettingsProvider.java:62)\n\tat com.pega.fnx.stream.spi.impl.kafka.KafkaServerImpl.getConsumer(KafkaServerImpl.java:49)\n\tat com.pega.fnx.stream.spi.impl.processor.SubscriberRequestProcessor.getConsumer(SubscriberRequestProcessor.java:41)\n\tat com.pega.fnx.stream.spi.impl.processor.GetMessageRequestProcessor.execute(GetMessageRequestProcessor.java:75)\n\tat com.pega.fnx.stream.spi.impl.processor.GetMessageRequestProcessor.execute(GetMessageRequestProcessor.java:46)\n\tat com.pega.fnx.stream.spi.impl.StreamSPNative.execute(StreamSPNative.java:61)\n\t… 32 more\n”,“exception_class”:“java.util.concurrent.CompletionException”,“exception_message”:“com.pega.fnx.stream.spi.StreamServiceException: Invalid configuration. Undefined stream provider end point.”}
exception_class
java.util.concurrent.CompletionException
exception_message
com.pega.fnx.stream.spi.StreamServiceException: Invalid configuration. Undefined stream provider end point.
stacktrace
java.util.concurrent.CompletionException: com.pega.fnx.stream.spi.StreamServiceException: Invalid configuration. Undefined stream provider end point.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:870)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
at com.pega.fnx.stream.spi.impl.metric.StreamSPIMetricsProxy.execute(StreamSPIMetricsProxy.java:41)
at com.pega.fnx.stream.spi.impl.StreamSP.execute(StreamSP.java:68)
at com.pega.fnx.stream.spi.loader.SPIProxy.execute(SPIProxy.java:48)
at com.pega.platform.stream.StreamSPIStateValidator.execute(StreamSPIStateValidator.java:67)
at com.pega.platform.stream.StreamSPIRequestValidator.execute(StreamSPIRequestValidator.java:57)
at com.pega.fnx.stream.api.PollingSubscriber.poll(PollingSubscriber.java:80)
at com.pega.dsm.dnode.impl.dataset.stream.StreamBrowseOperation.processRecords(StreamBrowseOperation.java:138)
at com.pega.dsm.dnode.impl.dataset.stream.StreamBrowseOperation.access$000(StreamBrowseOperation.java:61)
at com.pega.dsm.dnode.impl.dataset.stream.StreamBrowseOperation$1.emit(StreamBrowseOperation.java:107)
at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)
at com.pega.dsm.dnode.impl.dataflow.resilience.retry.RetryableBrowse$RetryableEmitter.produceRecords(RetryableBrowse.java:145)
at com.pega.dsm.dnode.impl.dataflow.resilience.retry.RetryableBrowse$RetryableEmitter.emit(RetryableBrowse.java:111)
at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)
at com.pega.dsm.dnode.impl.stream.DataObservableImpl$3.emit(DataObservableImpl.java:176)
at com.pega.dsm.dnode.impl.stream.DataObservableImpl$SafeDataSubscriber.subscribe(DataObservableImpl.java:353)
at com.pega.dsm.dnode.impl.stream.DataObservableImpl.subscribe(DataObservableImpl.java:55)
at com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:432)
at com.pega.dsm.dnode.api.dataflow.DataFlow$3.run(DataFlow.java:425)
at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:67)
at com.pega.dsm.dnode.impl.dataflow.DataFlowThreadContext.lambda$submitInput$3(DataFlowThreadContext.java:228)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:108)
at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:41)
at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:77)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:59)
at com.pega.dsm.dnode.util.PrpcRunnable$1.run(PrpcRunnable.java:56)
at com.pega.dsm.dnode.util.PrpcRunnable.execute(PrpcRunnable.java:67)
at com.pega.dsm.dnode.impl.prpc.PrpcThreadFactory$PrpcThread.run(PrpcThreadFactory.java:124)
Caused by: com.pega.fnx.stream.spi.StreamServiceException: Invalid configuration. Undefined stream provider end point.
at com.pega.fnx.stream.spi.impl.kafka.KafkaSettingsProvider.getBootstrapServers(KafkaSettingsProvider.java:157)
at com.pega.fnx.stream.spi.impl.kafka.KafkaSettingsProvider.defaultConsumerProperties(KafkaSettingsProvider.java:80)
at com.pega.fnx.stream.spi.impl.kafka.KafkaSettingsProvider.getConsumerProperties(KafkaSettingsProvider.java:62)
at com.pega.fnx.stream.spi.impl.kafka.KafkaServerImpl.getConsumer(KafkaServerImpl.java:49)
at com.pega.fnx.stream.spi.impl.processor.SubscriberRequestProcessor.getConsumer(SubscriberRequestProcessor.java:41)
at com.pega.fnx.stream.spi.impl.processor.GetMessageRequestProcessor.execute(GetMessageRequestProcessor.java:75)
at com.pega.fnx.stream.spi.impl.processor.GetMessageRequestProcessor.execute(GetMessageRequestProcessor.java:46)
at com.pega.fnx.stream.spi.impl.StreamSPNative.execute(StreamSPNative.java:61)
… 32 more
level
ERROR
logger_name
com.pega.dsm.dnode.impl.dataflow.listener.RetryLoggerListener
message
[pyProcessCaseEvents] Preparing retry 1. Next attempt in 1s on data flow [pyProcessCaseEvents] on stage [InputStage] with affected partitions: [0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 2, 3, 4, 5, 6, 7, 8, 9]
nodeType
Stream,WebUser,BackgroundProcessing,Search,Batch,RealTime,Custom1,Custom2,Custom3,Custom4,Custom5,BIX,ADM,RTDG
pegathread
STANDARD
source_host
pega-pegademo-xxxxxxxxxxxx-7vltj
stack
tenantid
thread_name
DataFlow-Service-PickingupRun-pyProcessCaseEvents_Input:100, Access group: [PRPC:AsyncProcessor], Partitions=[0,1,10,11,12,13,14,15,16,17,18,19,2,3,4,5,6,7,8,9]
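For anyone triaging the same export: the `exception` field above is a JSON object whose `stacktrace` string ends with a `Caused by:` section, and that last cause is the line worth searching for. Below is a minimal Python sketch for pulling it out. It assumes the field is pasted exactly as it appears in this post (curly quotes included); the helper name `root_cause` is hypothetical and is not part of Pega or of any log tooling.

```python
import json


def root_cause(exception_field: str) -> str:
    """Return the innermost 'Caused by:' message of the stack trace.

    Falls back to the first line of the trace when there is no
    'Caused by:' section.
    """
    # Assumption: the forum rendering turned straight quotes into curly
    # ones, so convert them back before parsing the field as JSON.
    cleaned = exception_field.replace("\u201c", '"').replace("\u201d", '"')
    # strict=False tolerates raw line breaks inside the pasted string.
    record = json.loads(cleaned, strict=False)
    lines = record["stacktrace"].splitlines()
    prefix = "Caused by: "
    causes = [line for line in lines if line.startswith(prefix)]
    if causes:
        return causes[-1][len(prefix):]
    return lines[0] if lines else ""
```

Called on the `exception` value from the second log entry, this should return the `StreamServiceException` message rather than the wrapping `CompletionException`, which makes it easier to group repeated occurrences across restarts.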