Exception handling for pega Kafka producer and consumer

Could you please help in answering the following questions?

  1. In the event of broker unavailability, how can we handle exceptions while reading data from or publishing data to Kafka?

  2. How to retry if reading data from a Kafka topic fails?

  3. How to retry if writing data to a Kafka topic fails?

  4. How can we stop the real-time event data flows if an exception occurs in the Kafka-Pega integration? Is there any API available?

  5. Are there any server-side settings or dynamic system settings for these exception scenarios?

@RathnakumarT :warning: This is a GenAI-powered tool. All generated answers require validation against the provided references.

Here are detailed answers to each of your questions:

1) In the event of broker unavailability, how can we handle exceptions while reading data from or publishing data to Kafka?

When Kafka brokers become unavailable, Pega provides several mechanisms to handle these exceptions:

  • Configuration Validation: Broker unavailability is often caused by an incorrectly configured Kafka configuration instance. The Kafka-test tool can help determine the correct configuration, which you can then replicate in your Pega application.
  • Connection Refresh: If Kafka connections are using a cached version of the configuration, restarting the connections will enforce the use of updated configurations. This can be done by:
    • Restarting the running Data Flow (see Managing Data Flow runs)
    • Restarting nodes that use a Kafka connection (see Restarting nodes in containerized deployments)
  • Retry Mechanism: For temporary unavailability, Pega’s default producer configuration includes retry settings (10 retries by default) that automatically resend messages when brokers become reachable again.
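The built-in producer retries cover transient broker outages transparently. If you also want an application-level safety net, the same pattern can be sketched as a generic retry loop with backoff; everything here (including the caller-supplied operation) is illustrative, not a Pega or Kafka API:

```java
import java.util.concurrent.Callable;

// Hedged sketch: an application-level retry loop with fixed backoff around a
// publish (or consume) call. Pega's producer already retries internally
// (retries=10 by default); this only illustrates the equivalent pattern.
public class RetryingPublisher {
    // Runs `operation` up to `maxAttempts` times, sleeping `backoffMs` between
    // failed attempts; rethrows the last exception if all attempts fail.
    public static <T> T withRetries(Callable<T> operation, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs); // wait before the next attempt
                }
            }
        }
        throw last;
    }
}
```

A caller would wrap the actual send in the `Callable`, so a transient "broker unavailable" error is absorbed as long as one of the attempts succeeds.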

2) How to retry if reading data from a Kafka topic fails?

Pega provides built-in retry capabilities for consumer operations:

  • Default Consumer Configuration: By default, consumer settings include:
    • enable.auto.commit: false - Gives Pega explicit control over offset commits
    • auto.offset.reset: earliest - Starts from the earliest available offset when no committed offset exists, so messages are not skipped on reconnection
    • session.timeout.ms: 30000 - Allows 30 seconds without heartbeats before the consumer is considered failed
  • Custom Retry Configuration: You can modify these default settings using Dynamic System Settings (DSS) with these formats:
    • Global setting: prconfig/dnode/kafka/consumer/<consumer_property_name>/default
    • Topic-specific setting: prconfig/dnode/kafka/consumer/<consumer_property_name>/<topic_name>
  • Key Parameters to Adjust:
    • retry.backoff.ms: Controls the time between retry attempts
    • request.timeout.ms: Determines how long to wait for a response
    • session.timeout.ms: Defines how long to maintain the consumer session
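The DSS naming pattern above can be captured in a small helper. The helper itself is purely illustrative (it is not a Pega API); only the `prconfig/dnode/kafka/consumer/...` pattern comes from the documentation above:

```java
// Illustrative helper (not a Pega API): builds the DSS setting names used to
// override Kafka consumer properties, following the pattern
// prconfig/dnode/kafka/consumer/<property>/<topic-or-default>.
public class ConsumerDssKeys {
    // Global override, applied to all topics.
    public static String globalKey(String property) {
        return "prconfig/dnode/kafka/consumer/" + property + "/default";
    }

    // Topic-specific override: the topic name replaces "default".
    public static String topicKey(String property, String topic) {
        return "prconfig/dnode/kafka/consumer/" + property + "/" + topic;
    }
}
```

For example, `globalKey("retry.backoff.ms")` yields the DSS name for a cluster-wide retry-backoff override, while `topicKey("session.timeout.ms", "CustomerEvents")` (topic name hypothetical) scopes the override to one topic.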

3) How to retry if writing data to a Kafka topic fails?

For producer (writing) operations, Pega provides these retry capabilities:

  • Default Producer Configuration:
    • retries: 10 - Automatically resends messages up to 10 times
    • acks: 1 - Waits for acknowledgment from the partition leader only
    • max.block.ms: 30000 - Maximum time send() blocks when the buffer is full or metadata is unavailable
  • Custom Retry Configuration: Similar to consumers, you can modify these settings using DSS:
    • Global setting: prconfig/dnode/kafka/producer/<producer_property_name>/default
    • Topic-specific setting: prconfig/dnode/kafka/producer/<producer_property_name>/<topic_name>
  • Key Parameters for Retry Tuning:
    • Increase retries value for more persistent retry attempts
    • Adjust retry.backoff.ms to control the interval between retries
    • Configure delivery.timeout.ms to set the maximum time for delivery attempts

4) How can we stop the real-time event data flows if an exception occurs in the Kafka-Pega integration? Is there any API available?

To stop real-time event data flows during exceptions:

  • Manual Control: In Dev Studio, navigate to Configure > Decisioning > Decisions > Data Flows > Real-time processing, select the relevant Data Flow, click “Manage” and select “Stop” to manually stop a Data Flow.
  • Automatic Exception Handling: You can implement error thresholds in your Data Flow configuration. When the threshold is exceeded, the Data Flow will stop automatically with the exception message “Failure threshold hit - too many errors occurred”.
  • Programmatic Control: While there isn’t a dedicated API specifically for exception-based stops, you can:
    • Use the Data Flow Manager interface (com.pega.dsm.dnode.api.dataflow.DataFlowManager) to programmatically control Data Flows
    • Implement monitoring that detects exceptions and triggers the stop command
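The monitoring-plus-threshold idea can be sketched as a small counter that fires a stop callback once too many failures accumulate, mirroring the "Failure threshold hit - too many errors occurred" behaviour. This is a hypothetical sketch; the stop callback stands in for whatever mechanism actually stops the Data Flow (e.g. the Manage > Stop action), and nothing here is a real Pega API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical failure-threshold monitor (not a Pega API): counts
// record-level failures and triggers a stop action once the threshold
// is crossed, after which further failures are ignored.
public class FailureThresholdMonitor {
    private final int threshold;
    private final Runnable stopAction;
    private final AtomicInteger failures = new AtomicInteger();
    private volatile boolean stopped = false;

    public FailureThresholdMonitor(int threshold, Runnable stopAction) {
        this.threshold = threshold;
        this.stopAction = stopAction;
    }

    // Call on each processing failure; invokes the stop action at the threshold.
    public void recordFailure() {
        if (!stopped && failures.incrementAndGet() >= threshold) {
            stopped = true;
            stopAction.run(); // e.g. delegate to the Data Flow stop command
        }
    }

    public boolean isStopped() {
        return stopped;
    }
}
```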

5) Are there any settings at server side or dynamic system settings regarding the exception scenarios?

Yes, several settings can be configured for exception handling:

  • Dynamic System Settings (DSS):
    • Producer settings: prconfig/dnode/kafka/producer/<property_name>/default
    • Consumer settings: prconfig/dnode/kafka/consumer/<property_name>/default
    • Topic-specific settings: replace “default” with the topic name
  • Key Exception-Related Settings:
    • retries: Controls the number of retry attempts (default: 10)
    • retry.backoff.ms: Time between retries
    • request.timeout.ms: How long to wait for server response
    • max.block.ms: Maximum time the producer will block
    • delivery.timeout.ms: Maximum time from send() to delivery/failure
  • Data Flow Configuration Settings:
    • Failure thresholds can be set to automatically stop Data Flows after a certain number of errors
    • Checkpointing frequency controls how often the system records the processing state (important for recovery)
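To see how these settings interact, a back-of-envelope estimate of the worst-case retry window is useful: each attempt may take up to `request.timeout.ms`, with `retry.backoff.ms` between attempts, and `delivery.timeout.ms` caps the whole process. This is an approximation for tuning intuition, not an exact Kafka formula:

```java
// Back-of-envelope estimate (an approximation, not an exact Kafka formula):
// `retries` resend attempts plus the initial send, each taking up to
// request.timeout.ms, with retry.backoff.ms between attempts; the overall
// process is capped at delivery.timeout.ms, so take the smaller value.
public class RetryWindow {
    public static long worstCaseMs(int retries, long requestTimeoutMs,
                                   long retryBackoffMs, long deliveryTimeoutMs) {
        long attempts = retries + 1L; // initial send plus retries
        long uncapped = attempts * requestTimeoutMs + retries * retryBackoffMs;
        return Math.min(uncapped, deliveryTimeoutMs);
    }
}
```

With the defaults discussed above (retries=10, request.timeout.ms=30000, retry.backoff.ms=100, delivery.timeout.ms=120000), the uncapped sum far exceeds the cap, so `delivery.timeout.ms` is the effective limit; in practice, raising `retries` without raising `delivery.timeout.ms` buys little extra persistence.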
