# 📨 KafkaPlugin (`kap`)

The `kap` module provides a Kafka publishing task plugin for LinID:

- **KafkaPublishTaskPlugin** — a `TaskPlugin` that publishes messages to Apache Kafka.

## ✅ Use Case

Use this plugin when you need to:

- Send events to Kafka topics as part of the entity lifecycle (e.g., after user creation).
- Construct dynamic payloads and headers from the execution context using Jinja templates.
- Use flexible Kafka connection options, including SSL and SASL authentication.

## 🔧 Configuration

```yaml
- type: kafka-publish
  connection:
    brokers: ['kafka:9092']
    clientId: 'linid-task-plugin'
    ssl:
      enabled: false
    sasl:
      mechanism: plain
      username: 'kafka_user'
      password: 'kafka_pass'
  topic: '{{context.kafka.topic}}'
  key: '{{context.user.id}}'
  payload: >
    {
      "eventType": "{{context.eventType}}",
      "timestamp": "{{context.now}}",
      "user": {
        "id": "{{context.user.id}}",
        "email": "{{context.user.email}}"
      }
    }
  headers: >
    [
      {"name": "correlationId", "value": "{{context.requestId}}"},
      {"name": "source", "value": "linid"}
    ]
  options:
    partition: null
    compression: gzip
    acks: all
    timestamp: null
    timeoutMs: 30000
    normalizeWhitespace: true
    retry:
      attempts: 3
      backoffMs: 1000
```

### Configuration Fields

| Key          | Required | Description                                                            |
| ------------ | -------- | ---------------------------------------------------------------------- |
| `connection` | ✅       | Kafka connection configuration (brokers, clientId, ssl, sasl).         |
| `topic`      | ✅       | Target Kafka topic (supports Jinja templates).                         |
| `key`        | ❌       | Message key (supports Jinja templates). If omitted, no key is set.     |
| `payload`    | ✅       | Message payload (supports Jinja templates).                            |
| `headers`    | ❌       | List of `{name, value}` headers (supports Jinja templates).            |
| `options`    | ❌       | Advanced Kafka options (partition, compression, acks, timeout, retry). |

### Connection Fields

| Key        | Required | Description                                                |
| ---------- | -------- | ---------------------------------------------------------- |
| `brokers`  | ✅       | List of Kafka broker addresses.                            |
| `clientId` | ❌       | Client identifier for the Kafka producer.                  |
| `ssl`      | ❌       | SSL configuration (`enabled`, truststore, keystore).       |
| `sasl`     | ❌       | SASL authentication (`mechanism`, `username`, `password`). |

### Options Fields

| Key                   | Default | Description                                                             |
| --------------------- | ------- | ----------------------------------------------------------------------- |
| `partition`           | `null`  | Target partition, or `null` for round-robin.                            |
| `compression`         | `none`  | Compression type: `none`, `gzip`, `snappy`, `lz4`, `zstd`.              |
| `acks`                | `all`   | Acknowledgment mode: `0`, `1`, `all`.                                   |
| `timestamp`           | `null`  | Optional message timestamp, or `null` for broker-assigned.              |
| `timeoutMs`           | —       | Maps to Kafka `delivery.timeout.ms`. If omitted, Kafka default applies. |
| `retry`               | —       | Retry configuration mapped to native Kafka producer properties.         |
| `retry.attempts`      | —       | Maps to Kafka `retries`. If omitted, Kafka default applies.             |
| `retry.backoffMs`     | —       | Maps to Kafka `retry.backoff.ms`. If omitted, Kafka default applies.    |
| `normalizeWhitespace` | `false` | If `true`, collapses consecutive whitespace in the rendered payload.    |

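The option-to-property mapping in the table above can be sketched as plain Java. This is an illustrative sketch, not the plugin's actual code: the class name `KafkaOptionsMapper` is hypothetical, while the property keys (`compression.type`, `acks`, `delivery.timeout.ms`, `retries`, `retry.backoff.ms`) are standard Kafka producer configs.

```java
import java.util.Properties;

public class KafkaOptionsMapper {

    /** Translate plugin-level options into native Kafka producer properties. */
    public static Properties toProducerProps(String compression, String acks,
                                             Integer timeoutMs, Integer retryAttempts,
                                             Integer retryBackoffMs) {
        Properties props = new Properties();
        if (compression != null) props.put("compression.type", compression);
        if (acks != null) props.put("acks", acks);
        // Omitted values are simply not set, so the Kafka defaults apply,
        // matching the "If omitted, Kafka default applies" rows above.
        if (timeoutMs != null) props.put("delivery.timeout.ms", timeoutMs.toString());
        if (retryAttempts != null) props.put("retries", retryAttempts.toString());
        if (retryBackoffMs != null) props.put("retry.backoff.ms", retryBackoffMs.toString());
        return props;
    }
}
```

With the sample configuration above (`compression: gzip`, `acks: all`, `timeoutMs: 30000`, `retry.attempts: 3`, `retry.backoffMs: 1000`), all five properties end up set; a config that omits `options` entirely yields an empty `Properties` and pure Kafka defaults.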
## 🛠 Behavior

1. The plugin reads the connection, topic, key, payload, headers, and options from the task configuration.
2. It resolves all Jinja templates using the execution context (`{{context.xxx}}`) and the entity attributes (`{{entity.xxx}}`).
3. If `normalizeWhitespace` is enabled, consecutive whitespace in the rendered payload is collapsed into single spaces.
4. It constructs a Kafka `ProducerRecord` with the resolved values, applying partition and timestamp if configured.
5. It sends the message **asynchronously** using a cached `KafkaProducer` instance. Retry logic is handled natively by the Kafka producer via the `retries`, `delivery.timeout.ms`, and `retry.backoff.ms` properties.

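Step 3's whitespace normalization is useful because YAML block scalars (`payload: >`) render with newlines and indentation intact. A one-line regex is enough to express the behavior described; this is a sketch under that assumption, and `PayloadNormalizer` is a hypothetical name:

```java
public class PayloadNormalizer {

    /** Collapse runs of whitespace (spaces, newlines, tabs) into single spaces. */
    public static String normalize(String rendered) {
        return rendered.replaceAll("\\s+", " ").trim();
    }
}
```

For example, the multi-line JSON payload from the configuration above would be flattened to a single-line JSON string, which avoids spurious newlines inside the Kafka message value.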
- If a required option (`connection`, `topic`, `payload`) is missing, the plugin throws an error with status `500`.
- On send failure, the error is logged. The send is asynchronous, so errors do not block the calling thread.
- Kafka producers are cached per connection and producer-level options (compression, acks) for reuse across invocations.
- Cached producers are automatically closed on application shutdown via `@PreDestroy`.

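The caching strategy in the notes above keys producers on everything that affects producer construction: the connection plus producer-level options. A minimal sketch of that keying, assuming a generic factory in place of the real `KafkaProducer` constructor (class and method names here are hypothetical):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class ProducerCache<P> {
    private final Map<String, P> cache = new ConcurrentHashMap<>();

    /** Key on brokers plus producer-level options: two tasks sharing these share one producer. */
    static String cacheKey(List<String> brokers, String compression, String acks) {
        return String.join(",", brokers) + "|" + compression + "|" + acks;
    }

    /** computeIfAbsent guarantees a single producer per key, even under concurrent tasks. */
    public P getOrCreate(String key, Supplier<P> factory) {
        return cache.computeIfAbsent(key, k -> factory.get());
    }

    public int size() {
        return cache.size();
    }
}
```

Keying on producer-level options matters because `compression.type` and `acks` are fixed at producer construction time, so two tasks that differ only in those options cannot share an instance.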
## 📖 Examples

### Publish user creation event

```yaml
tasks:
  - type: kafka-publish
    phases:
      - afterCreate
    connection:
      brokers: ['kafka:9092']
    topic: 'user-events'
    key: '{{context.user.id}}'
    payload: >
      {"action": "created", "userId": "{{context.user.id}}"}
```

### Publish with SASL authentication and compression

```yaml
tasks:
  - type: kafka-publish
    phases:
      - afterUpdate
    connection:
      brokers: ['kafka1:9093', 'kafka2:9093']
      clientId: 'linid-publisher'
      ssl:
        enabled: true
      sasl:
        mechanism: plain
        username: 'producer'
        password: 'secret'
    topic: 'audit-events'
    payload: >
      {"action": "updated", "entity": "{{context.entityName}}"}
    options:
      compression: gzip
      acks: all
```

## 🧷 Important Notes

- The plugin type identifier is `kafka-publish`.
- Supported SASL mechanisms: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`.
- Templating uses Jinja (via `JinjaService`) to dynamically inject context values (`{{context.xxx}}`) and entity attributes (`{{entity.xxx}}`) into topic, key, payload, and headers.
- Kafka producers are thread-safe and cached per connection configuration for optimal performance.
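The standard Kafka Java client expresses the `sasl` block via the `sasl.mechanism` and `sasl.jaas.config` properties. The sketch below shows one plausible mapping; `SaslConfigBuilder` is a hypothetical name, and the hard-coded `security.protocol` of `SASL_SSL` assumes SSL is enabled alongside SASL, which may not match the plugin's actual behavior. The login-module class names (`PlainLoginModule`, `ScramLoginModule`) are the real Kafka classes for these mechanisms.

```java
import java.util.Properties;

public class SaslConfigBuilder {

    /** Map the plugin's `sasl` block onto standard Kafka client properties. */
    public static Properties saslProps(String mechanism, String username, String password) {
        Properties props = new Properties();
        String upper = mechanism.toUpperCase();     // e.g. "plain" -> "PLAIN"
        props.put("sasl.mechanism", upper);
        props.put("security.protocol", "SASL_SSL"); // assumption: SSL enabled alongside SASL
        // PLAIN uses PlainLoginModule; the SCRAM mechanisms use ScramLoginModule.
        String module = upper.equals("PLAIN")
                ? "org.apache.kafka.common.security.plain.PlainLoginModule"
                : "org.apache.kafka.common.security.scram.ScramLoginModule";
        props.put("sasl.jaas.config", String.format(
                "%s required username=\"%s\" password=\"%s\";", module, username, password));
        return props;
    }
}
```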