Description
Create a new task plugin for CMTP that allows sending messages to Kafka directly from the execution context.
This plugin should support dynamic construction of payload and headers from context values, flexible Kafka connection options (brokers, SSL, SASL), and advanced message options such as partitioning, compression, acknowledgments, retries, and error handling.
Proposed naming: kap as the short name for KafkaPlugin, and KafkaPublishTaskPlugin as the class.
Configuration
Example configuration:
task:
  type: kafka-publish
  connection:
    brokers: ["kafka1:9092", "kafka2:9092"]
    clientId: "linid-task-plugin"
    ssl:
      enabled: false
    sasl:
      mechanism: plain
      username: "kafka_user"
      password: "kafka_pass"
  topic: "{{context.kafka.topic}}"
  key: "{{context.user.id}}"
  payload: >
    {
      "eventType": "{{context.eventType}}",
      "timestamp": "{{context.now}}",
      "user": {
        "id": "{{context.user.id}}",
        "email": "{{context.user.email}}"
      }
    }
  headers: >
    [
      {"name": "correlationId", "value": "{{context.requestId}}"},
      {"name": "source", "value": "linid"}
    ]
  options:
    partition: null
    compression: gzip
    acks: all
    timestamp: null
    timeoutMs: 30000
    retry:
      attempts: 3
      backoffMs: 1000

Behavior
- The payload and headers are resolved from the execution context using the templating mechanism.
- The task constructs a Kafka message using these values and sends it to the configured topic.
- Advanced options like partitioning, compression, and acks are applied to the message.
- Retry logic handles temporary failures, and error-handling behavior is determined by the onError setting.
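
To make the templating step concrete, here is a minimal sketch of how `{{context...}}` placeholders could be resolved against a nested context. The issue does not specify the templating engine or the plugin's implementation language, so the function name `resolve_template`, the regular expression, and the sample context values are all assumptions made for illustration.

```python
import json
import re

# Matches placeholders such as {{context.user.id}} (inner spaces are tolerated).
PLACEHOLDER = re.compile(r"\{\{\s*context\.([A-Za-z0-9_.]+)\s*\}\}")


def resolve_template(template: str, context: dict) -> str:
    """Replace each {{context.a.b}} placeholder with the value at context["a"]["b"]."""

    def lookup(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part]  # an unknown path raises KeyError (assumed behavior)
        return str(value)

    return PLACEHOLDER.sub(lookup, template)


# Hypothetical execution context, shaped after the example configuration.
context = {
    "eventType": "user.created",
    "requestId": "req-42",
    "user": {"id": "u-1", "email": "a@example.com"},
}

payload = json.loads(resolve_template(
    '{"eventType": "{{context.eventType}}", "user": {"id": "{{context.user.id}}"}}',
    context,
))
headers = json.loads(resolve_template(
    '[{"name": "correlationId", "value": "{{context.requestId}}"}]',
    context,
))
```

Values are inserted as-is in this sketch, so a context value containing a double quote would produce invalid JSON. A real implementation would probably need to escape values or build the payload structurally.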
Options
- partition (number | null) – Sends the message to a specific partition, or uses round-robin if null.
- compression (string) – Kafka-supported compression (none, gzip, snappy, lz4, zstd).
- acks (string) – Kafka acknowledgment mode (0, 1, all).
- timestamp (number | null) – Optional Kafka timestamp for the message.
- timeoutMs (number) – Maximum time to wait for the message to be sent.
- retry (object) – Retry configuration:
  - attempts – Number of retries on failure.
  - backoffMs – Milliseconds to wait between retries.
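
One way to validate the options listed above before sending anything is sketched here. The class and function names (`PublishOptions`, `RetryOptions`, `parse_options`) are hypothetical, and the defaults simply reuse the values from the example configuration, since the issue does not define defaults.

```python
from dataclasses import dataclass, field
from typing import Optional

COMPRESSION_TYPES = {"none", "gzip", "snappy", "lz4", "zstd"}
ACK_MODES = {"0", "1", "all"}


@dataclass(frozen=True)
class RetryOptions:
    attempts: int = 3       # assumed default, taken from the example configuration
    backoff_ms: int = 1000  # assumed default, taken from the example configuration


@dataclass(frozen=True)
class PublishOptions:
    partition: Optional[int] = None
    compression: str = "gzip"
    acks: str = "all"
    timestamp: Optional[int] = None
    timeout_ms: int = 30000
    retry: RetryOptions = field(default_factory=RetryOptions)


def parse_options(raw: dict) -> PublishOptions:
    """Build PublishOptions from the parsed `options` mapping and validate it."""
    retry_raw = raw.get("retry") or {}
    options = PublishOptions(
        partition=raw.get("partition"),
        compression=str(raw.get("compression", "gzip")),
        acks=str(raw.get("acks", "all")),  # YAML parses a bare 0 or 1 as an integer
        timestamp=raw.get("timestamp"),
        timeout_ms=int(raw.get("timeoutMs", 30000)),
        retry=RetryOptions(
            attempts=int(retry_raw.get("attempts", 3)),
            backoff_ms=int(retry_raw.get("backoffMs", 1000)),
        ),
    )
    if options.compression not in COMPRESSION_TYPES:
        raise ValueError(f"unsupported compression: {options.compression}")
    if options.acks not in ACK_MODES:
        raise ValueError(f"unsupported acks mode: {options.acks}")
    if options.partition is not None and options.partition < 0:
        raise ValueError("partition must be null or a non-negative number")
    if options.timeout_ms <= 0:
        raise ValueError("timeoutMs must be positive")
    if options.retry.attempts < 0 or options.retry.backoff_ms < 0:
        raise ValueError("retry.attempts and retry.backoffMs must not be negative")
    return options
```

Rejecting bad values at configuration time means a typo such as an unknown compression name fails once, up front, rather than on every message.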
Task Logic
- Resolve payload and headers from the execution context.
- Apply optional normalization to the payload if normalizeWhitespace is enabled.
- Construct the Kafka message object, applying key, partition, headers, and compression.
- Send the message using the configured Kafka connection, respecting SSL/SASL options.
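
The steps above can be sketched end to end as follows. This is an illustration under stated assumptions, not the plugin's implementation: `send` is a hypothetical callable standing in for a real Kafka producer, `normalize_whitespace` is assumed to mean compact re-serialization of a JSON payload, and `attempts` is read as the number of retries after the first try.

```python
import json
import re
import time
from typing import Callable

PLACEHOLDER = re.compile(r"\{\{\s*context\.([A-Za-z0-9_.]+)\s*\}\}")


def resolve(template: str, context: dict) -> str:
    """Replace {{context.a.b}} placeholders with values from the nested context."""
    def lookup(match):
        value = context
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)
    return PLACEHOLDER.sub(lookup, template)


def build_message(task: dict, context: dict, normalize_whitespace: bool = False) -> dict:
    """Steps 1 to 3: resolve templates, optionally normalize, assemble the message."""
    payload = resolve(task["payload"], context)
    if normalize_whitespace:
        # Assumed meaning of normalizeWhitespace: re-serialize the JSON compactly.
        payload = json.dumps(json.loads(payload), separators=(",", ":"))
    headers = json.loads(resolve(task.get("headers", "[]"), context))
    options = task.get("options", {})
    return {
        "topic": resolve(task["topic"], context),
        "key": resolve(task["key"], context) if task.get("key") else None,
        "value": payload,
        "headers": [(h["name"], h["value"]) for h in headers],
        "partition": options.get("partition"),
        "compression": options.get("compression"),
    }


def publish(
    task: dict,
    context: dict,
    send: Callable[[dict], None],
    sleep: Callable[[float], None] = time.sleep,
    normalize_whitespace: bool = False,
) -> int:
    """Step 4: send with retries; returns the number of send attempts made."""
    message = build_message(task, context, normalize_whitespace)
    retry = task.get("options", {}).get("retry", {})
    retries = retry.get("attempts", 0)
    backoff_s = retry.get("backoffMs", 0) / 1000
    for attempt in range(1, retries + 2):  # first try plus `retries` retries
        try:
            send(message)
            return attempt
        except Exception:
            if attempt > retries:
                raise  # retries exhausted; the caller would apply its onError policy
            sleep(backoff_s)
```

Passing `send` and `sleep` in as parameters keeps the sketch independent of any particular Kafka client and lets the retry path be exercised in tests without a broker or real delays.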
Expected Result
- Messages can be dynamically generated from context and reliably sent to Kafka.
- Advanced Kafka options and retry/error handling provide robustness for production workloads.
- Headers and payload are fully configurable via YAML.