Skip to content

Commit 8986ea7

Browse files
authored
Merge pull request #170 from Aiven-Open/rskraba/KCON-136-keytovalue
feat: Add KeyToValue transform
2 parents 15ff758 + e966d30 commit 8986ea7

File tree

6 files changed

+805
-2
lines changed

6 files changed

+805
-2
lines changed

README.md

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ The transformation defines the following configurations:
9999
- `fail` - fail with `DataException`.
100100

101101
Here is an example of this transformation configuration:
102+
102103
```properties
103104
transforms=TombstoneHandler
104105
transforms.TombstoneHandler.type=io.aiven.kafka.connect.transforms.TombstoneHandler
@@ -139,6 +140,7 @@ This transformation converts a record into a tombstone by setting its value and
139140
It can be used together with predicates, for example, to create a tombstone event from a delete event produced by a source connector.
140141

141142
Here is an example of this transformation configuration:
143+
142144
```properties
143145
transforms=MakeTombstone
144146
transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
@@ -188,6 +190,7 @@ transforms.ExtractTopicFromSchemaName.type=io.aiven.kafka.connect.transforms.Ext
188190
transforms.ExtractTopicFromSchemaName.schema.name.topic-map=com.acme.schema.SchemaNameToTopic1:TheNameToReplace1,com.acme.schema.SchemaNameToTopic2:TheNameToReplace2
189191

