58 changes: 58 additions & 0 deletions .github/workflows/kafka-publish-task-pull-request.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
name: 'Check Pull Request for plugin: kafka-publish-task-plugin'

on:
  pull_request:
    types: [opened, synchronize, reopened]
    paths:
      - 'kap/**'

jobs:
  tests:
    name: Unit tests
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up JDK 21 for x64
        uses: actions/setup-java@v4
        with:
          java-version: '21'
          distribution: 'temurin'
          architecture: x64

      - name: Start Kafka
        working-directory: kap/src/test/resources
        run: |
          docker compose up -d
          timeout 60 bash -c 'until docker compose exec kafka \
            /opt/kafka/bin/kafka-broker-api-versions.sh \
            --bootstrap-server localhost:9092 > /dev/null 2>&1; do sleep 2; done'

      - name: Install Pandoc
        run: sudo apt-get update && sudo apt-get install -y pandoc

      - name: Execute unit tests
        run: mvn -ntp -pl kap test

      - name: Stop Kafka
        if: always()
        working-directory: kap/src/test/resources
        run: docker compose down

      - name: Execute mutation tests
        run: mvn -ntp -pl kap org.pitest:pitest-maven:mutationCoverage

      - name: Extract summary from pitest
        run: |
          echo "<html><head></head><body><h1>Pit Test Coverage Report: kafka-publish-task-plugin</h1><h3>Project Summary</h3>" > pitest.html
          perl -0777 -ne 'print "$1\n" if /(<table>.*?<\/table>)/s' kap/target/pit-reports/index.html >> pitest.html
          echo "</body></html>" >> pitest.html

      - name: Convert pitest report to markdown
        run: pandoc --from html --to gfm --no-highlight pitest.html -o pitest.md

      - name: Post comment
        uses: luukkemp/pr-comment@2024.1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          path: pitest.md
1 change: 1 addition & 0 deletions README.md
@@ -15,6 +15,7 @@ Each plugin is maintained in a dedicated subdirectory and follows the same struc
| `cmtp` | Context plugin: task to map context values + validation to compare values. |
| `hpp` | Task plugin to execute configurable HTTP requests within task lifecycles. |
| `jptp` | Task plugin to parse a JSON string from the context. |
| `kap` | Task plugin to publish messages to Apache Kafka. |

### **📦 Provider plugins**

149 changes: 149 additions & 0 deletions kap/README.md
@@ -0,0 +1,149 @@
# 📨 KafkaPlugin (`kap`)

The `kap` module provides a Kafka publishing task plugin for LinID:

- **KafkaPublishTaskPlugin** — a `TaskPlugin` that publishes messages to Apache Kafka.

## ✅ Use Case

Use this plugin when you need to:

- Send events to Kafka topics as part of the entity lifecycle (e.g., after user creation).
- Construct dynamic payloads and headers from the execution context using Jinja templates.
- Use flexible Kafka connection options including SSL and SASL authentication.

## 🔧 Configuration

```yaml
- type: kafka-publish
  connection:
    brokers: ['kafka:9092']
    clientId: 'linid-task-plugin'
    ssl:
      enabled: false
    sasl:
      mechanism: plain
      username: 'kafka_user'
      password: 'kafka_pass'
  topic: '{{context.kafka.topic}}'
  key: '{{context.user.id}}'
  payload: >
    {
      "eventType": "{{context.eventType}}",
      "timestamp": "{{context.now}}",
      "user": {
        "id": "{{context.user.id}}",
        "email": "{{context.user.email}}"
      }
    }
  headers: >
    [
      {"name": "correlationId", "value": "{{context.requestId}}"},
      {"name": "source", "value": "linid"}
    ]
  options:
    partition: null
    compression: gzip
    acks: all
    timestamp: null
    timeoutMs: 30000
    normalizeWhitespace: true
    retry:
      attempts: 3
      backoffMs: 1000
```

### Configuration Fields

| Key | Required | Description |
| ------------ | -------- | ---------------------------------------------------------------------- |
| `connection` | ✅ | Kafka connection configuration (brokers, clientId, ssl, sasl). |
| `topic` | ✅ | Target Kafka topic (supports Jinja templates). |
| `key` | ❌ | Message key (supports Jinja templates). If omitted, no key is set. |
| `payload` | ✅ | Message payload (supports Jinja templates). |
| `headers` | ❌ | List of `{name, value}` headers (supports Jinja templates). |
| `options` | ❌ | Advanced Kafka options (partition, compression, acks, timeout, retry). |

### Connection Fields

| Key | Required | Description |
| ---------- | -------- | ---------------------------------------------------------- |
| `brokers` | ✅ | List of Kafka broker addresses. |
| `clientId` | ❌ | Client identifier for the Kafka producer. |
| `ssl` | ❌ | SSL configuration (`enabled`, truststore, keystore). |
| `sasl` | ❌ | SASL authentication (`mechanism`, `username`, `password`). |

### Options Fields

| Key | Default | Description |
| --------------------- | ------- | ----------------------------------------------------------------------- |
| `partition` | `null` | Target partition or `null` for round-robin. |
| `compression` | `none` | Compression type: `none`, `gzip`, `snappy`, `lz4`, `zstd`. |
| `acks` | `all` | Acknowledgment mode: `0`, `1`, `all`. |
| `timestamp` | `null` | Optional message timestamp or `null` for broker-assigned. |
| `timeoutMs` | — | Maps to Kafka `delivery.timeout.ms`. If omitted, Kafka default applies. |
| `retry` | — | Retry configuration mapped to native Kafka producer properties. |
| `retry.attempts` | — | Maps to Kafka `retries`. If omitted, Kafka default applies. |
| `retry.backoffMs` | — | Maps to Kafka `retry.backoff.ms`. If omitted, Kafka default applies. |
| `normalizeWhitespace` | `false` | If `true`, collapse consecutive whitespace in the rendered payload. |
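The table above maps plugin options onto native Kafka producer properties. As an illustrative sketch of that mapping (pure JDK; `OptionsMapping` and `buildProducerProps` are hypothetical names, not the plugin's actual API), the translation could look like:

```java
import java.util.Properties;

public class OptionsMapping {
    // Build producer Properties from the documented options, applying the
    // documented defaults and leaving omitted keys to Kafka's own defaults.
    static Properties buildProducerProps(String compression, String acks,
                                         Integer timeoutMs, Integer retryAttempts,
                                         Integer retryBackoffMs) {
        Properties props = new Properties();
        props.put("compression.type", compression == null ? "none" : compression);
        props.put("acks", acks == null ? "all" : acks);
        if (timeoutMs != null) {
            props.put("delivery.timeout.ms", timeoutMs.toString());
        }
        if (retryAttempts != null) {
            props.put("retries", retryAttempts.toString());
        }
        if (retryBackoffMs != null) {
            props.put("retry.backoff.ms", retryBackoffMs.toString());
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = buildProducerProps("gzip", "all", 30000, 3, 1000);
        System.out.println(p);
    }
}
```

Note how `timeoutMs`, `retry.attempts`, and `retry.backoffMs` are only set when present, so Kafka's built-in defaults apply otherwise, exactly as the table states.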

## 🛠 Behavior

1. The plugin reads the connection, topic, key, payload, headers, and options from the task configuration.
2. It resolves all Jinja templates using the execution context (`{{context.xxx}}`) and the entity attributes (`{{entity.xxx}}`).
3. If `normalizeWhitespace` is enabled, consecutive whitespace in the rendered payload is collapsed into single spaces.
4. It constructs a Kafka `ProducerRecord` with the resolved values, applying partition and timestamp if configured.
5. It sends the message **asynchronously** using a cached `KafkaProducer` instance. Retry logic is handled natively by the Kafka producer via `retries`, `delivery.timeout.ms`, and `retry.backoff.ms` properties.

- If a required field (`connection`, `topic`, or `payload`) is missing, the plugin throws an error with status `500`.
- On send failure, the error is logged; because the send is asynchronous, errors do not block the calling thread.
- Kafka producers are cached per connection and producer-level options (compression, acks) for reuse across invocations.
- Cached producers are automatically closed on application shutdown via `@PreDestroy`.
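Step 3 above can be sketched as a simple regex collapse — a minimal illustration assuming the plugin replaces any run of whitespace with a single space (the actual normalization logic may differ):

```java
public class WhitespaceNormalizer {
    // Collapse runs of whitespace (spaces, tabs, newlines) into single spaces
    // and trim the ends, so a multi-line rendered payload becomes one line.
    static String normalize(String rendered) {
        return rendered.replaceAll("\\s+", " ").trim();
    }

    public static void main(String[] args) {
        String payload = "{\n  \"action\": \"created\",\n  \"userId\": \"42\"\n}";
        // A multi-line YAML payload becomes a compact single-line string.
        System.out.println(normalize(payload));
    }
}
```

This matters for `payload: >` blocks, where YAML folding and template rendering can leave stray newlines and indentation inside the message body.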

## 📖 Examples

### Publish user creation event

```yaml
tasks:
  - type: kafka-publish
    phases:
      - afterCreate
    connection:
      brokers: ['kafka:9092']
    topic: 'user-events'
    key: '{{context.user.id}}'
    payload: >
      {"action": "created", "userId": "{{context.user.id}}"}
```

### Publish with SASL authentication and compression

```yaml
tasks:
  - type: kafka-publish
    phases:
      - afterUpdate
    connection:
      brokers: ['kafka1:9093', 'kafka2:9093']
      clientId: 'linid-publisher'
      ssl:
        enabled: true
      sasl:
        mechanism: plain
        username: 'producer'
        password: 'secret'
    topic: 'audit-events'
    payload: >
      {"action": "updated", "entity": "{{context.entityName}}"}
    options:
      compression: gzip
      acks: all
```

## 🧷 Important Notes

- The plugin type identifier is `kafka-publish`.
- Supported SASL mechanisms: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`.
- Templating uses Jinja (via `JinjaService`) to dynamically inject context values (`{{context.xxx}}`) and entity attributes (`{{entity.xxx}}`) into topic, key, payload, and headers.
- Kafka producers are thread-safe and cached per connection configuration for optimal performance.
91 changes: 91 additions & 0 deletions kap/pom.xml
@@ -0,0 +1,91 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>io.github.linagora.linid.im</groupId>
    <artifactId>linid-im-api-community-plugins</artifactId>
    <version>0.1.0</version>
  </parent>
  <groupId>io.github.linagora.linid.im</groupId>
  <artifactId>kap</artifactId>
  <version>0.1.0</version>
  <name>kafka-publish-task</name>
  <description>Task plugin to publish messages to Apache Kafka.</description>
  <url>https://github.com/linagora/linid-im-api-community-plugins</url>
  <licenses>
    <license>
      <name>GNU Affero General Public License v3.0</name>
      <url>https://www.gnu.org/licenses/agpl-3.0.html</url>
      <distribution>repo</distribution>
      <comments>This project is licensed under the GNU AGPL v3.0</comments>
    </license>
  </licenses>
  <developers>
    <developer>
      <id>christophechevalier</id>
      <name>Christophe CHEVALIER</name>
      <email>cchevalier@linagora.com</email>
    </developer>
  </developers>
  <scm>
    <connection>scm:git:git://github.com/linagora/linid-im-api-community-plugins.git</connection>
    <developerConnection>scm:git:ssh://git@github.com:linagora/linid-im-api-community-plugins.git</developerConnection>
    <url>https://github.com/linagora/linid-im-api-community-plugins</url>
  </scm>
  <properties>
    <sonar.projectBaseDir>kap</sonar.projectBaseDir>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>4.2.0</version>
    </dependency>
    <dependency>
      <groupId>com.hubspot.jinjava</groupId>
      <artifactId>jinjava</artifactId>
      <version>2.8.3</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.pitest</groupId>
        <artifactId>pitest-maven</artifactId>
        <configuration>
          <excludedTestClasses>
            <param>io.github.linagora.linid.im.kap.KafkaPublishTaskPluginE2ETest</param>
          </excludedTestClasses>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <configuration>
          <filters>
            <filter>
              <artifact>org.apache.kafka:kafka-clients</artifact>
              <excludes>
                <exclude>org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator*.class</exclude>
                <exclude>org/apache/kafka/common/security/oauthbearer/internals/secured/JwksFileVerificationKeyResolver*.class</exclude>
                <exclude>org/apache/kafka/common/security/oauthbearer/internals/secured/RefreshingHttpsJwks*.class</exclude>
                <exclude>org/apache/kafka/common/security/oauthbearer/internals/secured/VerificationKeyResolverFactory*.class</exclude>
                <exclude>org/apache/kafka/common/security/oauthbearer/internals/secured/CloseableVerificationKeyResolver.class</exclude>
                <exclude>org/apache/kafka/server/telemetry/**</exclude>
                <exclude>org/apache/kafka/shaded/**/collector/**</exclude>
              </excludes>
            </filter>
            <filter>
              <artifact>org.xerial.snappy:snappy-java</artifact>
              <excludes>
                <exclude>org/xerial/snappy/SnappyBundleActivator.class</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
@@ -0,0 +1,60 @@
/*
 * Copyright (C) 2020-2026 Linagora
 *
 * This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
 * Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
 * any later version, provided you comply with the Additional Terms applicable for LinID Identity Manager software by
 * LINAGORA pursuant to Section 7 of the GNU Affero General Public License, subsections (b), (c), and (e), pursuant to
 * which these Appropriate Legal Notices must notably (i) retain the display of the "LinID™" trademark/logo at the top
 * of the interface window, the display of the “You are using the Open Source and free version of LinID™, powered by
 * Linagora © 2009–2013. Contribute to LinID R&D by subscribing to an Enterprise offer!” infobox and in the e-mails
 * sent with the Program, notice appended to any type of outbound messages (e.g. e-mail and meeting requests) as well
 * as in the LinID Identity Manager user interface, (ii) retain all hypertext links between LinID Identity Manager
 * and https://linid.org/, as well as between LINAGORA and LINAGORA.com, and (iii) refrain from infringing LINAGORA
 * intellectual property rights over its trademarks and commercial brands. Other Additional Terms apply, see
 * <http://www.linagora.com/licenses/> for more details.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
 * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License and its applicable Additional Terms for
 * LinID Identity Manager along with this program. If not, see <http://www.gnu.org/licenses/> for the GNU Affero
 * General Public License version 3 and <http://www.linagora.com/licenses/> for the Additional Terms applicable to the
 * LinID Identity Manager software.
 */

package io.github.linagora.linid.im.kap;

import io.github.linagora.linid.im.kap.model.KafkaConnection;
import io.github.linagora.linid.im.kap.model.KafkaOptions;
import org.apache.kafka.clients.producer.KafkaProducer;

/**
 * Factory for creating and caching {@link KafkaProducer} instances.
 *
 * <p>Implementations are responsible for building Kafka producer configurations from
 * the plugin's connection and options settings, caching producers by configuration key,
 * and managing their lifecycle.
 */
public interface KafkaProducerFactory {

  /**
   * Returns an existing producer for the given connection, or creates a new one.
   *
   * @param connection Kafka connection configuration.
   * @param options Optional advanced options (compression, acks, retry). May be {@code null}.
   * @return A {@link KafkaProducer} instance.
   */
  KafkaProducer<String, String> getOrCreate(KafkaConnection connection, KafkaOptions options);

  /**
   * Flushes all cached Kafka producers, ensuring all buffered messages are sent.
   */
  void flushAll();

  /**
   * Closes all cached Kafka producers and clears the cache.
   */
  void closeAll();
}
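The get-or-create caching the interface's javadoc describes can be illustrated with a generic configuration-keyed cache. This is a hedged sketch of the pattern only — `CachingFactory` is a hypothetical name, and the real implementation caches `KafkaProducer<String, String>` instances keyed by connection and producer-level options:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

// Generic sketch: a thread-safe cache with atomic get-or-create semantics,
// the core of any "cached producers per configuration" factory.
public class CachingFactory<K, P> {
    private final Map<K, P> cache = new ConcurrentHashMap<>();
    private final Function<K, P> creator;

    public CachingFactory(Function<K, P> creator) {
        this.creator = creator;
    }

    // computeIfAbsent builds the instance once per distinct configuration key
    // and returns the cached one on every subsequent call.
    public P getOrCreate(K configKey) {
        return cache.computeIfAbsent(configKey, creator);
    }

    // Close every cached instance, then empty the cache (mirrors closeAll()).
    public void closeAll(Consumer<P> closer) {
        cache.values().forEach(closer);
        cache.clear();
    }
}
```

Since `KafkaProducer` is thread-safe, reusing one cached instance per configuration across task invocations is safe and avoids the cost of repeated producer construction.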