Skip to content

Commit c424784

Browse files
christophechevalierjcabannes
authored andcommitted
test(kap): add E2E tests with real Kafka broker
1 parent 67b4238 commit c424784

File tree

3 files changed

+243
-0
lines changed

3 files changed

+243
-0
lines changed

kap/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@
5151
</dependencies>
5252
<build>
5353
<plugins>
54+
<plugin>
55+
<groupId>org.pitest</groupId>
56+
<artifactId>pitest-maven</artifactId>
57+
<configuration>
58+
<excludedTestClasses>
59+
<param>io.github.linagora.linid.im.kap.KafkaPublishTaskPluginE2ETest</param>
60+
</excludedTestClasses>
61+
</configuration>
62+
</plugin>
5463
<plugin>
5564
<groupId>org.apache.maven.plugins</groupId>
5665
<artifactId>maven-shade-plugin</artifactId>
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Copyright (C) 2020-2026 Linagora
3+
*
4+
* This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
5+
* Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
6+
* any later version, provided you comply with the Additional Terms applicable for LinID Identity Manager software by
7+
* LINAGORA pursuant to Section 7 of the GNU Affero General Public License, subsections (b), (c), and (e), pursuant to
8+
* which these Appropriate Legal Notices must notably (i) retain the display of the "LinID™" trademark/logo at the top
9+
* of the interface window, the display of the “You are using the Open Source and free version of LinID™, powered by
10+
* Linagora © 2009–2013. Contribute to LinID R&D by subscribing to an Enterprise offer!” infobox and in the e-mails
11+
* sent with the Program, notice appended to any type of outbound messages (e.g. e-mail and meeting requests) as well
12+
* as in the LinID Identity Manager user interface, (ii) retain all hypertext links between LinID Identity Manager
13+
* and https://linid.org/, as well as between LINAGORA and LINAGORA.com, and (iii) refrain from infringing LINAGORA
14+
* intellectual property rights over its trademarks and commercial brands. Other Additional Terms apply, see
15+
* <http://www.linagora.com/licenses/> for more details.
16+
*
17+
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
18+
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
19+
* details.
20+
*
21+
* You should have received a copy of the GNU Affero General Public License and its applicable Additional Terms for
22+
* LinID Identity Manager along with this program. If not, see <http://www.gnu.org/licenses/> for the GNU Affero
23+
* General Public License version 3 and <http://www.linagora.com/licenses/> for the Additional Terms applicable to the
24+
* LinID Identity Manager software.
25+
*/
26+
27+
package io.github.linagora.linid.im.kap;
28+
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertNotNull;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
33+
import com.hubspot.jinjava.Jinjava;
34+
import io.github.linagora.linid.im.corelib.plugin.config.JinjaService;
35+
import io.github.linagora.linid.im.corelib.plugin.config.dto.TaskConfiguration;
36+
import io.github.linagora.linid.im.corelib.plugin.entity.DynamicEntity;
37+
import io.github.linagora.linid.im.corelib.plugin.task.TaskExecutionContext;
38+
import java.nio.charset.StandardCharsets;
39+
import java.time.Duration;
40+
import java.util.HashMap;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Properties;
44+
import org.apache.kafka.clients.consumer.ConsumerConfig;
45+
import org.apache.kafka.clients.consumer.ConsumerRecord;
46+
import org.apache.kafka.clients.consumer.ConsumerRecords;
47+
import org.apache.kafka.clients.consumer.KafkaConsumer;
48+
import org.apache.kafka.common.TopicPartition;
49+
import org.apache.kafka.common.header.Header;
50+
import org.apache.kafka.common.serialization.StringDeserializer;
51+
import org.junit.jupiter.api.AfterEach;
52+
import org.junit.jupiter.api.BeforeEach;
53+
import org.junit.jupiter.api.DisplayName;
54+
import org.junit.jupiter.api.Test;
55+
56+
/**
57+
* End-to-end tests for {@link KafkaPublishTaskPlugin} with a real Kafka broker.
58+
*
59+
* <p>Requires a running Kafka broker on {@code localhost:9092}.
60+
* Use {@code docker compose -f kap/src/test/resources/docker-compose.yml up -d} before running.
61+
*/
62+
@DisplayName("Test class: KafkaPublishTaskPlugin E2E")
63+
class KafkaPublishTaskPluginE2ETest {
64+
65+
private static final String BROKER = "localhost:9092";
66+
67+
private KafkaPublishTaskPlugin plugin;
68+
private KafkaProducerFactoryImpl producerFactory;
69+
private KafkaConsumer<String, String> consumer;
70+
71+
@BeforeEach
72+
void setUp() {
73+
producerFactory = new KafkaProducerFactoryImpl();
74+
plugin = new KafkaPublishTaskPlugin(new JinjaServiceImpl(), producerFactory);
75+
}
76+
77+
@AfterEach
78+
void tearDown() {
79+
if (consumer != null) {
80+
consumer.close();
81+
}
82+
producerFactory.closeAll();
83+
}
84+
85+
@Test
86+
@DisplayName("test e2e: should publish a message to Kafka and consume it")
87+
void testPublishAndConsume() {
88+
String topic = "e2e-test-publish-" + System.currentTimeMillis();
89+
90+
var config = new TaskConfiguration();
91+
config.addOption("connection", Map.of("brokers", List.of(BROKER)));
92+
config.addOption("topic", topic);
93+
config.addOption("key", "test-key");
94+
config.addOption("payload", "{\"event\":\"created\",\"id\":\"42\"}");
95+
96+
plugin.execute(config, new DynamicEntity(), new TaskExecutionContext());
97+
producerFactory.flushAll();
98+
99+
consumer = createConsumer(topic);
100+
101+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
102+
103+
assertEquals(1, records.count());
104+
ConsumerRecord<String, String> record = records.iterator().next();
105+
assertEquals("test-key", record.key());
106+
assertTrue(record.value().contains("\"event\":\"created\""));
107+
assertTrue(record.value().contains("\"id\":\"42\""));
108+
}
109+
110+
@Test
111+
@DisplayName("test e2e: should resolve entity templates in payload")
112+
void testPublishWithEntityTemplates() {
113+
String topic = "e2e-test-entity-" + System.currentTimeMillis();
114+
115+
var config = new TaskConfiguration();
116+
config.addOption("connection", Map.of("brokers", List.of(BROKER)));
117+
config.addOption("topic", topic);
118+
config.addOption("key", "{{ entity.id }}");
119+
config.addOption("payload", "{\"id\":\"{{ entity.id }}\",\"email\":\"{{ entity.email }}\"}");
120+
121+
var entity = new DynamicEntity();
122+
entity.setAttributes(new HashMap<>(Map.of("id", "user-99", "email", "test@example.com")));
123+
124+
plugin.execute(config, entity, new TaskExecutionContext());
125+
producerFactory.flushAll();
126+
127+
consumer = createConsumer(topic);
128+
129+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
130+
131+
assertEquals(1, records.count());
132+
ConsumerRecord<String, String> record = records.iterator().next();
133+
assertEquals("user-99", record.key());
134+
assertTrue(record.value().contains("\"id\":\"user-99\""));
135+
assertTrue(record.value().contains("\"email\":\"test@example.com\""));
136+
}
137+
138+
@Test
139+
@DisplayName("test e2e: should include headers in the published message")
140+
void testPublishWithHeaders() {
141+
String topic = "e2e-test-headers-" + System.currentTimeMillis();
142+
143+
var config = new TaskConfiguration();
144+
config.addOption("connection", Map.of("brokers", List.of(BROKER)));
145+
config.addOption("topic", topic);
146+
config.addOption("payload", "{}");
147+
config.addOption("headers", List.of(
148+
Map.of("name", "source", "value", "linid"),
149+
Map.of("name", "version", "value", "1.0")));
150+
151+
plugin.execute(config, new DynamicEntity(), new TaskExecutionContext());
152+
producerFactory.flushAll();
153+
154+
consumer = createConsumer(topic);
155+
156+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
157+
158+
assertEquals(1, records.count());
159+
ConsumerRecord<String, String> record = records.iterator().next();
160+
161+
Header sourceHeader = record.headers().lastHeader("source");
162+
assertNotNull(sourceHeader);
163+
assertEquals("linid", new String(sourceHeader.value(), StandardCharsets.UTF_8));
164+
165+
Header versionHeader = record.headers().lastHeader("version");
166+
assertNotNull(versionHeader);
167+
assertEquals("1.0", new String(versionHeader.value(), StandardCharsets.UTF_8));
168+
}
169+
170+
/**
171+
* Creates a consumer assigned directly to partition 0 of the given topic, seeking to the
172+
* beginning. This avoids consumer group coordination issues with Kafka KRaft single-node.
173+
*/
174+
private KafkaConsumer<String, String> createConsumer(String topic) {
175+
Properties props = new Properties();
176+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
177+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
178+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
179+
180+
var kafkaConsumer = new KafkaConsumer<String, String>(props);
181+
TopicPartition partition = new TopicPartition(topic, 0);
182+
kafkaConsumer.assign(List.of(partition));
183+
kafkaConsumer.seekToBeginning(List.of(partition));
184+
return kafkaConsumer;
185+
}
186+
187+
/**
188+
* JinjaService implementation using Jinjava directly, matching production behavior.
189+
*/
190+
static class JinjaServiceImpl implements JinjaService {
191+
192+
private final Jinjava jinjava = new Jinjava();
193+
194+
@Override
195+
public String render(TaskExecutionContext taskContext, String template) {
196+
return render(taskContext, null, Map.of(), template);
197+
}
198+
199+
@Override
200+
public String render(TaskExecutionContext taskContext, DynamicEntity entity, String template) {
201+
return render(taskContext, entity, Map.of(), template);
202+
}
203+
204+
@Override
205+
public String render(TaskExecutionContext taskContext, DynamicEntity entity,
206+
Map<String, Object> map, String template) {
207+
var context = new HashMap<String, Object>();
208+
209+
context.put("context", taskContext);
210+
if (entity != null && entity.getAttributes() != null) {
211+
context.put("entity", entity.getAttributes());
212+
} else {
213+
context.put("entity", Map.of());
214+
}
215+
context.putAll(map);
216+
217+
return jinjava.render(template, context);
218+
}
219+
}
220+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
services:
2+
kafka:
3+
image: apache/kafka:4.2.0
4+
ports:
5+
- "9092:9092"
6+
environment:
7+
KAFKA_NODE_ID: 1
8+
KAFKA_PROCESS_ROLES: broker,controller
9+
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
10+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
11+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
12+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
13+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
14+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

0 commit comments

Comments
 (0)