190192
```
193+
191194
And here is an example of this transformation configuration (using :schema.name.regex)
192195

193196
```properties
@@ -221,6 +224,37 @@ transforms.caseTransform.type=io.aiven.kafka.connect.transforms.CaseTransform$Va
221224
transforms.caseTransform.field.names=field_name_1, field_name_2
222225
```
223226

227+
### `KeyToValue`

Updates the record value with information found in the record key.

This transformation extracts fields from the key and adds them to the value. This is similar to the standard [ValueToKey](https://kafka.apache.org/documentation/#org.apache.kafka.connect.transforms.ValueToKey) transformation from Kafka, but doesn't replace the value.

This supports extracting information from a record key with a schema (e.g. Avro) or without a schema (e.g. JSON), as well as from a record value with a schema or without a schema.

The transformation defines the following configurations:

- `key.fields` - The comma-separated name(s) of the fields in the record key that should be extracted, or `*` to use the entire key.
- `value.fields` - The comma-separated name(s) of the fields to add into the record value, in the same order as `key.fields`.

Any empty or missing value field uses the same name as the key field by default. If a `*` is specified as the key field, its default value field name is `_key`.

Here is an example of this transformation configuration that copies the `id`, `department` and `cost` fields from the key to the value, and renames the `department` field in the value to `dept`:

```properties
transforms=keyToValue
transforms.keyToValue.type=io.aiven.kafka.connect.transforms.KeyToValue
transforms.keyToValue.key.fields=id, department, cost
transforms.keyToValue.value.fields=id, dept
```

Here is an example of this transformation configuration that copies the entire key to the value, under the field `_key`:

```properties
transforms=copyKey
transforms.copyKey.type=io.aiven.kafka.connect.transforms.KeyToValue
transforms.copyKey.key.fields=*
```
224258

225259
## License
226260

@@ -229,4 +263,3 @@ This project is licensed under the [Apache License, Version 2.0](LICENSE).
229263
## Trademarks
230264

231265
Apache Kafka and Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
232-

src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Arrays;
2424
import java.util.Collections;
2525
import java.util.HashMap;
26+
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Properties;
2829
import java.util.concurrent.ExecutionException;
@@ -144,7 +145,7 @@ void setUp() throws ExecutionException, InterruptedException {
144145
final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
145146
final NewTopic originalTopicForExtractTopicFromValue =
146147
new NewTopic(TopicFromValueSchemaConnector.TOPIC, 1, (short) 1);
147-
final NewTopic newTopicForExtractTopicFromValue =
148+
final NewTopic newTopicForExtractTopicFromValue =
148149
new NewTopic(TopicFromValueSchemaConnector.NAME, 1, (short) 1);
149150
adminClient.createTopics(Arrays.asList(originalTopic, newTopic, originalTopicForExtractTopicFromValue,
150151
newTopicForExtractTopicFromValue)).all().get();
@@ -234,6 +235,39 @@ void testCaseTransform() throws ExecutionException, InterruptedException, IOExce
234235
);
235236
}
236237

238+
@Test
239+
@Timeout(10)
240+
void testKeyToValue() throws ExecutionException, InterruptedException, IOException {
241+
242+
final String topicName = TestKeyToValueConnector.TARGET_TOPIC;
243+
adminClient.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();
244+
245+
final Map<String, String> connectorConfig = new HashMap<>();
246+
connectorConfig.put("name", "test-source-connector");
247+
connectorConfig.put("connector.class", TestKeyToValueConnector.class.getName());
248+
connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
249+
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
250+
connectorConfig.put("tasks.max", "1");
251+
connectorConfig.put("transforms", "keyToValue");
252+
connectorConfig.put("transforms.keyToValue.key.fields", "a1,a3");
253+
connectorConfig.put("transforms.keyToValue.value.fields", "b1");
254+
connectorConfig.put("transforms.keyToValue.type", "io.aiven.kafka.connect.transforms.KeyToValue");
255+
256+
connectRunner.createConnector(connectorConfig);
257+
258+
waitForCondition(
259+
() -> consumer.endOffsets(Collections.singletonList(new TopicPartition(topicName, 0)))
260+
.values().stream().reduce(Long::sum).map(s -> s == TestKeyToValueConnector.MESSAGES_TO_PRODUCE)
261+
.orElse(false), 5000, "Messages appear in target topic"
262+
);
263+
264+
final String payload = "'payload':{'b1':'a1','b2':'b2','b3':'b3','a3':'a3'}".replace('\'', '"');
265+
consumer.subscribe(Collections.singletonList(topicName));
266+
for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
267+
assertThat(consumerRecord.value()).asString().contains(payload);
268+
}
269+
}
270+
237271
final void checkMessageTransformInTopic(final TopicPartition topicPartition, final long expectedNumberOfMessages)
238272
throws InterruptedException, IOException {
239273
waitForCondition(
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2025 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Collections;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.connect.connector.Task;
25+
import org.apache.kafka.connect.data.Schema;
26+
import org.apache.kafka.connect.data.SchemaBuilder;
27+
import org.apache.kafka.connect.data.Struct;
28+
import org.apache.kafka.connect.source.SourceRecord;
29+
import org.apache.kafka.connect.source.SourceTask;
30+
31+
public class TestKeyToValueConnector extends AbstractTestSourceConnector {
32+
33+
static final long MESSAGES_TO_PRODUCE = 10L;
34+
35+
static final String TARGET_TOPIC = "key-to-value-target-topic";
36+
37+
@Override
38+
public Class<? extends Task> taskClass() {
39+
return TestKeyToValueConnector.TestSourceConnectorTask.class;
40+
}
41+
42+
public static class TestSourceConnectorTask extends SourceTask {
43+
private int counter = 0;
44+
45+
private final Schema keySchema = SchemaBuilder.struct().field("a1", SchemaBuilder.STRING_SCHEMA)
46+
.field("a2", SchemaBuilder.STRING_SCHEMA)
47+
.field("a3", SchemaBuilder.STRING_SCHEMA).schema();
48+
private final Struct key = new Struct(keySchema).put("a1", "a1").put("a2", "a2").put("a3", "a3");
49+
private final Schema valueSchema = SchemaBuilder.struct().field("b1", SchemaBuilder.STRING_SCHEMA)
50+
.field("b2", SchemaBuilder.STRING_SCHEMA)
51+
.field("b3", SchemaBuilder.STRING_SCHEMA).schema();
52+
private final Struct value = new Struct(valueSchema).put("b1", "b1").put("b2", "b2").put("b3", "b3");
53+
54+
@Override
55+
public void start(final Map<String, String> props) {
56+
}
57+
58+
@Override
59+
public List<SourceRecord> poll() {
60+
if (counter >= MESSAGES_TO_PRODUCE) {
61+
return null; // indicate pause
62+
}
63+
64+
final Map<String, String> sourcePartition = new HashMap<>();
65+
sourcePartition.put("partition", "0");
66+
final Map<String, String> sourceOffset = new HashMap<>();
67+
sourceOffset.put("offset", Integer.toString(counter));
68+
69+
counter += 1;
70+
71+
return Collections.singletonList(
72+
new SourceRecord(sourcePartition, sourceOffset,
73+
TARGET_TOPIC,
74+
keySchema, key,
75+
valueSchema, value)
76+
);
77+
}
78+
79+
@Override
80+
public void stop() {
81+
}
82+
83+
@Override
84+
public String version() {
85+
return null;
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)