diff --git a/README.md b/README.md
index 7d4a206..f3e48bf 100644
--- a/README.md
+++ b/README.md
@@ -99,6 +99,7 @@ The transformation defines the following configurations:
 - `fail` - fail with `DataException`.
 
 Here is an example of this transformation configuration:
+
 ```properties
 transforms=TombstoneHandler
 transforms.TombstoneHandler.type=io.aiven.kafka.connect.transforms.TombstoneHandler
@@ -139,6 +140,7 @@ This transformation converts a record into a tombstone by setting its value and
 It can be used together with predicates, for example, to create a tombstone event from a delete event produced by a source connector.
 
 Here is an example of this transformation configuration:
+
 ```properties
 transforms=MakeTombstone
 transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
@@ -188,6 +190,7 @@ transforms.ExtractTopicFromSchemaName.type=io.aiven.kafka.connect.transforms.Ext
 transforms.ExtractTopicFromSchemaName.schema.name.topic-map=com.acme.schema.SchemaNameToTopic1:TheNameToReplace1,com.acme.schema.SchemaNameToTopic2:TheNameToReplace2
 ```
 
+
 And here is an example of this transformation configuration (using :schema.name.regex)
 
 ```properties
@@ -221,6 +224,37 @@ transforms.caseTransform.type=io.aiven.kafka.connect.transforms.CaseTransform$Va
 transforms.caseTransform.field.names=field_name_1, field_name_2
 ```
 
+### `KeyToValue`
+
+Updates the record value with information found in the record key.
+
+This transformation extracts fields from the key and adds them to the value. It is similar to the standard [ValueToKey](https://kafka.apache.org/documentation/#org.apache.kafka.connect.transforms.ValueToKey) transformation from Kafka, but it does not replace the value.
+
+It supports record keys with a schema (e.g. Avro) or without one (e.g. JSON), and record values with or without a schema.
+
+The transformation defines the following configurations:
+
+- `key.fields` - The comma-separated name(s) of the fields in the record key to extract, or `*` to use the entire key.
+- `value.fields` - The comma-separated name(s) of the fields to add to the record value, in the same order as `key.fields`.
+
+An empty or missing value field defaults to the name of the corresponding key field. If `*` is specified as a key field, its default value field name is `_key`.
+
+Here is an example of this transformation configuration that copies the `id`, `department` and `cost` fields from the key to the value, and renames the `department` field in the value to `dept`:
+
+```properties
+transforms=keyToValue
+transforms.keyToValue.type=io.aiven.kafka.connect.transforms.KeyToValue
+transforms.keyToValue.key.fields=id, department, cost
+transforms.keyToValue.value.fields=id, dept
+```
+
+Here is an example of this transformation configuration that copies the entire key to the value, under the field `_key`:
+
+```properties
+transforms=copyKey
+transforms.copyKey.type=io.aiven.kafka.connect.transforms.KeyToValue
+transforms.copyKey.key.fields=*
+```
+
 ## License
@@ -229,4 +263,3 @@ This project is licensed under the [Apache License, Version 2.0](LICENSE).
 
 ## Trademarks
 
 Apache Kafka and Apache Kafka Connect are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
-
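The README section above can be made concrete with a small worked example. This is a hypothetical, standalone sketch rather than part of the patch: the class name, topic name and record contents are invented, and because the record here is schemaless, the field order of the resulting map is unspecified.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.transforms.KeyToValue;

public class KeyToValueReadmeSketch {
    public static void main(final String[] args) {
        // Same configuration as the README's keyToValue example.
        final KeyToValue<SinkRecord> transform = new KeyToValue<>();
        final Map<String, String> config = new HashMap<>();
        config.put("key.fields", "id, department, cost");
        config.put("value.fields", "id, dept");
        transform.configure(config);

        // A schemaless (Map) key and value, as a JSON converter without schemas would produce.
        final Map<String, Object> key = Map.of("id", 42, "department", "sales", "cost", 10);
        final Map<String, Object> value = Map.of("amount", 99);
        final SinkRecord record = new SinkRecord("orders", 0, null, key, null, value, 0);

        final SinkRecord transformed = transform.apply(record);
        // "id" and "cost" keep their names, "department" is renamed to "dept":
        // {amount=99, id=42, dept=sales, cost=10} (map order unspecified)
        System.out.println(transformed.value());
        transform.close();
    }
}
```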
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
index f5539c1..6ffaf15 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -144,7 +145,7 @@ void setUp() throws ExecutionException, InterruptedException {
         final NewTopic newTopic = new NewTopic(TestSourceConnector.NEW_TOPIC, 1, (short) 1);
         final NewTopic originalTopicForExtractTopicFromValue = new NewTopic(TopicFromValueSchemaConnector.TOPIC, 1, (short) 1);
-        final NewTopic newTopicForExtractTopicFromValue = 
+        final NewTopic newTopicForExtractTopicFromValue =
                 new NewTopic(TopicFromValueSchemaConnector.NAME, 1, (short) 1);
         adminClient.createTopics(Arrays.asList(originalTopic, newTopic, originalTopicForExtractTopicFromValue, newTopicForExtractTopicFromValue)).all().get();
@@ -234,6 +235,39 @@ void testCaseTransform() throws ExecutionException, InterruptedException, IOExce
         );
     }
 
+    @Test
+    @Timeout(10)
+    void testKeyToValue() throws ExecutionException, InterruptedException, IOException {
+        final String topicName = TestKeyToValueConnector.TARGET_TOPIC;
+        adminClient.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();
+
+        final Map<String, String> connectorConfig = new HashMap<>();
+        connectorConfig.put("name", "test-source-connector");
+        connectorConfig.put("connector.class", TestKeyToValueConnector.class.getName());
+        connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("tasks.max", "1");
+        connectorConfig.put("transforms", "keyToValue");
+        connectorConfig.put("transforms.keyToValue.key.fields", "a1,a3");
+        connectorConfig.put("transforms.keyToValue.value.fields", "b1");
+        connectorConfig.put("transforms.keyToValue.type", "io.aiven.kafka.connect.transforms.KeyToValue");
+
+        connectRunner.createConnector(connectorConfig);
+
+        waitForCondition(
+            () -> consumer.endOffsets(Collections.singletonList(new TopicPartition(topicName, 0)))
+                .values().stream().reduce(Long::sum).map(s -> s == TestKeyToValueConnector.MESSAGES_TO_PRODUCE)
+                .orElse(false), 5000, "Messages appear in target topic"
+        );
+
+        // The transform overwrites b1 with the key's a1 and appends a3 under its own name.
+        final String payload = "'payload':{'b1':'a1','b2':'b2','b3':'b3','a3':'a3'}".replace('\'', '"');
+        consumer.subscribe(Collections.singletonList(topicName));
+        for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
+            assertThat(consumerRecord.value()).asString().contains(payload);
+        }
+    }
+
     final void checkMessageTransformInTopic(final TopicPartition topicPartition, final long expectedNumberOfMessages)
             throws InterruptedException, IOException {
         waitForCondition(
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/TestKeyToValueConnector.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/TestKeyToValueConnector.java
new file mode 100644
index 0000000..849a85e
--- /dev/null
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/TestKeyToValueConnector.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+public class TestKeyToValueConnector extends AbstractTestSourceConnector {
+
+    static final long MESSAGES_TO_PRODUCE = 10L;
+
+    static final String TARGET_TOPIC = "key-to-value-target-topic";
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return TestKeyToValueConnector.TestSourceConnectorTask.class;
+    }
+
+    public static class TestSourceConnectorTask extends SourceTask {
+        private int counter = 0;
+
+        private final Schema keySchema = SchemaBuilder.struct().field("a1", Schema.STRING_SCHEMA)
+            .field("a2", Schema.STRING_SCHEMA)
+            .field("a3", Schema.STRING_SCHEMA).schema();
+        private final Struct key = new Struct(keySchema).put("a1", "a1").put("a2", "a2").put("a3", "a3");
+        private final Schema valueSchema = SchemaBuilder.struct().field("b1", Schema.STRING_SCHEMA)
+            .field("b2", Schema.STRING_SCHEMA)
+            .field("b3", Schema.STRING_SCHEMA).schema();
+        private final Struct value = new Struct(valueSchema).put("b1", "b1").put("b2", "b2").put("b3", "b3");
+
+        @Override
+        public void start(final Map<String, String> props) {
+        }
+
+        @Override
+        public List<SourceRecord> poll() {
+            if (counter >= MESSAGES_TO_PRODUCE) {
+                return null; // Indicate a pause; there are no more records to produce.
+            }
+
+            final Map<String, String> sourcePartition = new HashMap<>();
+            sourcePartition.put("partition", "0");
+            final Map<String, String> sourceOffset = new HashMap<>();
+            sourceOffset.put("offset", Integer.toString(counter));
+
+            counter += 1;
+
+            return Collections.singletonList(
+                new SourceRecord(sourcePartition, sourceOffset,
+                    TARGET_TOPIC,
+                    keySchema, key,
+                    valueSchema, value)
+            );
+        }
+
+        @Override
+        public void stop() {
+        }
+
+        @Override
+        public String version() {
+            return null;
+        }
+    }
+}
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/KeyToValue.java b/src/main/java/io/aiven/kafka/connect/transforms/KeyToValue.java
new file mode 100644
index 0000000..ca914c7
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/connect/transforms/KeyToValue.java
@@ -0,0 +1,311 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A transformation that copies fields from the record key into the record value.
+ *
+ * @param <R> The concrete {@link ConnectRecord} type to transform.
+ */
+public class KeyToValue<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KeyToValue.class);
+
+    private KeyToValueConfig config;
+
+    /**
+     * The new fields that will be added to the value, mapped to the key fields used to populate them, in insertion
+     * order.
+     */
+    private LinkedHashMap<String, String> valueFields;
+
+    /**
+     * Whether any configured key field is a named field (i.e. copies a single field from the key).
+     */
+    private boolean keyFieldsHasNamedFields;
+
+    /**
+     * Whether any configured key field is the wildcard {@code *} (i.e. copies the entire key).
+     */
+    private boolean keyFieldsHasWildcard;
+
+    private Cache<List<Schema>, Schema> schemaCache;
+
+    @Override
+    public ConfigDef config() {
+        return KeyToValueConfig.config();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> settings) {
+        this.config = new KeyToValueConfig(settings);
+        // Construct the value-field-to-key-field mapping from the two lists in the config.
+        final List<String> keyFieldsList = config.keyFields();
+        final List<String> valueFieldsList = config.valueFields();
+        valueFields = new LinkedHashMap<>();
+
+        for (int i = 0; i < keyFieldsList.size(); i++) {
+            final String kf = keyFieldsList.get(i);
+            // An empty or missing value field defaults to the key field name; a "*" value field
+            // (explicit or defaulted from a wildcard key field) maps to the whole-key field name.
+            final String vfIfPresent = i < valueFieldsList.size() && !valueFieldsList.get(i).isEmpty()
+                ? valueFieldsList.get(i) : kf;
+            final String vf = "*".equals(vfIfPresent) ? KeyToValueConfig.DEFAULT_WHOLE_KEY_FIELD : vfIfPresent;
+
+            if (valueFields.containsKey(vf)) {
+                throw new DataException(
+                    String.format("More than one key value is copied to the value field name '%s'", vf));
+            }
+            valueFields.put(vf, kf);
+        }
+
+        keyFieldsHasNamedFields = valueFields.values().stream().anyMatch(kf -> !kf.equals("*"));
+        keyFieldsHasWildcard = valueFields.containsValue("*");
+
+        schemaCache = new SynchronizedCache<>(new LRUCache<>(16));
+    }
+
+    /**
+     * Validation check to ensure that if any named fields exist, they can be extracted from the structured data in
+     * the key.
+     *
+     * @param record The incoming record to be transformed.
+     * @throws DataException if the key does not support extracting named fields.
+     */
+    private void validateNoUnextractableKeyFields(final R record) {
+        if (keyFieldsHasNamedFields && record.keySchema() != null && record.keySchema().type() != Schema.Type.STRUCT) {
+            throw new DataException(
+                String.format("Named key fields %s cannot be copied from the key schema: %s", valueFields.values(),
+                    record.keySchema().type()));
+        }
+
+        if (keyFieldsHasNamedFields && record.keySchema() == null
+            && !(record.key() instanceof Map || record.key() instanceof Struct)) {
+            throw new DataException(
+                String.format("Named key fields %s cannot be copied from the key class: %s", valueFields.values(),
+                    record.key().getClass().getName()));
+        }
+    }
+
+    /**
+     * Validation check to fail quickly when the entire key is copied into a structured value with a schema but the
+     * key class is schemaless.
+     *
+     * @param record The incoming record to be transformed.
+     * @throws DataException if the value class requires the key to have a schema but the key is a schemaless class.
+     */
+    private void validateKeySchemaRequirementsMet(final R record) {
+        if (keyFieldsHasWildcard && record.keySchema() == null && record.key() instanceof Map) {
+            if (record.valueSchema() != null && record.valueSchema().type() == Schema.Type.STRUCT) {
+                throw new DataException("The value requires a schema, but the key class is a schemaless Map");
+            }
+        }
+    }
+
+    /**
+     * Validation check to ensure that the value can receive fields from the key, i.e. that it is either a Struct or
+     * a Map.
+     */
+    private void validateStructuredValue(final R record) {
+        if (record.valueSchema() == null && !(record.value() instanceof Map)
+            || record.valueSchema() != null && record.valueSchema().type() != Schema.Type.STRUCT) {
+            throw new DataException("The value needs to be a Struct or Map in order to append fields");
+        }
+    }
+
+    @Override
+    public R apply(final R record) {
+        validateNoUnextractableKeyFields(record);
+        validateKeySchemaRequirementsMet(record);
+        validateStructuredValue(record);
+
+        if (record.value() instanceof Struct) {
+            if (record.keySchema() != null) {
+                return applyToStruct(record, record.keySchema());
+            } else {
+                // The key is schemaless, so infer a schema for it before merging.
+                final Schema inferredSchema = Values.inferSchema(record.key());
+                if (inferredSchema == null) {
+                    throw new DataException(
+                        "Cannot infer schema for unsupported key class: " + record.key().getClass().getName());
+                }
+                return applyToStruct(record, inferredSchema);
+            }
+        } else {
+            return applyToMap(record);
+        }
+    }
+
+    @Override
+    public void close() {
+        schemaCache = null;
+    }
+
+    /**
+     * Merges the key and value schemas into a new schema, according to the configuration.
+     *
+     * @param record The incoming record to be transformed.
+     * @param keySchema The schema to be used for the incoming key. This may have been inferred from the key value.
+     * @return The transformed record with the new schema.
+     */
+    private R applyToStruct(final R record, final Schema keySchema) {
+        final List<Schema> schemaKey = List.of(keySchema, record.valueSchema());
+        Schema newSchema = schemaCache.get(schemaKey);
+        if (newSchema == null) {
+            newSchema = mergeSchema(keySchema, record.valueSchema());
+            LOGGER.debug("Merging into new schema {}", newSchema);
+            schemaCache.put(schemaKey, newSchema);
+        }
+
+        final Struct value = (Struct) record.value();
+        final Struct newValue = new Struct(newSchema);
+
+        if (record.key() instanceof Struct) {
+            final Struct key = (Struct) record.key();
+            for (final Field f : newSchema.fields()) {
+                final String kf = valueFields.get(f.name());
+                if (kf == null) {
+                    newValue.put(f.name(), value.get(f.name()));
+                } else if (kf.equals("*")) {
+                    newValue.put(f.name(), key);
+                } else {
+                    newValue.put(f.name(), key.get(kf));
+                }
+            }
+        } else {
+            // A non-struct key can only be copied wholesale; validation has already rejected named key fields.
+            for (final Field f : newSchema.fields()) {
+                final String kf = valueFields.get(f.name());
+                if (kf == null) {
+                    newValue.put(f.name(), value.get(f.name()));
+                } else if (kf.equals("*")) {
+                    newValue.put(f.name(), record.key());
+                }
+            }
+        }
+
+        // Replace the value in the record.
+        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            newValue.schema(), newValue, record.timestamp());
+    }
+
+    /**
+     * Merges the key and value schemas into a new schema, according to the configuration of keyFields to copy into
+     * the new value schema, and how they are renamed via valueFields.
+     *
+     * @param keySchema The key schema.
+     * @param valueSchema The original value schema.
+     * @return The merged schema with any new types copied from the key schema.
+     */
+    private Schema mergeSchema(final Schema keySchema, final Schema valueSchema) {
+        // Build a map of all the field names and schemas for the output, starting with the ones already present
+        // in the value.
+        final Map<String, Schema> updatedFieldSchemas = new LinkedHashMap<>();
+        for (final Field vf : valueSchema.fields()) {
+            updatedFieldSchemas.put(vf.name(), vf.schema());
+        }
+
+        // Add all the value fields that are going to be extracted from the key (overwriting any old ones).
+        for (final Map.Entry<String, String> names : valueFields.entrySet()) {
+            final String kf = names.getValue();
+            final String vf = names.getKey();
+            if (kf.equals("*")) {
+                updatedFieldSchemas.put(vf, keySchema);
+            } else if (keySchema.field(kf) == null) {
+                throw new DataException(String.format("Key field '%s' not found in key schema", kf));
+            } else {
+                updatedFieldSchemas.put(vf, keySchema.field(kf).schema());
+            }
+        }
+
+        // Create a copy of the output schema.
+        final SchemaBuilder preVsb = SchemaBuilder.struct()
+            .name(valueSchema.name())
+            .version(valueSchema.version())
+            .doc(valueSchema.doc())
+            .parameters(valueSchema.parameters() != null ? valueSchema.parameters() : Collections.emptyMap());
+        final SchemaBuilder vsb = valueSchema.isOptional() ? preVsb.optional() : preVsb;
+
+        // Apply the fields, retaining the order of the original value schema and the valueFields configuration.
+        for (final Map.Entry<String, Schema> entry : updatedFieldSchemas.entrySet()) {
+            vsb.field(entry.getKey(), entry.getValue());
+        }
+
+        return vsb.build();
+    }
+
+    @SuppressWarnings("unchecked")
+    private R applyToMap(final R record) {
+        final Map<String, Object> value = (Map<String, Object>) record.value();
+        final Map<String, Object> newValue = new HashMap<>(value);
+
+        if (record.key() instanceof Struct) {
+            final Struct key = (Struct) record.key();
+            for (final String vf : valueFields.keySet()) {
+                final String kf = valueFields.get(vf);
+                if (kf.equals("*")) {
+                    newValue.put(vf, key);
+                } else {
+                    newValue.put(vf, key.get(kf));
+                }
+            }
+        } else if (record.key() instanceof Map) {
+            final Map<?, ?> key = (Map<?, ?>) record.key();
+            for (final String vf : valueFields.keySet()) {
+                final String kf = valueFields.get(vf);
+                if (kf.equals("*")) {
+                    newValue.put(vf, key);
+                } else {
+                    newValue.put(vf, key.get(kf));
+                }
+            }
+        } else {
+            // A primitive key can only be copied wholesale; validation has already rejected named key fields.
+            for (final String vf : valueFields.keySet()) {
+                final String kf = valueFields.get(vf);
+                if (kf.equals("*")) {
+                    newValue.put(vf, record.key());
+                }
+            }
+        }
+
+        // Replace the value in the record; the new value is schemaless.
+        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+            null, newValue, record.timestamp());
+    }
+}
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/KeyToValueConfig.java b/src/main/java/io/aiven/kafka/connect/transforms/KeyToValueConfig.java
new file mode 100644
index 0000000..d1e29c5
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/connect/transforms/KeyToValueConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+/**
+ * Configure the SMT to copy fields from the key into the value.
+ *
+ * @see org.apache.kafka.connect.transforms.ValueToKey ValueToKey, a transformation with a similar approach
+ */
+public class KeyToValueConfig extends AbstractConfig {
+
+    public static final String KEY_FIELDS_CONFIG = "key.fields";
+    public static final String VALUE_FIELDS_CONFIG = "value.fields";
+
+    /**
+     * If the value (destination) field isn't specified and the wildcard '*' is used, this will be the name of the
+     * field.
+     */
+    public static final String DEFAULT_WHOLE_KEY_FIELD = "_key";
+
+    private static final String KEY_FIELDS_DOC = "Comma-separated list of field names on the record key to copy into "
+        + "the record value (or * to copy the entire key).";
+    private static final String VALUE_FIELDS_DOC = "Corresponding destination field names to be added or replaced on "
+        + "the record value, or empty to reuse the key field name.";
+
+    KeyToValueConfig(final Map<String, ?> originals) {
+        super(config(), originals);
+    }
+
+    static ConfigDef config() {
+        return new ConfigDef()
+            .define(KEY_FIELDS_CONFIG,
+                ConfigDef.Type.LIST,
+                ConfigDef.NO_DEFAULT_VALUE,
+                ConfigDef.Importance.HIGH,
+                KEY_FIELDS_DOC)
+            .define(VALUE_FIELDS_CONFIG,
+                ConfigDef.Type.LIST,
+                "",
+                ConfigDef.Importance.HIGH,
+                VALUE_FIELDS_DOC);
+    }
+
+    final List<String> keyFields() {
+        return getList(KEY_FIELDS_CONFIG);
+    }
+
+    final List<String> valueFields() {
+        return getList(VALUE_FIELDS_CONFIG);
+    }
+}
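How the two list options pair up can be seen by exercising the config class directly. This is a hypothetical sketch, not part of the patch: the class name is invented, and since `KeyToValueConfig`'s constructor and accessors are package-private, the sketch assumes it lives in the same package.

```java
// Hypothetical sketch: KeyToValueConfig's constructor and accessors are
// package-private, so this class is assumed to live in the same package.
package io.aiven.kafka.connect.transforms;

import java.util.Map;

public class KeyToValueConfigSketch {
    public static void main(final String[] args) {
        final KeyToValueConfig config = new KeyToValueConfig(
            Map.of("key.fields", "id, department, cost", "value.fields", "id, dept"));

        // ConfigDef.Type.LIST splits on commas and strips the surrounding whitespace.
        System.out.println(config.keyFields());   // [id, department, cost]
        // The third key field has no counterpart here, so KeyToValue defaults it to "cost".
        System.out.println(config.valueFields()); // [id, dept]
    }
}
```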
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/KeyToValueTest.java b/src/test/java/io/aiven/kafka/connect/transforms/KeyToValueTest.java
new file mode 100644
index 0000000..078e123
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/connect/transforms/KeyToValueTest.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class KeyToValueTest {
+    private static final Struct ABC_STRUCT = createStruct("a", 1, "b", 2, "c", 3);
+    private static final Struct XYZ_STRUCT = createStruct("x", 100, "y", 200, "z", 300);
+    private static final SinkRecord ABC_XYZ_RECORD = new SinkRecord("", 0, ABC_STRUCT.schema(), ABC_STRUCT,
+        XYZ_STRUCT.schema(), XYZ_STRUCT, 0);
+    private final KeyToValue<SinkRecord> k2v = new KeyToValue<>();
+
+    /**
+     * Build a test {@link Struct} from a list of field names and values, inferring the schema from the values.
+     *
+     * @param kvs A list of alternating keys and values (e.g. "a", 1, "b", 2).
+     * @return A structure created from the keys and values, where the values are either int32, string, or another
+     *         struct.
+     */
+    private static Struct createStruct(final Object... kvs) {
+        SchemaBuilder sb = SchemaBuilder.struct();
+        for (int i = 0; i < kvs.length; i += 2) {
+            Schema fieldSchema = Schema.STRING_SCHEMA;
+            if (kvs[i + 1] instanceof Integer) {
+                fieldSchema = Schema.INT32_SCHEMA;
+            } else if (kvs[i + 1] instanceof Struct) {
+                fieldSchema = ((Struct) kvs[i + 1]).schema();
+            }
+            sb = sb.field(kvs[i].toString(), fieldSchema);
+        }
+
+        final Schema schema = sb.build();
+        final Struct struct = new Struct(schema);
+        for (int i = 0; i < kvs.length; i += 2) {
+            struct.put(kvs[i].toString(), kvs[i + 1]);
+        }
+
+        return struct;
+    }
+
+    @AfterEach
+    public void teardown() {
+        k2v.close();
+    }
+
+    @Test
+    public void schemalessMapToSchemalessMap() {
+        final Map<String, Integer> key = Map.of("a", 1, "b", 2, "c", 3);
+        final Map<String, Integer> value = Map.of("x", 100, "y", 200, "z", 300);
+        final SinkRecord input = new SinkRecord("", 0, null, key, null, value, 0);
+
+        final Map<String, Integer> expectedValue = Map.of("x", 100, "y", 200, "z", 300, "a", 1, "b", 2);
+
+        k2v.configure(Collections.singletonMap("key.fields", "a,b"));
+        final SinkRecord output = k2v.apply(input);
+
+        assertNull(output.keySchema());
+        assertEquals(key, output.key());
+        assertNull(output.valueSchema());
+        assertEquals(expectedValue, output.value());
+    }
+
+    @Test
+    public void schemaStructToSchemaStruct() {
+        final SinkRecord input = ABC_XYZ_RECORD;
+
+        final Struct expectedValue = createStruct("x", 100, "y", 200, "z", 300, "a", 1, "b", 2);
+
+        k2v.configure(Collections.singletonMap("key.fields", "a,b"));
+        final SinkRecord output = k2v.apply(input);
+
+        assertEquals(input.keySchema(), output.keySchema());
+        assertEquals(input.key(), output.key());
+        assertEquals(expectedValue.schema(), output.valueSchema());
+        assertEquals(expectedValue, output.value());
+    }
+
+    @Test
+    public void schemaIntToSchemaStructWholeKey() {
+        final int key = 123;
+        final SinkRecord input = new SinkRecord("", 0, Schema.INT32_SCHEMA, key, XYZ_STRUCT.schema(),
+            XYZ_STRUCT, 0);
+
+        final Struct expectedValue = createStruct("x", 100, "y", 200, "z", 300, "_key", 123);
+
+        k2v.configure(Collections.singletonMap("key.fields", "*"));
+        final SinkRecord output = k2v.apply(input);
+
+        assertEquals(input.keySchema(), output.keySchema());
+        assertEquals(input.key(), output.key());
+        assertEquals(expectedValue.schema(), output.valueSchema());
+        assertEquals(expectedValue, output.value());
+    }
+
+    @Test
+    public void schemaStructToSchemaStructWithRenaming() {
+        final SinkRecord input = ABC_XYZ_RECORD;
+
+        final Struct expectedValue = createStruct("x", 1, "y", 2, "z", 300, "m", 1, "n", 2, "a", 1);
+
+        k2v.configure(Map.of("key.fields", "a,b,a,b,a", "value.fields", "m,n,x,y"));
+        final SinkRecord output = k2v.apply(input);
+
+        assertEquals(input.keySchema(), output.keySchema());
+        assertEquals(input.key(), output.key());
+        assertEquals(expectedValue.schema(), output.valueSchema());
+        assertEquals(expectedValue, output.value());
+    }
+
+    @Test
+    public void schemaIntToSchemaStructWholeKeyWithRenaming() {
+        final int key = 123;
+        final SinkRecord input = new SinkRecord("", 0, Schema.INT32_SCHEMA, key, XYZ_STRUCT.schema(),
+            XYZ_STRUCT, 0);
+
+        final Struct expectedValue = createStruct("x", 123, "y", 123, "z", 300, "m", 123, "n", 123, "_key", 123);
+
+        k2v.configure(Map.of("key.fields", "*,*,*,*,*", "value.fields", "m,n,x,y"));
+        final SinkRecord output = k2v.apply(input);
+
+        assertEquals(input.keySchema(), output.keySchema());
+        assertEquals(input.key(), output.key());
+        assertEquals(expectedValue.schema(), output.valueSchema());
+        assertEquals(expectedValue, output.value());
+    }
+
+    @Test
+    public void schemalessIntToSchemaStructWholeKey() {
+        final int key = 123;
+        final SinkRecord input = new SinkRecord("", 0, null, key, XYZ_STRUCT.schema(), XYZ_STRUCT, 0);
+
+        final Struct expectedValue = createStruct("x", 100, "y", 200, "z", 300, "_key", 123);
+
+        k2v.configure(Collections.singletonMap("key.fields", "*"));
+        final SinkRecord output = k2v.apply(input);
+
+        assertEquals(input.keySchema(), output.keySchema());
+        assertEquals(input.key(), output.key());
+        assertEquals(expectedValue.schema(), output.valueSchema());
+        assertEquals(expectedValue, output.value());
+    }
+
+    @Test
+    public void schemaStructToSchemaStructWholeKey() {
+        final SinkRecord input = ABC_XYZ_RECORD;
+
+        final Struct expectedValue = createStruct("x", 100, "y", 200, "z", 300, "_key", ABC_STRUCT);
+
+        k2v.configure(Collections.singletonMap("key.fields", "*"));
+        final SinkRecord output = k2v.apply(input);
+
+        assertEquals(input.keySchema(), output.keySchema());
+        assertEquals(input.key(), output.key());
+        assertEquals(expectedValue.schema(), output.valueSchema());
+        assertEquals(expectedValue, output.value());
+    }
+
+    @Test
+    public void errorMissingFieldKey() {
+        k2v.configure(Collections.singletonMap("key.fields", "a,doesnotexist"));
+        final DataException ex = assertThrows(DataException.class, () -> k2v.apply(ABC_XYZ_RECORD));
+        assertEquals("Key field 'doesnotexist' not found in key schema", ex.getMessage());
+    }
+
+    /**
+     * Test trying to extract fields from a key when the key value doesn't support fields.
+     */
+    @Test
+    public void errorNonExtractableKeys() {
+        final int key = 123;
+        final Struct value = XYZ_STRUCT;
+        final SinkRecord input = new SinkRecord("", 0, Schema.OPTIONAL_INT32_SCHEMA, key, value.schema(), value, 0);
+
+        k2v.configure(Collections.singletonMap("key.fields", "a, b"));
+        final DataException ex = assertThrows(DataException.class, () -> k2v.apply(input));
+        assertEquals("Named key fields [a, b] cannot be copied from the key schema: INT32", ex.getMessage());
+
+        final SinkRecord input2 = new SinkRecord("", 0, null, key, value.schema(), value, 0);
+
+        final DataException ex2 = assertThrows(DataException.class, () -> k2v.apply(input2));
+        assertEquals("Named key fields [a, b] cannot be copied from the key class: java.lang.Integer",
+            ex2.getMessage());
+    }
+
+    /**
+     * Test trying to extract a whole schemaless key to a value that requires a schema. This is not allowed.
+     */
+    @Test
+    public void errorCantDeduceKeySchema() {
+        final Map<String, Integer> key = Map.of("a", 1, "b", 2, "c", 3);
+        final Struct value = XYZ_STRUCT;
+        final SinkRecord input = new SinkRecord("", 0, null, key, value.schema(), value, 0);
+
+        k2v.configure(Collections.singletonMap("key.fields", "*"));
+        final DataException ex = assertThrows(DataException.class, () -> k2v.apply(input));
+        assertEquals("The value requires a schema, but the key class is a schemaless Map", ex.getMessage());
+
+        k2v.configure(Collections.singletonMap("key.fields", "a,*"));
+        final DataException ex2 = assertThrows(DataException.class, () -> k2v.apply(input));
+        assertEquals("The value requires a schema, but the key class is a schemaless Map", ex2.getMessage());
+
+        // An unknown or unsupported primitive.
+        k2v.configure(Collections.singletonMap("key.fields", "*"));
+        final SinkRecord input3 = new SinkRecord("", 0, null, new Object(), value.schema(), value, 0);
+        final DataException ex3 = assertThrows(DataException.class, () -> k2v.apply(input3));
+        assertEquals("Cannot infer schema for unsupported key class: java.lang.Object", ex3.getMessage());
+    }
+
+    /**
+     * Test trying to copy to a value that can't be appended to.
+     */
+    @Test
+    public void errorCantAppendToValue() {
+        final Struct key = ABC_STRUCT;
+        final int value = 123;
+        final SinkRecord input = new SinkRecord("", 0, key.schema(), key, null, value, 0);
+
+        k2v.configure(Collections.singletonMap("key.fields", "a,b"));
+        final DataException ex = assertThrows(DataException.class, () -> k2v.apply(input));
+        assertEquals("The value needs to be a Struct or Map in order to append fields", ex.getMessage());
+
+        // The same failure applies when the primitive value carries a non-struct schema.
+        final SinkRecord input2 = new SinkRecord("", 0, key.schema(), key, Schema.INT32_SCHEMA, value, 0);
+
+        final DataException ex2 = assertThrows(DataException.class, () -> k2v.apply(input2));
+        assertEquals("The value needs to be a Struct or Map in order to append fields", ex2.getMessage());
+    }
+
+    @Test
+    public void errorDuplicateValueMapping() {
+        final DataException ex = assertThrows(DataException.class,
+            () -> k2v.configure(Map.of("key.fields", "a,a", "value.fields", "x,x")));
+        assertEquals("More than one key value is copied to the value field name 'x'", ex.getMessage());
+
+        final DataException ex2 = assertThrows(DataException.class,
+            () -> k2v.configure(Map.of("key.fields", "*,*,*,*", "value.fields", "a,b")));
+        assertEquals("More than one key value is copied to the value field name '_key'", ex2.getMessage());
+    }
+}
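To round off the unit tests above, here is a hypothetical sketch, not part of the patch, of the wildcard behaviour they exercise: with `key.fields=*`, the entire key `Struct` is embedded into the value under the default `_key` field. The class and field names are invented.

```java
import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.transforms.KeyToValue;

public class WholeKeySketch {
    public static void main(final String[] args) {
        // A record with both key and value carrying schemas.
        final Schema keySchema = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();
        final Struct key = new Struct(keySchema).put("id", 7);
        final Schema valueSchema = SchemaBuilder.struct().field("total", Schema.INT32_SCHEMA).build();
        final Struct value = new Struct(valueSchema).put("total", 100);

        final KeyToValue<SinkRecord> transform = new KeyToValue<>();
        transform.configure(Collections.singletonMap("key.fields", "*"));

        final SinkRecord output = transform.apply(
            new SinkRecord("topic", 0, keySchema, key, valueSchema, value, 0));

        // The merged value schema keeps "total" and appends "_key" typed with the key's schema.
        System.out.println(output.value()); // Struct{total=100,_key=Struct{id=7}}
        transform.close();
    }
}
```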