README.md (35 changes: 34 additions & 1 deletion)
@@ -99,6 +99,7 @@ The transformation defines the following configurations:
- `fail` - fail with `DataException`.

Here is an example of this transformation configuration:

```properties
transforms=TombstoneHandler
transforms.TombstoneHandler.type=io.aiven.kafka.connect.transforms.TombstoneHandler
@@ -139,6 +140,7 @@ This transformation converts a record into a tombstone by setting its value and
It can be used together with predicates, for example, to create a tombstone event from a delete event produced by a source connector.

Here is an example of this transformation configuration:

```properties
transforms=MakeTombstone
transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
@@ -188,6 +190,7 @@ transforms.ExtractTopicFromSchemaName.type=io.aiven.kafka.connect.transforms.Ext
transforms.ExtractTopicFromSchemaName.schema.name.topic-map=com.acme.schema.SchemaNameToTopic1:TheNameToReplace1,com.acme.schema.SchemaNameToTopic2:TheNameToReplace2

```

And here is an example of this transformation configuration (using `schema.name.regex`):

```properties
@@ -221,6 +224,37 @@ transforms.caseTransform.type=io.aiven.kafka.connect.transforms.CaseTransform$Va
transforms.caseTransform.field.names=field_name_1, field_name_2
```

### `KeyToValue`

Updates the record value with information found in the record key.

This transformation extracts fields from the key and adds them to the value. It is similar to the standard [ValueToKey](https://kafka.apache.org/documentation/#org.apache.kafka.connect.transforms.ValueToKey) transformation shipped with Kafka Connect, but it merges the extracted fields into the value instead of replacing it.

It supports record keys with a schema (e.g. Avro) or without one (e.g. schemaless JSON), and likewise record values with or without a schema.

The transformation defines the following configurations:

- `key.fields` - The comma-separated name(s) of the fields in the record key that should be extracted, or `*` to use the entire key.
- `value.fields` - The comma-separated name(s) of the fields to add into the record value, in the same order as `key.fields`.

Any empty or missing value field uses the same name as the key field by default. If a `*` is specified as the key field, its default value field name is `_key`.

Here is an example of this transformation configuration that copies the `id`, `department` and `cost` fields from the key to the value, and renames the `department` field in the value to `dept`:

```properties
transforms=keyToValue
transforms.keyToValue.type=io.aiven.kafka.connect.transforms.KeyToValue
transforms.keyToValue.key.fields=id, department, cost
transforms.keyToValue.value.fields=id, dept
```
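
For example, given a record whose key is `{"id": 1, "department": "sales", "cost": 9.5}` and whose value is `{"name": "Widget"}`, the transformed value becomes `{"name": "Widget", "id": 1, "dept": "sales", "cost": 9.5}`: `department` is written as `dept`, while `cost` keeps its name because no third entry is given in `value.fields`.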

Here is an example of this transformation configuration that copies the entire key to the value, under the field `_key`:

```properties
transforms=copyKey
transforms.copyKey.type=io.aiven.kafka.connect.transforms.KeyToValue
transforms.copyKey.key.fields=*
```
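
For illustration, here is a minimal, hypothetical sketch of driving the transformation directly from Java. It assumes `KeyToValue` is parameterized over the record type and follows the standard `Transformation` contract (`configure`, `apply`, `close`), as the repository's other transforms do; the class name `KeyToValueSketch`, the topic name, and the sample fields are invented for illustration:

```java
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.kafka.connect.transforms.KeyToValue;

public class KeyToValueSketch {
    public static void main(final String[] args) {
        // Hypothetical direct use of the SMT; the configuration keys are the
        // ones documented above (key.fields / value.fields).
        final KeyToValue<SourceRecord> transform = new KeyToValue<>();
        transform.configure(Map.of(
                "key.fields", "id, department",
                "value.fields", "id, dept"));

        final Schema keySchema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("department", Schema.STRING_SCHEMA)
                .build();
        final Struct key = new Struct(keySchema).put("id", 1).put("department", "sales");

        final Schema valueSchema = SchemaBuilder.struct()
                .field("name", Schema.STRING_SCHEMA)
                .build();
        final Struct value = new Struct(valueSchema).put("name", "Widget");

        final SourceRecord record =
                new SourceRecord(null, null, "example-topic", keySchema, key, valueSchema, value);

        // Per the documentation above, the returned record's value should now
        // also carry "id" and "dept" copied from the key.
        System.out.println(transform.apply(record).value());
        transform.close();
    }
}
```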

## License

@@ -229,4 +263,3 @@ This project is licensed under the [Apache License, Version 2.0](LICENSE).
## Trademarks

Apache Kafka and Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.

@@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@@ -144,7 +145,7 @@ void setUp() throws ExecutionException, InterruptedException {
final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
final NewTopic originalTopicForExtractTopicFromValue =
new NewTopic(TopicFromValueSchemaConnector.TOPIC, 1, (short) 1);
final NewTopic newTopicForExtractTopicFromValue =
final NewTopic newTopicForExtractTopicFromValue =
new NewTopic(TopicFromValueSchemaConnector.NAME, 1, (short) 1);
adminClient.createTopics(Arrays.asList(originalTopic, newTopic, originalTopicForExtractTopicFromValue,
newTopicForExtractTopicFromValue)).all().get();
@@ -234,6 +235,39 @@ void testCaseTransform() throws ExecutionException, InterruptedException, IOExce
);
}

@Test
@Timeout(10)
void testKeyToValue() throws ExecutionException, InterruptedException, IOException {

final String topicName = TestKeyToValueConnector.TARGET_TOPIC;
adminClient.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();

final Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("name", "test-source-connector");
connectorConfig.put("connector.class", TestKeyToValueConnector.class.getName());
connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
connectorConfig.put("tasks.max", "1");
connectorConfig.put("transforms", "keyToValue");
connectorConfig.put("transforms.keyToValue.key.fields", "a1,a3");
connectorConfig.put("transforms.keyToValue.value.fields", "b1");
connectorConfig.put("transforms.keyToValue.type", "io.aiven.kafka.connect.transforms.KeyToValue");

connectRunner.createConnector(connectorConfig);

waitForCondition(
() -> consumer.endOffsets(Collections.singletonList(new TopicPartition(topicName, 0)))
.values().stream().reduce(Long::sum).map(s -> s == TestKeyToValueConnector.MESSAGES_TO_PRODUCE)
.orElse(false), 5000, "Messages appear in target topic"
);

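// key.fields=a1,a3 with value.fields=b1 means key field a1 overwrites value
// field b1, and key field a3 is added under its own (default) name.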
final String payload = "'payload':{'b1':'a1','b2':'b2','b3':'b3','a3':'a3'}".replace('\'', '"');
consumer.subscribe(Collections.singletonList(topicName));
for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
assertThat(consumerRecord.value()).asString().contains(payload);
}
}

final void checkMessageTransformInTopic(final TopicPartition topicPartition, final long expectedNumberOfMessages)
throws InterruptedException, IOException {
waitForCondition(
Contributor:
Shouldn't this be KeyToValueConnectorTest rather than TestKeyToValueConnector?

Contributor (Author):
Ugh, this isn't a test :/

It needs a better name, or even optimised with a better strategy for all integration tests... Do you have any suggestions for the name for now? MockKeyToValueSource isn't quite right either :/

TestKeyToValueConnector.java (new file)
@@ -0,0 +1,88 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class TestKeyToValueConnector extends AbstractTestSourceConnector {

static final long MESSAGES_TO_PRODUCE = 10L;

static final String TARGET_TOPIC = "key-to-value-target-topic";

@Override
public Class<? extends Task> taskClass() {
return TestKeyToValueConnector.TestSourceConnectorTask.class;
}

public static class TestSourceConnectorTask extends SourceTask {
private int counter = 0;

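// Fixed key (a1..a3) and value (b1..b3) structs: every produced record is
// identical, so the integration test can assert an exact transformed payload.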
private final Schema keySchema = SchemaBuilder.struct().field("a1", SchemaBuilder.STRING_SCHEMA)
.field("a2", SchemaBuilder.STRING_SCHEMA)
.field("a3", SchemaBuilder.STRING_SCHEMA).schema();
private final Struct key = new Struct(keySchema).put("a1", "a1").put("a2", "a2").put("a3", "a3");
private final Schema valueSchema = SchemaBuilder.struct().field("b1", SchemaBuilder.STRING_SCHEMA)
.field("b2", SchemaBuilder.STRING_SCHEMA)
.field("b3", SchemaBuilder.STRING_SCHEMA).schema();
private final Struct value = new Struct(valueSchema).put("b1", "b1").put("b2", "b2").put("b3", "b3");

@Override
public void start(final Map<String, String> props) {
}

@Override
public List<SourceRecord> poll() {
if (counter >= MESSAGES_TO_PRODUCE) {
return null; // no data right now; returning null tells the framework to poll again later
}

final Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("partition", "0");
final Map<String, String> sourceOffset = new HashMap<>();
sourceOffset.put("offset", Integer.toString(counter));

counter += 1;

return Collections.singletonList(
new SourceRecord(sourcePartition, sourceOffset,
TARGET_TOPIC,
keySchema, key,
valueSchema, value)
);
}

@Override
public void stop() {
}

@Override
public String version() {
return null;
}
}
}