Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,32 @@ transforms.ExtractTopicFromValueSchema.type=io.aiven.kafka.connect.transforms.Ex
transforms.ExtractTopicFromValueSchema.schema.name.regex=(?:[.]|^)([^.]*)$
```

### `CaseTransform`

This transformation transforms the case a string value from the record field to uppercase or lowercase.

This transform can modify fields of `STRING` type.

It supports fields with (e.g. Avro) or without schema (e.g. JSON).

Exists in two variants:
- `io.aiven.kafka.connect.transforms.CaseTransform$Key` - works on keys;
- `io.aiven.kafka.connect.transforms.CaseTransform$Value` - works on values.

The transformation defines the following configurations:

- `field.names` - The name of the fields which should be case transformed.
- `case` - either `lower` or `upper` for transforming the case as desired.

Here is an example of this transformation configuration:

```properties
transforms=caseTransform
transforms.caseTransform.type=io.aiven.kafka.connect.transforms.CaseTransform$Value
transforms.caseTransform.field.names=field_name_1, field_name_2
```


## License

This project is licensed under the [Apache License, Version 2.0](LICENSE).
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

ext {
jacksonVersion = "2.18.2"
kafkaVersion = "2.0.1"
testcontainersVersion = "1.20.4"
debeziumVersion = "1.3.0.Final"
Expand Down Expand Up @@ -114,6 +115,7 @@ dependencies {
integrationTestImplementation "org.apache.kafka:connect-transforms:$kafkaVersion"

integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
integrationTestImplementation("com.fasterxml.jackson.core:jackson-core:$jacksonVersion")
integrationTestImplementation "org.testcontainers:kafka:$testcontainersVersion" // this is not Kafka version
// Make test utils from 'test' available in 'integration-test'
integrationTestImplementation sourceSets.test.output
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.source.SourceConnector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTestSourceConnector extends SourceConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTestSourceConnector.class);

@Override
public void start(final Map<String, String> props) {
// no-op
}

@Override
public void stop() {
// no-op
}

@Override
public List<Map<String, String>> taskConfigs(final int maxTasks) {
return Collections.singletonList(Collections.emptyMap());
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public String version() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand All @@ -30,9 +32,12 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -130,6 +135,9 @@ void setUp() throws ExecutionException, InterruptedException {
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer<>(consumerProps);

final NewTopic originalTopic = new NewTopic(TestSourceConnector.ORIGINAL_TOPIC, 1, (short) 1);
Expand All @@ -156,7 +164,7 @@ final void tearDown() {

@Test
@Timeout(10)
final void testExtractTopic() throws ExecutionException, InterruptedException, IOException {
final void testExtractTopic() throws ExecutionException, InterruptedException {
final Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("name", "test-source-connector");
connectorConfig.put("connector.class", TestSourceConnector.class.getName());
Expand Down Expand Up @@ -194,6 +202,55 @@ final void testExtractTopicFromValueSchemaName() throws ExecutionException, Inte

}

@Test
@Timeout(10)
void testCaseTransform() throws ExecutionException, InterruptedException, IOException {
adminClient.createTopics(Arrays.asList(new NewTopic(TestCaseTransformConnector.SOURCE_TOPIC, 1, (short) 1)))
.all().get();
adminClient.createTopics(Arrays.asList(new NewTopic(TestCaseTransformConnector.TARGET_TOPIC, 1, (short) 1)))
.all().get();

final Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("name", "test-source-connector");
connectorConfig.put("connector.class", TestCaseTransformConnector.class.getName());
connectorConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
connectorConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
connectorConfig.put("value.converter.value.subject.name.strategy",
"io.confluent.kafka.serializers.subject.RecordNameStrategy");
connectorConfig.put("tasks.max", "1");
connectorConfig.put("transforms", "regexRouteToTargetTopic, caseTransform");
connectorConfig.put("transforms.caseTransform.case", "upper");
connectorConfig.put("transforms.caseTransform.field.names", TestCaseTransformConnector.TRANSFORM_FIELD);
connectorConfig.put("transforms.caseTransform.type", "io.aiven.kafka.connect.transforms.CaseTransform$Value");
connectorConfig.put("transforms.regexRouteToTargetTopic.type",
"org.apache.kafka.connect.transforms.RegexRouter");
connectorConfig.put("transforms.regexRouteToTargetTopic.regex", "(.*)-source-(.*)");
connectorConfig.put("transforms.regexRouteToTargetTopic.replacement", String.format("$1-target-$2"));

connectRunner.createConnector(connectorConfig);
checkMessageTransformInTopic(
new TopicPartition(TestCaseTransformConnector.TARGET_TOPIC, 0),
TestCaseTransformConnector.MESSAGES_TO_PRODUCE
);
}

final void checkMessageTransformInTopic(final TopicPartition topicPartition, final long expectedNumberOfMessages)
throws InterruptedException, IOException {
waitForCondition(
() -> consumer.endOffsets(Arrays.asList(topicPartition))
.values().stream().reduce(Long::sum).map(s -> s == expectedNumberOfMessages)
.orElse(false), 5000, "Messages appear in target topic"
);
consumer.subscribe(Collections.singletonList(topicPartition.topic()));
final ObjectMapper objectMapper = new ObjectMapper();
final TypeReference<Map<String, Object>> tr = new TypeReference<>() {};
for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
final Map<String, Object> value = objectMapper.readValue(consumerRecord.value(), tr);
final Map<String, String> payload = (Map<String, String>) value.get("payload");
assertThat(payload.get("transform")).isEqualTo("LOWER-CASE-DATA-TRANSFORMS-TO-UPPERCASE");
}
}

final void checkMessageTopics(final TopicPartition originalTopicPartition, final TopicPartition newTopicPartition)
throws InterruptedException {
waitForCondition(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.transforms;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class TestCaseTransformConnector extends AbstractTestSourceConnector {
static final long MESSAGES_TO_PRODUCE = 10L;

static final String SOURCE_TOPIC = "case-transform-source-topic";
static final String TARGET_TOPIC = "case-transform-target-topic";
static final String TRANSFORM_FIELD = "transform";

@Override
public Class<? extends Task> taskClass() {
return TestCaseTransformConnector.TestSourceConnectorTask.class;
}

public static class TestSourceConnectorTask extends SourceTask {
private int counter = 0;

private final Schema valueSchema = SchemaBuilder.struct()
.field(TRANSFORM_FIELD, SchemaBuilder.STRING_SCHEMA)
.schema();
private final Struct value =
new Struct(valueSchema).put(TRANSFORM_FIELD, "lower-case-data-transforms-to-uppercase");

@Override
public void start(final Map<String, String> props) {
}

@Override
public List<SourceRecord> poll() {
if (counter >= MESSAGES_TO_PRODUCE) {
return null; // indicate pause
}

final Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("partition", "0");
final Map<String, String> sourceOffset = new HashMap<>();
sourceOffset.put("offset", Integer.toString(counter));

counter += 1;

return Collections.singletonList(
new SourceRecord(sourcePartition, sourceOffset,
SOURCE_TOPIC,
valueSchema, value)
);
}

@Override
public void stop() {
}

@Override
public String version() {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

Expand All @@ -35,41 +33,18 @@
*
* <p>It just produces a fixed number of struct records.
*/
public class TestSourceConnector extends SourceConnector {
public final class TestSourceConnector extends AbstractTestSourceConnector {
static final long MESSAGES_TO_PRODUCE = 10L;

static final String ORIGINAL_TOPIC = "original-topic";
static final String NEW_TOPIC = "new-topic";
static final String ROUTING_FIELD = "field-0";

@Override
public void start(final Map<String, String> props) {
}

@Override
public Class<? extends Task> taskClass() {
return TestSourceConnectorTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(final int maxTasks) {
return Collections.singletonList(Collections.emptyMap());
}

@Override
public void stop() {
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public String version() {
return null;
}

public static class TestSourceConnectorTask extends SourceTask {
private int counter = 0;

Expand All @@ -83,7 +58,7 @@ public void start(final Map<String, String> props) {
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> poll() {
if (counter >= MESSAGES_TO_PRODUCE) {
return null; // indicate pause
}
Expand Down
Loading
Loading