Skip to content

Commit a86b4a6

Browse files
committed
test: Add KeyToValue integration test
1 parent d081c4b commit a86b4a6

File tree

2 files changed

+124
-1
lines changed

2 files changed

+124
-1
lines changed

src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Arrays;
2424
import java.util.Collections;
2525
import java.util.HashMap;
26+
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Properties;
2829
import java.util.concurrent.ExecutionException;
@@ -144,7 +145,7 @@ void setUp() throws ExecutionException, InterruptedException {
144145
final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
145146
final NewTopic originalTopicForExtractTopicFromValue =
146147
new NewTopic(TopicFromValueSchemaConnector.TOPIC, 1, (short) 1);
147-
final NewTopic newTopicForExtractTopicFromValue =
148+
final NewTopic newTopicForExtractTopicFromValue =
148149
new NewTopic(TopicFromValueSchemaConnector.NAME, 1, (short) 1);
149150
adminClient.createTopics(Arrays.asList(originalTopic, newTopic, originalTopicForExtractTopicFromValue,
150151
newTopicForExtractTopicFromValue)).all().get();
@@ -234,6 +235,40 @@ void testCaseTransform() throws ExecutionException, InterruptedException, IOExce
234235
);
235236
}
236237

238+
@Test
239+
@Timeout(10)
240+
void testKeyToValue() throws ExecutionException, InterruptedException, IOException {
241+
242+
final String topicName = TestKeyToValueConnector.TARGET_TOPIC;
243+
adminClient.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();
244+
245+
final Map<String, String> connectorConfig = new HashMap<>();
246+
connectorConfig.put("name", "test-source-connector");
247+
connectorConfig.put("connector.class", TestKeyToValueConnector.class.getName());
248+
connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
249+
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
250+
connectorConfig.put("tasks.max", "1");
251+
connectorConfig.put("transforms", "keyToValue");
252+
connectorConfig.put("transforms.keyToValue.case", "upper");
253+
connectorConfig.put("transforms.keyToValue.key.fields", "a1,a3");
254+
connectorConfig.put("transforms.keyToValue.value.fields", "b1");
255+
connectorConfig.put("transforms.keyToValue.type", "io.aiven.kafka.connect.transforms.KeyToValue");
256+
257+
connectRunner.createConnector(connectorConfig);
258+
259+
waitForCondition(
260+
() -> consumer.endOffsets(Collections.singletonList(new TopicPartition(topicName, 0)))
261+
.values().stream().reduce(Long::sum).map(s -> s == TestKeyToValueConnector.MESSAGES_TO_PRODUCE)
262+
.orElse(false), 5000, "Messages appear in target topic"
263+
);
264+
265+
final String payload = "'payload':{'b1':'a1','b2':'b2','b3':'b3','a3':'a3'}".replace('\'', '"');
266+
consumer.subscribe(Collections.singletonList(topicName));
267+
for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
268+
assertThat(consumerRecord.value()).asString().contains(payload);
269+
}
270+
}
271+
237272
final void checkMessageTransformInTopic(final TopicPartition topicPartition, final long expectedNumberOfMessages)
238273
throws InterruptedException, IOException {
239274
waitForCondition(
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2025 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Collections;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.connect.connector.Task;
25+
import org.apache.kafka.connect.data.Schema;
26+
import org.apache.kafka.connect.data.SchemaBuilder;
27+
import org.apache.kafka.connect.data.Struct;
28+
import org.apache.kafka.connect.source.SourceRecord;
29+
import org.apache.kafka.connect.source.SourceTask;
30+
31+
public class TestKeyToValueConnector extends AbstractTestSourceConnector {
32+
33+
static final long MESSAGES_TO_PRODUCE = 10L;
34+
35+
static final String TARGET_TOPIC = "key-to-value-target-topic";
36+
37+
@Override
38+
public Class<? extends Task> taskClass() {
39+
return TestKeyToValueConnector.TestSourceConnectorTask.class;
40+
}
41+
42+
public static class TestSourceConnectorTask extends SourceTask {
43+
private int counter = 0;
44+
45+
private final Schema keySchema = SchemaBuilder.struct().field("a1", SchemaBuilder.STRING_SCHEMA)
46+
.field("a2", SchemaBuilder.STRING_SCHEMA)
47+
.field("a3", SchemaBuilder.STRING_SCHEMA).schema();
48+
private final Struct key = new Struct(keySchema).put("a1", "a1").put("a2", "a2").put("a3", "a3");
49+
private final Schema valueSchema = SchemaBuilder.struct().field("b1", SchemaBuilder.STRING_SCHEMA)
50+
.field("b2", SchemaBuilder.STRING_SCHEMA)
51+
.field("b3", SchemaBuilder.STRING_SCHEMA).schema();
52+
private final Struct value = new Struct(valueSchema).put("b1", "b1").put("b2", "b2").put("b3", "b3");
53+
54+
@Override
55+
public void start(final Map<String, String> props) {
56+
}
57+
58+
@Override
59+
public List<SourceRecord> poll() {
60+
if (counter >= MESSAGES_TO_PRODUCE) {
61+
return null; // indicate pause
62+
}
63+
64+
final Map<String, String> sourcePartition = new HashMap<>();
65+
sourcePartition.put("partition", "0");
66+
final Map<String, String> sourceOffset = new HashMap<>();
67+
sourceOffset.put("offset", Integer.toString(counter));
68+
69+
counter += 1;
70+
71+
return Collections.singletonList(
72+
new SourceRecord(sourcePartition, sourceOffset,
73+
TARGET_TOPIC,
74+
keySchema, key,
75+
valueSchema, value)
76+
);
77+
}
78+
79+
@Override
80+
public void stop() {
81+
}
82+
83+
@Override
84+
public String version() {
85+
return null;
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)