diff --git a/README.md b/README.md
index 9f5764d..7d4a206 100644
--- a/README.md
+++ b/README.md
@@ -196,6 +196,32 @@ transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.Ex
 transforms.ExtractTopicFromValueSchema.schema.name.regex=(?:[.]|^)([^.]*)$
 ```
 
+### `CaseTransform`
+
+This transformation changes the case of a string value from the record field to uppercase or lowercase.
+
+This transform can modify fields of `STRING` type.
+
+It supports fields with a schema (e.g. Avro) or without a schema (e.g. JSON).
+
+It exists in two variants:
+- `io.aiven.kafka.connect.transforms.CaseTransform$Key` - works on keys;
+- `io.aiven.kafka.connect.transforms.CaseTransform$Value` - works on values.
+
+The transformation defines the following configurations:
+
+- `field.names` - A comma-separated list of the fields whose values should be case transformed.
+- `case` - either `lower` or `upper`, the case the field values are transformed to.
+
+Here is an example of this transformation configuration (note that `case` is required and has no default):
+
+```properties
+transforms=caseTransform
+transforms.caseTransform.type=io.aiven.kafka.connect.transforms.CaseTransform$Value
+transforms.caseTransform.field.names=field_name_1, field_name_2
+transforms.caseTransform.case=upper
+```
+
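+To transform record keys instead of values, use the `Key` variant. A minimal sketch (the field name is a placeholder):
+
+```properties
+transforms=caseTransform
+transforms.caseTransform.type=io.aiven.kafka.connect.transforms.CaseTransform$Key
+transforms.caseTransform.field.names=key_field_1
+transforms.caseTransform.case=lower
+```
+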
 ## License
 
 This project is licensed under the [Apache License, Version 2.0](LICENSE).
diff --git a/build.gradle b/build.gradle
index 86f69be..c71481a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -42,6 +42,7 @@ sourceCompatibility = JavaVersion.VERSION_11
 targetCompatibility = JavaVersion.VERSION_11
 
 ext {
+    jacksonVersion = "2.18.2"
     kafkaVersion = "2.0.1"
     testcontainersVersion = "1.20.4"
     debeziumVersion = "1.3.0.Final"
@@ -114,6 +115,7 @@ dependencies {
     integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"
     integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
+    integrationTestImplementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion")
     integrationTestImplementation "org.testcontainers:kafka:$testcontainersVersion" // this is not Kafka version
     // Make test utils from 'test' available in 'integration-test'
     integrationTestImplementation sourceSets.test.output
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/AbstractTestSourceConnector.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/AbstractTestSourceConnector.java
new file mode 100644
index 0000000..7f08a7a
--- /dev/null
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/AbstractTestSourceConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractTestSourceConnector extends SourceConnector {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTestSourceConnector.class);
+
+    @Override
+    public void start(final Map<String, String> props) {
+        // no-op
+    }
+
+    @Override
+    public void stop() {
+        // no-op
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(final int maxTasks) {
+        return Collections.singletonList(Collections.emptyMap());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
+
+    @Override
+    public String version() {
+        return null;
+    }
+}
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
index c5b1edb..f5539c1 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/IntegrationTest.java
@@ -19,7 +19,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -30,9 +32,12 @@
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -130,6 +135,9 @@ void setUp() throws ExecutionException, InterruptedException {
             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumer = new KafkaConsumer<>(consumerProps);
 
         final NewTopic originalTopic = new NewTopic(TestSourceConnector.ORIGINAL_TOPIC, 1, (short) 1);
@@ -156,7 +164,7 @@ final void tearDown() {
 
     @Test
     @Timeout(10)
-    final void testExtractTopic() throws ExecutionException, InterruptedException, IOException {
+    final void testExtractTopic() throws ExecutionException, InterruptedException {
         final Map<String, String> connectorConfig = new HashMap<>();
         connectorConfig.put("name", "test-source-connector");
         connectorConfig.put("connector.class", TestSourceConnector.class.getName());
@@ -194,6 +202,55 @@ final void testExtractTopicFromValueSchemaName() throws ExecutionException, Inte
 
     }
 
+    @Test
+    @Timeout(10)
+    void testCaseTransform() throws ExecutionException, InterruptedException, IOException {
+        adminClient.createTopics(Arrays.asList(new NewTopic(TestCaseTransformConnector.SOURCE_TOPIC, 1, (short) 1)))
+            .all().get();
+        adminClient.createTopics(Arrays.asList(new NewTopic(TestCaseTransformConnector.TARGET_TOPIC, 1, (short) 1)))
+            .all().get();
+
+        final Map<String, String> connectorConfig = new HashMap<>();
+        connectorConfig.put("name", "test-source-connector");
+        connectorConfig.put("connector.class", TestCaseTransformConnector.class.getName());
+        connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        connectorConfig.put("value.converter.value.subject.name.strategy",
+            "io.confluent.kafka.serializers.subject.RecordNameStrategy");
+        connectorConfig.put("tasks.max", "1");
+        connectorConfig.put("transforms", "regexRouteToTargetTopic, caseTransform");
+        connectorConfig.put("transforms.caseTransform.case", "upper");
+        connectorConfig.put("transforms.caseTransform.field.names", TestCaseTransformConnector.TRANSFORM_FIELD);
+        connectorConfig.put("transforms.caseTransform.type", "io.aiven.kafka.connect.transforms.CaseTransform$Value");
+        connectorConfig.put("transforms.regexRouteToTargetTopic.type",
+            "org.apache.kafka.connect.transforms.RegexRouter");
+        connectorConfig.put("transforms.regexRouteToTargetTopic.regex", "(.*)-source-(.*)");
+        connectorConfig.put("transforms.regexRouteToTargetTopic.replacement", "$1-target-$2");
+
+        connectRunner.createConnector(connectorConfig);
+        checkMessageTransformInTopic(
+            new TopicPartition(TestCaseTransformConnector.TARGET_TOPIC, 0),
+            TestCaseTransformConnector.MESSAGES_TO_PRODUCE
+        );
+    }
+
+    final void checkMessageTransformInTopic(final TopicPartition topicPartition, final long expectedNumberOfMessages)
+            throws InterruptedException, IOException {
+        waitForCondition(
+            () -> consumer.endOffsets(Arrays.asList(topicPartition))
+                .values().stream().reduce(Long::sum).map(s -> s == expectedNumberOfMessages)
+                .orElse(false), 5000, "Messages appear in target topic"
+        );
+        consumer.subscribe(Collections.singletonList(topicPartition.topic()));
+        final ObjectMapper objectMapper = new ObjectMapper();
+        final TypeReference<Map<String, Object>> tr = new TypeReference<>() {};
+        for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
+            final Map<String, Object> value = objectMapper.readValue(consumerRecord.value(), tr);
+            final Map<String, Object> payload = (Map<String, Object>) value.get("payload");
+            assertThat(payload.get("transform")).isEqualTo("LOWER-CASE-DATA-TRANSFORMS-TO-UPPERCASE");
+        }
+    }
+
     final void checkMessageTopics(final TopicPartition originalTopicPartition, final TopicPartition newTopicPartition)
         throws InterruptedException {
         waitForCondition(
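Note: `waitForCondition` used above is an existing helper from the project's test sources and is not part of this diff. A minimal sketch of such a polling helper, with a signature assumed from the call sites:

```java
// Hypothetical stand-in for the project's waitForCondition test utility:
// polls the supplied condition until it holds or the timeout (in ms) elapses.
static void waitForCondition(final java.util.function.Supplier<Boolean> condition,
                             final long timeoutMs,
                             final String description) throws InterruptedException {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
        if (Boolean.TRUE.equals(condition.get())) {
            return; // condition met
        }
        Thread.sleep(100L);
    }
    throw new AssertionError("Condition not met within timeout: " + description);
}
```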
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/TestCaseTransformConnector.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/TestCaseTransformConnector.java
new file mode 100644
index 0000000..9c07f1b
--- /dev/null
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/TestCaseTransformConnector.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+public class TestCaseTransformConnector extends AbstractTestSourceConnector {
+    static final long MESSAGES_TO_PRODUCE = 10L;
+
+    static final String SOURCE_TOPIC = "case-transform-source-topic";
+    static final String TARGET_TOPIC = "case-transform-target-topic";
+    static final String TRANSFORM_FIELD = "transform";
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return TestCaseTransformConnector.TestSourceConnectorTask.class;
+    }
+
+    public static class TestSourceConnectorTask extends SourceTask {
+        private int counter = 0;
+
+        private final Schema valueSchema = SchemaBuilder.struct()
+            .field(TRANSFORM_FIELD, SchemaBuilder.STRING_SCHEMA)
+            .schema();
+        private final Struct value =
+            new Struct(valueSchema).put(TRANSFORM_FIELD, "lower-case-data-transforms-to-uppercase");
+
+        @Override
+        public void start(final Map<String, String> props) {
+        }
+
+        @Override
+        public List<SourceRecord> poll() {
+            if (counter >= MESSAGES_TO_PRODUCE) {
+                return null; // indicate pause
+            }
+
+            final Map<String, String> sourcePartition = new HashMap<>();
+            sourcePartition.put("partition", "0");
+            final Map<String, String> sourceOffset = new HashMap<>();
+            sourceOffset.put("offset", Integer.toString(counter));
+
+            counter += 1;
+
+            return Collections.singletonList(
+                new SourceRecord(sourcePartition, sourceOffset,
+                    SOURCE_TOPIC,
+                    valueSchema, value)
+            );
+        }
+
+        @Override
+        public void stop() {
+        }
+
+        @Override
+        public String version() {
+            return null;
+        }
+    }
+}
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java
index 8e2e642..c16f704 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/TestSourceConnector.java
@@ -21,12 +21,10 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 
@@ -35,41 +33,18 @@
  *
  * <p>It just produces a fixed number of struct records.
  */
-public class TestSourceConnector extends SourceConnector {
+public final class TestSourceConnector extends AbstractTestSourceConnector {
     static final long MESSAGES_TO_PRODUCE = 10L;
 
     static final String ORIGINAL_TOPIC = "original-topic";
     static final String NEW_TOPIC = "new-topic";
     static final String ROUTING_FIELD = "field-0";
 
-    @Override
-    public void start(final Map<String, String> props) {
-    }
-
     @Override
     public Class<? extends Task> taskClass() {
         return TestSourceConnectorTask.class;
     }
 
-    @Override
-    public List<Map<String, String>> taskConfigs(final int maxTasks) {
-        return Collections.singletonList(Collections.emptyMap());
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
-    public ConfigDef config() {
-        return new ConfigDef();
-    }
-
-    @Override
-    public String version() {
-        return null;
-    }
-
     public static class TestSourceConnectorTask extends SourceTask {
         private int counter = 0;
 
@@ -83,7 +58,7 @@ public void start(final Map<String, String> props) {
         }
 
         @Override
-        public List<SourceRecord> poll() throws InterruptedException {
+        public List<SourceRecord> poll() {
             if (counter >= MESSAGES_TO_PRODUCE) {
                 return null; // indicate pause
             }
diff --git a/src/integration-test/java/io/aiven/kafka/connect/transforms/TopicFromValueSchemaConnector.java b/src/integration-test/java/io/aiven/kafka/connect/transforms/TopicFromValueSchemaConnector.java
index cce12fa..589ff68 100644
--- a/src/integration-test/java/io/aiven/kafka/connect/transforms/TopicFromValueSchemaConnector.java
+++ b/src/integration-test/java/io/aiven/kafka/connect/transforms/TopicFromValueSchemaConnector.java
@@ -21,12 +21,10 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 
@@ -38,7 +36,7 @@
  *
  * <p>It just produces a fixed number of struct records with value schema name set.
  */
-public class TopicFromValueSchemaConnector extends SourceConnector {
+public class TopicFromValueSchemaConnector extends AbstractTestSourceConnector {
     static final int MESSAGES_TO_PRODUCE = 10;
 
     private static final Logger log = LoggerFactory.getLogger(TopicFromValueSchemaConnector.class);
@@ -47,34 +45,11 @@ public class TopicFromValueSchemaConnector extends SourceConnector {
 
     static final String NAME = "com.acme.schema.SchemaNameToTopic";
 
-    @Override
-    public void start(final Map<String, String> props) {
-    }
-
     @Override
     public Class<? extends Task> taskClass() {
         return TopicFromValueSchemaConnectorTask.class;
     }
 
-    @Override
-    public List<Map<String, String>> taskConfigs(final int maxTasks) {
-        return Collections.singletonList(Collections.emptyMap());
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
-    public ConfigDef config() {
-        return new ConfigDef();
-    }
-
-    @Override
-    public String version() {
-        return null;
-    }
-
     public static class TopicFromValueSchemaConnectorTask extends SourceTask {
         private int counter = 0;
 
@@ -90,7 +65,7 @@ public void start(final Map<String, String> props) {
         }
 
         @Override
-        public List<SourceRecord> poll() throws InterruptedException {
+        public List<SourceRecord> poll() {
             if (counter >= MESSAGES_TO_PRODUCE) {
                 return null; // indicate pause
             }
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/CaseTransform.java b/src/main/java/io/aiven/kafka/connect/transforms/CaseTransform.java
new file mode 100644
index 0000000..9083497
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/connect/transforms/CaseTransform.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transform field value case based on the configuration.
+ * Supports maps and structs.
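+ *
+ * <p>For example, with {@code case=upper} and {@code field.names=foo}, a value
+ * {@code {"foo": "abc", "bar": "x"}} becomes {@code {"foo": "ABC", "bar": "x"}}.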
+ *
+ * @param <R> the ConnectRecord subtype
+ */
+public abstract class CaseTransform<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CaseTransform.class);
+
+    /**
+     * The configuration for case transform.
+     */
+    private CaseTransformConfig config;
+
+    /**
+     * Configured transform operation.
+     */
+    private UnaryOperator<String> caseTransformFunc;
+
+    protected abstract String dataPlace();
+
+    protected abstract SchemaAndValue getSchemaAndValue(final R record);
+
+    protected abstract R createNewRecord(final R record, final Schema newSchema, final Object newValue);
+
+    /**
+     * Apply the case transformation to the given new struct from the original struct field.
+     *
+     * @param struct Original struct
+     * @param newStruct New struct
+     * @param fieldName The field name to case transform
+     */
+    private void applyStruct(final Struct struct, final Struct newStruct, final String fieldName) {
+        try {
+            final Object value = struct.get(fieldName);
+            if (value == null) {
+                newStruct.put(fieldName, null);
+                return;
+            }
+            newStruct.put(fieldName, caseTransformFunc.apply(value.toString()));
+        } catch (final DataException e) {
+            LOGGER.debug("{} is missing, cannot transform the case", fieldName);
+        }
+    }
+
+    /**
+     * Apply the case transformation to the given map from the map field.
+     *
+     * @param newValue The mutable map
+     * @param fieldName The field name to case transform
+     */
+    private void applyMap(final Map<String, Object> newValue, final String fieldName) {
+        final Object value = newValue.get(fieldName);
+        if (value == null) {
+            newValue.put(fieldName, null);
+            return;
+        }
+        newValue.put(fieldName, caseTransformFunc.apply(value.toString()));
+    }
+
+    @Override
+    public R apply(final R record) {
+        final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
+
+        if (schemaAndValue.value() == null) {
+            throw new DataException(dataPlace() + " value can't be null: " + record);
+        }
+
+        final R newRecord;
+
+        if (schemaAndValue.value() instanceof Struct) {
+            final Struct struct = (Struct) schemaAndValue.value();
+            final Struct newStruct = new Struct(struct.schema());
+            struct.schema().fields().forEach(field -> {
+                newStruct.put(field.name(), struct.get(field));
+            });
+            config.fieldNames().forEach(field -> {
+                applyStruct(struct, newStruct, field);
+            });
+            newRecord = createNewRecord(record, struct.schema(), newStruct);
+        } else if (schemaAndValue.value() instanceof Map) {
+            final Map<String, Object> newValue = new HashMap<>((Map<String, Object>) schemaAndValue.value());
+            config.fieldNames().forEach(field -> {
+                applyMap(newValue, field);
+            });
+            // If we have a schema, use it, otherwise leave it null.
+            if (schemaAndValue.schema() != null) {
+                newRecord = createNewRecord(record, schemaAndValue.schema(), newValue);
+            } else {
+                newRecord = createNewRecord(record, null, newValue);
+            }
+        } else {
+            throw new DataException("Value type must be STRUCT or MAP: " + record);
+        }
+        return newRecord;
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CaseTransformConfig.config();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> settings) {
+        this.config = new CaseTransformConfig(settings);
+
+        switch (config.transformCase()) {
+            case LOWER:
+                caseTransformFunc = value -> value.toLowerCase(Locale.ROOT);
+                break;
+            case UPPER:
+                caseTransformFunc = value -> value.toUpperCase(Locale.ROOT);
+                break;
+            default:
+                throw new ConnectException("Unknown case transform function " + config.transformCase());
+        }
+    }
+
+    /**
+     * Record key transform implementation.
+     *
+     * @param <R> the ConnectRecord subtype
+     */
+    public static class Key<R extends ConnectRecord<R>> extends CaseTransform<R> {
+        @Override
+        protected SchemaAndValue getSchemaAndValue(final R record) {
+            return new SchemaAndValue(record.keySchema(), record.key());
+        }
+
+        @Override
+        protected R createNewRecord(final R record, final Schema newSchema, final Object newValue) {
+            return record.newRecord(
+                record.topic(),
+                record.kafkaPartition(),
+                newSchema,
+                newValue,
+                record.valueSchema(),
+                record.value(),
+                record.timestamp(),
+                record.headers()
+            );
+        }
+
+        @Override
+        protected String dataPlace() {
+            return "key";
+        }
+    }
+
+    /**
+     * Record value transform implementation.
+     *
+     * @param <R> the ConnectRecord subtype
+     */
+    public static class Value<R extends ConnectRecord<R>> extends CaseTransform<R> {
+        @Override
+        protected SchemaAndValue getSchemaAndValue(final R record) {
+            return new SchemaAndValue(record.valueSchema(), record.value());
+        }
+
+        @Override
+        protected R createNewRecord(final R record, final Schema newSchema, final Object newValue) {
+            return record.newRecord(
+                record.topic(),
+                record.kafkaPartition(),
+                record.keySchema(),
+                record.key(),
+                newSchema,
+                newValue,
+                record.timestamp(),
+                record.headers()
+            );
+        }
+
+        @Override
+        protected String dataPlace() {
+            return "value";
+        }
+    }
+}
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/CaseTransformConfig.java b/src/main/java/io/aiven/kafka/connect/transforms/CaseTransformConfig.java
new file mode 100644
index 0000000..8df055d
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/connect/transforms/CaseTransformConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+/**
+ * Case transform configuration.
+ *
+ * <p>Configures the SMT to transform the case of the configured fields.
+ * Supported case transformations are transform to uppercase and transform to lowercase.
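+ *
+ * <p>For example, {@code field.names=a,b} with {@code case=lower} lower-cases the values of
+ * fields {@code a} and {@code b} and leaves all other fields untouched.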
+ */
+public class CaseTransformConfig extends AbstractConfig {
+
+    /**
+     * A comma-separated list of fields to case transform.
+     */
+    public static final String FIELD_NAMES_CONFIG = "field.names";
+    private static final String FIELD_NAMES_DOC =
+        "A comma-separated list of fields to case transform.";
+
+    /**
+     * The case configuration; 'upper' or 'lower' are supported.
+     */
+    public static final String CASE_CONFIG = "case";
+    private static final String CASE_DOC =
+        "Set the case configuration, 'upper' or 'lower'.";
+
+    CaseTransformConfig(final Map<String, ?> originals) {
+        super(config(), originals);
+    }
+
+    static ConfigDef config() {
+        return new ConfigDef()
+            .define(
+                FIELD_NAMES_CONFIG,
+                ConfigDef.Type.LIST,
+                ConfigDef.NO_DEFAULT_VALUE,
+                ConfigDef.Importance.HIGH,
+                FIELD_NAMES_DOC)
+            .define(
+                CASE_CONFIG,
+                ConfigDef.Type.STRING,
+                ConfigDef.NO_DEFAULT_VALUE,
+                new ConfigDef.NonEmptyString(),
+                ConfigDef.Importance.HIGH,
+                CASE_DOC);
+    }
+
+    final List<String> fieldNames() {
+        return getList(FIELD_NAMES_CONFIG);
+    }
+
+    final Case transformCase() {
+        return Objects.requireNonNull(Case.fromString(getString(CASE_CONFIG)));
+    }
+
+    /**
+     * Case enumeration for supported transforms.
+     */
+    public enum Case {
+        LOWER("lower"),
+        UPPER("upper");
+
+        private final String transformCase;
+
+        Case(final String transformCase) {
+            this.transformCase = transformCase;
+        }
+
+        /**
+         * Return the case enumeration object resolved from the given parameter.
+         *
+         * @param string The case name to resolve.
+         * @return The matching {@code Case} value.
+         */
+        public static Case fromString(final String string) {
+            for (final Case caseValue : values()) {
+                if (caseValue.transformCase.equals(string)) {
+                    return caseValue;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Unknown enum value %s", string));
+        }
+    }
+}
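For a quick check of the transform outside a Connect worker, the SMT can be driven directly from plain Java. A minimal sketch (the class name and topic are illustrative; it exercises the schemaless `Map` path of `CaseTransform$Value`):

```java
package io.aiven.kafka.connect.transforms;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

public class CaseTransformExample {
    public static void main(final String[] args) {
        // Configure the Value variant to upper-case the "greeting" field.
        final Map<String, String> props = new HashMap<>();
        props.put("field.names", "greeting");
        props.put("case", "upper");

        final CaseTransform.Value<SinkRecord> transform = new CaseTransform.Value<>();
        transform.configure(props);

        // Schemaless value: a plain Map with a null value schema.
        final Map<String, Object> value = new HashMap<>();
        value.put("greeting", "hello");
        final SinkRecord record = new SinkRecord("some-topic", 0, null, null, null, value, 0L);

        final SinkRecord transformed = transform.apply(record);
        System.out.println(((Map<?, ?>) transformed.value()).get("greeting")); // prints HELLO

        transform.close();
    }
}
```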
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/CaseTransformTest.java b/src/test/java/io/aiven/kafka/connect/transforms/CaseTransformTest.java
new file mode 100644
index 0000000..764d5a6
--- /dev/null
+++ b/src/test/java/io/aiven/kafka/connect/transforms/CaseTransformTest.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.transforms;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CaseTransformTest {
+
+    private static final String ORIGINAL_UPPERCASE_FIELD_1 = "original_uppercase_1";
+    private static final String ORIGINAL_UPPERCASE_FIELD_2 = "original_uppercase_2";
+    private static final String ORIGINAL_LOWERCASE_1 = "original_lowercase_1";
+    private static final String ORIGINAL_LOWERCASE_2 = "original_lowercase_2";
+    private static final String FIELD_NOT_TRANSFORMED = "do_not_touch";
+
+    private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
+        .field(ORIGINAL_UPPERCASE_FIELD_1, Schema.OPTIONAL_STRING_SCHEMA)
+        .field(ORIGINAL_UPPERCASE_FIELD_2, Schema.OPTIONAL_STRING_SCHEMA)
+        .field(ORIGINAL_LOWERCASE_1, Schema.OPTIONAL_STRING_SCHEMA)
+        .field(ORIGINAL_LOWERCASE_2, Schema.OPTIONAL_STRING_SCHEMA)
+        .field(FIELD_NOT_TRANSFORMED, Schema.OPTIONAL_STRING_SCHEMA)
+        .schema();
+
+    static Stream<SinkRecord> recordProvider() {
+        return Stream.of(
+            record(
+                STRUCT_SCHEMA, new Struct(STRUCT_SCHEMA)
+                    .put(ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE")
+                    .put(ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1")
+                    .put(ORIGINAL_LOWERCASE_1, "lowercase")
+                    .put(ORIGINAL_LOWERCASE_2, "CamelCase_2")
+                    .put(FIELD_NOT_TRANSFORMED, "DoNotTouch"),
+                STRUCT_SCHEMA, new Struct(STRUCT_SCHEMA)
+                    .put(ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE")
+                    .put(ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1")
+                    .put(ORIGINAL_LOWERCASE_1, "lowercase")
+                    .put(ORIGINAL_LOWERCASE_2, "CamelCase_2")
+                    .put(FIELD_NOT_TRANSFORMED, "DoNotTouch")
+            ),
+            record(
+                null, Map.of(
+                    ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE",
+                    ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1",
+                    ORIGINAL_LOWERCASE_1, "lowercase",
+                    ORIGINAL_LOWERCASE_2, "CamelCase_2",
+                    FIELD_NOT_TRANSFORMED, "DoNotTouch"
+                ),
+                null, Map.of(
+                    ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE",
+                    ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1",
+                    ORIGINAL_LOWERCASE_1, "lowercase",
+                    ORIGINAL_LOWERCASE_2, "CamelCase_2",
+                    FIELD_NOT_TRANSFORMED, "DoNotTouch"
+                )
+            ),
+            record(
+                STRUCT_SCHEMA, Map.of(
+                    ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE",
+                    ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1",
+                    ORIGINAL_LOWERCASE_1, "lowercase",
+                    ORIGINAL_LOWERCASE_2, "CamelCase_2",
+                    FIELD_NOT_TRANSFORMED, "DoNotTouch"
+                ),
+                STRUCT_SCHEMA, Map.of(
+                    ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE",
+                    ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1",
+                    ORIGINAL_LOWERCASE_1, "lowercase",
+                    ORIGINAL_LOWERCASE_2, "CamelCase_2",
+                    FIELD_NOT_TRANSFORMED, "DoNotTouch"
+                )
+            )
+        );
+    }
+
+    private void assertRecordFieldMatches(
+        final SchemaAndValue schemaAndValue, final String fieldName, final String expected) {
+        final String actual;
+        if (schemaAndValue.value() instanceof Struct) {
+            final Struct struct = (Struct) schemaAndValue.value();
+            actual = struct.getString(fieldName);
+        } else {
+            final Map<?, ?> map = (Map<?, ?>) schemaAndValue.value();
+            actual = map.get(fieldName).toString();
+        }
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @ParameterizedTest
+    @MethodSource("recordProvider")
+    void testLowercaseKeyTransformation(final SinkRecord originalRecord) {
+        final SinkRecord transformedRecord = keyTransform(
+            String.format(
+                "%s, %s",
+                ORIGINAL_UPPERCASE_FIELD_1,
+                ORIGINAL_UPPERCASE_FIELD_2
+            ), "lower")
+            .apply(originalRecord);
+        final SchemaAndValue schemaAndValue =
+            new SchemaAndValue(transformedRecord.keySchema(), transformedRecord.key());
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_1, "uppercase");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_2, "camelcase_1");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_LOWERCASE_1, "lowercase");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_LOWERCASE_2, "CamelCase_2");
+        assertRecordFieldMatches(schemaAndValue, FIELD_NOT_TRANSFORMED, "DoNotTouch");
+    }
+
+    @ParameterizedTest
+    @MethodSource("recordProvider")
+    void testUppercaseKeyTransformation(final SinkRecord originalRecord) {
+        final SinkRecord transformedRecord = keyTransform(
+            String.format(
+                "%s, %s",
+                ORIGINAL_LOWERCASE_1,
+                ORIGINAL_LOWERCASE_2
+            ), "upper")
+            .apply(originalRecord);
+        final SchemaAndValue schemaAndValue =
+            new SchemaAndValue(transformedRecord.keySchema(), transformedRecord.key());
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_LOWERCASE_1, "LOWERCASE");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_LOWERCASE_2, "CAMELCASE_2");
+        assertRecordFieldMatches(schemaAndValue, FIELD_NOT_TRANSFORMED, "DoNotTouch");
+    }
+
+    @ParameterizedTest
+    @MethodSource("recordProvider")
+    void testLowercaseValueTransformation(final SinkRecord originalRecord) {
+        final SinkRecord transformedRecord = valueTransform(
+            String.format(
+                "%s, %s",
+                ORIGINAL_UPPERCASE_FIELD_1,
+                ORIGINAL_UPPERCASE_FIELD_2
+            ), "lower")
+            .apply(originalRecord);
+        final SchemaAndValue schemaAndValue =
+            new SchemaAndValue(transformedRecord.valueSchema(), transformedRecord.value());
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_1, "uppercase");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_2, "camelcase_1");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_LOWERCASE_1, "lowercase");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_LOWERCASE_2, "CamelCase_2");
+        assertRecordFieldMatches(schemaAndValue, FIELD_NOT_TRANSFORMED, "DoNotTouch");
+    }
+
+    @ParameterizedTest
+    @MethodSource("recordProvider")
+    void testUppercaseValueTransformation(final SinkRecord originalRecord) {
+        final SinkRecord transformedRecord = valueTransform(
+            String.format(
+                "%s, %s",
+                ORIGINAL_LOWERCASE_1,
+                ORIGINAL_LOWERCASE_2
+            ), "upper")
+            .apply(originalRecord);
+        final SchemaAndValue schemaAndValue =
+            new SchemaAndValue(transformedRecord.valueSchema(), transformedRecord.value());
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_LOWERCASE_1, "LOWERCASE");
+        assertRecordFieldMatches(schemaAndValue, ORIGINAL_LOWERCASE_2, "CAMELCASE_2");
+        assertRecordFieldMatches(schemaAndValue, FIELD_NOT_TRANSFORMED, "DoNotTouch");
+    }
"CamelCase_1") + .put(ORIGINAL_LOWERCASE_1, "lowercase") // This is null in the value. + .put(ORIGINAL_LOWERCASE_2, "CamelCase_2") // This is missing in the value. + .put(FIELD_NOT_TRANSFORMED, "DoNotTouch"), + STRUCT_SCHEMA, new Struct(STRUCT_SCHEMA) + .put(ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE") + .put(ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1") + .put(ORIGINAL_LOWERCASE_1, null) + .put(FIELD_NOT_TRANSFORMED, "DoNotTouch") + ); + final SinkRecord transformedRecord = valueTransform( + String.format("%s, %s", ORIGINAL_LOWERCASE_1, ORIGINAL_LOWERCASE_2), "upper") + .apply(originalRecord); + + final SchemaAndValue schemaAndValue = + new SchemaAndValue(transformedRecord.valueSchema(), transformedRecord.value()); + assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE"); + assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1"); + assertThat(((Struct) schemaAndValue.value()).get(ORIGINAL_LOWERCASE_1)).isNull(); + assertThat(((Struct) schemaAndValue.value()).get(ORIGINAL_LOWERCASE_2)).isNull(); + assertRecordFieldMatches(schemaAndValue, FIELD_NOT_TRANSFORMED, "DoNotTouch"); + } + + @Test + void testMapNullAndMissingFieldTransformation() { + final HashMap valueMap = new HashMap<>(); + valueMap.put(ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE"); + valueMap.put(ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1"); + valueMap.put(ORIGINAL_LOWERCASE_1, null); + valueMap.put(FIELD_NOT_TRANSFORMED, "DoNotTouch"); + + final SinkRecord originalRecord = record( + STRUCT_SCHEMA, Map.of( + ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE", + ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1", + ORIGINAL_LOWERCASE_1, "lowercase", // This is null in the value + ORIGINAL_LOWERCASE_2, "CamelCase_2", // This is missing in the value + FIELD_NOT_TRANSFORMED, "DoNotTouch" + ), + STRUCT_SCHEMA, valueMap + ); + final SinkRecord transformedRecord = valueTransform( + String.format("%s, %s", ORIGINAL_LOWERCASE_1, ORIGINAL_LOWERCASE_2), "upper") + .apply(originalRecord); + + final SchemaAndValue schemaAndValue = + new SchemaAndValue(transformedRecord.valueSchema(), transformedRecord.value()); + assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_1, "UPPERCASE"); + assertRecordFieldMatches(schemaAndValue, ORIGINAL_UPPERCASE_FIELD_2, "CamelCase_1"); + assertThat(((Map) schemaAndValue.value()).get(ORIGINAL_LOWERCASE_1)).isNull(); + assertThat(((Map) schemaAndValue.value()).get(ORIGINAL_LOWERCASE_2)).isNull(); + assertRecordFieldMatches(schemaAndValue, FIELD_NOT_TRANSFORMED, "DoNotTouch"); + } + + private CaseTransform keyTransform(final String fieldNames, final String transformCase) { + final Map props = new HashMap<>(); + props.put("field.names", fieldNames); + props.put("case", transformCase); + final CaseTransform.Key caseTransform = new CaseTransform.Key<>(); + caseTransform.configure(props); + return caseTransform; + } + + private CaseTransform valueTransform(final String fieldNames, final String transformCase) { + final Map props = new HashMap<>(); + props.put("field.names", fieldNames); + props.put("case", transformCase); + final CaseTransform.Value caseTransform = new CaseTransform.Value<>(); + caseTransform.configure(props); + return caseTransform; + } + + private static SinkRecord record(final Schema keySchema, + final Object key, + final Schema valueSchema, + final Object value) { + return new SinkRecord("original_topic", 0, + keySchema, key, + valueSchema, value, + 123L, + 456L, TimestampType.CREATE_TIME); + } +}