Skip to content

Commit 01371e3

Browse files
feat(kap): add KafkaPublishTaskPlugin
1 parent 8ecc0ed commit 01371e3

File tree

2 files changed

+428
-0
lines changed

2 files changed

+428
-0
lines changed
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright (C) 2020-2026 Linagora
3+
*
4+
* This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
5+
* Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
6+
* any later version, provided you comply with the Additional Terms applicable for LinID Identity Manager software by
7+
* LINAGORA pursuant to Section 7 of the GNU Affero General Public License, subsections (b), (c), and (e), pursuant to
8+
* which these Appropriate Legal Notices must notably (i) retain the display of the "LinID™" trademark/logo at the top
9+
* of the interface window, the display of the “You are using the Open Source and free version of LinID™, powered by
10+
* Linagora © 2009–2013. Contribute to LinID R&D by subscribing to an Enterprise offer!” infobox and in the e-mails
11+
* sent with the Program, notice appended to any type of outbound messages (e.g. e-mail and meeting requests) as well
12+
* as in the LinID Identity Manager user interface, (ii) retain all hypertext links between LinID Identity Manager
13+
* and https://linid.org/, as well as between LINAGORA and LINAGORA.com, and (iii) refrain from infringing LINAGORA
14+
* intellectual property rights over its trademarks and commercial brands. Other Additional Terms apply, see
15+
* <http://www.linagora.com/licenses/> for more details.
16+
*
17+
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
18+
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
19+
* details.
20+
*
21+
* You should have received a copy of the GNU Affero General Public License and its applicable Additional Terms for
22+
* LinID Identity Manager along with this program. If not, see <http://www.gnu.org/licenses/> for the GNU Affero
23+
* General Public License version 3 and <http://www.linagora.com/licenses/> for the Additional Terms applicable to the
24+
* LinID Identity Manager software.
25+
*/
26+
27+
package io.github.linagora.linid.im.kap;
28+
29+
import io.github.linagora.linid.im.corelib.exception.ApiException;
30+
import io.github.linagora.linid.im.corelib.i18n.I18nMessage;
31+
import io.github.linagora.linid.im.corelib.plugin.config.JinjaService;
32+
import io.github.linagora.linid.im.corelib.plugin.config.dto.TaskConfiguration;
33+
import io.github.linagora.linid.im.corelib.plugin.entity.DynamicEntity;
34+
import io.github.linagora.linid.im.corelib.plugin.task.TaskExecutionContext;
35+
import io.github.linagora.linid.im.corelib.plugin.task.TaskPlugin;
36+
import io.github.linagora.linid.im.kap.model.KafkaConnection;
37+
import io.github.linagora.linid.im.kap.model.KafkaHeader;
38+
import io.github.linagora.linid.im.kap.model.KafkaOptions;
39+
import java.nio.charset.StandardCharsets;
40+
import java.util.List;
41+
import java.util.Map;
42+
import lombok.extern.slf4j.Slf4j;
43+
import org.apache.kafka.clients.producer.KafkaProducer;
44+
import org.apache.kafka.clients.producer.ProducerRecord;
45+
import org.apache.kafka.common.header.Header;
46+
import org.apache.kafka.common.header.internals.RecordHeader;
47+
import org.springframework.beans.factory.annotation.Autowired;
48+
import org.springframework.stereotype.Component;
49+
import tools.jackson.core.type.TypeReference;
50+
51+
/**
52+
* Task plugin that publishes messages to Apache Kafka.
53+
*
54+
* <p>This plugin reads connection, topic, payload, key, headers, and options from the task
55+
* configuration, resolves Jinja templates, and sends a message asynchronously to the configured
56+
* Kafka topic. It supports SSL/SASL authentication, compression, and partitioning. Retry logic
57+
* is delegated to the native Kafka producer configuration.
58+
*/
59+
@Slf4j
60+
@Component
61+
public class KafkaPublishTaskPlugin implements TaskPlugin {
62+
63+
private static final String MISSING_OPTION = "error.plugin.default.missing.option";
64+
private static final String OPTION = "option";
65+
66+
private static final KafkaOptions DEFAULT_OPTIONS = new KafkaOptions(
67+
null, null, null, null, null, null, null);
68+
69+
private final JinjaService jinjaService;
70+
private final KafkaProducerFactory producerFactory;
71+
72+
/**
73+
* Constructor with dependency injection.
74+
*
75+
* @param jinjaService Service for rendering Jinja templates.
76+
* @param producerFactory Factory for creating and caching Kafka producers.
77+
*/
78+
@Autowired
79+
public KafkaPublishTaskPlugin(final JinjaService jinjaService,
80+
final KafkaProducerFactory producerFactory) {
81+
this.jinjaService = jinjaService;
82+
this.producerFactory = producerFactory;
83+
}
84+
85+
/**
86+
* Returns whether this plugin handles the given task type.
87+
*
88+
* @param type the task type identifier
89+
* @return {@code true} if the type is {@code "kafka-publish"}, {@code false} otherwise
90+
*/
91+
@Override
92+
public boolean supports(final String type) {
93+
return "kafka-publish".equals(type);
94+
}
95+
96+
/**
97+
* Publishes a message to a Kafka topic asynchronously.
98+
*
99+
* <p>Reads connection, topic, payload, key, headers, and options from the task configuration,
100+
* resolves Jinja templates against the execution context and entity attributes, then sends
101+
* the message using a cached Kafka producer. Retry logic is handled natively by the Kafka
102+
* producer via {@code retries}, {@code delivery.timeout.ms}, and {@code retry.backoff.ms}
103+
* properties.
104+
*
105+
* @param configuration the task configuration containing Kafka options
106+
* @param entity the dynamic entity whose attributes are available in templates
107+
* @param context the execution context containing contextual data
108+
*/
109+
@Override
110+
public void execute(final TaskConfiguration configuration,
111+
final DynamicEntity entity,
112+
final TaskExecutionContext context) {
113+
KafkaConnection connection = configuration
114+
.getOption("connection", new TypeReference<KafkaConnection>() {})
115+
.orElseThrow(() -> new ApiException(500,
116+
I18nMessage.of(MISSING_OPTION, Map.of(OPTION, "connection"))));
117+
118+
String topicTemplate = configuration.getOption("topic")
119+
.orElseThrow(() -> new ApiException(500,
120+
I18nMessage.of(MISSING_OPTION, Map.of(OPTION, "topic"))));
121+
122+
String payloadTemplate = configuration.getOption("payload")
123+
.orElseThrow(() -> new ApiException(500,
124+
I18nMessage.of(MISSING_OPTION, Map.of(OPTION, "payload"))));
125+
126+
String keyTemplate = configuration.getOption("key").orElse(null);
127+
128+
List<KafkaHeader> headerTemplates = configuration
129+
.getOption("headers", new TypeReference<List<KafkaHeader>>() {})
130+
.orElse(List.of());
131+
132+
KafkaOptions options = configuration
133+
.getOption("options", new TypeReference<KafkaOptions>() {})
134+
.orElse(DEFAULT_OPTIONS);
135+
136+
String topic = jinjaService.render(context, entity, topicTemplate);
137+
String key = keyTemplate != null ? jinjaService.render(context, entity, keyTemplate) : null;
138+
String payload = jinjaService.render(context, entity, payloadTemplate);
139+
140+
if (options.isNormalizeWhitespace()) {
141+
payload = payload.strip().replaceAll("\\s+", " ");
142+
}
143+
144+
List<Header> headers = headerTemplates.stream()
145+
.<Header>map(h -> new RecordHeader(
146+
jinjaService.render(context, entity, h.name()),
147+
jinjaService.render(context, entity, h.value()).getBytes(StandardCharsets.UTF_8)))
148+
.toList();
149+
150+
ProducerRecord<String, String> record = new ProducerRecord<>(
151+
topic, options.partition(), options.timestamp(), key, payload, headers);
152+
153+
KafkaProducer<String, String> producer = producerFactory.getOrCreate(connection, options);
154+
155+
producer.send(record, (metadata, exception) -> {
156+
if (exception != null) {
157+
log.error("Failed to send message to topic '{}': {}",
158+
record.topic(), exception.getMessage(), exception);
159+
return;
160+
}
161+
log.info("Message sent to topic '{}' partition {} offset {}",
162+
metadata.topic(), metadata.partition(), metadata.offset());
163+
});
164+
}
165+
}

0 commit comments

Comments
 (0)