
Commit 25aaa9a

feat: Add KeyToValue transform

1 parent 0dfd9e8 · commit 25aaa9a

4 files changed: +686 −0 lines changed


README.md

Lines changed: 32 additions & 0 deletions
@@ -221,6 +221,38 @@ transforms.caseTransform.type=io.aiven.kafka.connect.transforms.CaseTransform$Va
transforms.caseTransform.field.names=field_name_1, field_name_2
```

### `KeyToValue`

Updates the record value with information found in the record key.

This transformation extracts fields from the key and adds them to the value. It is similar to Kafka's standard [ValueToKey](https://kafka.apache.org/documentation/#org.apache.kafka.connect.transforms.ValueToKey) transformation, but it does not replace the existing value.

It supports extracting information from a record key with a schema (e.g. Avro) or without a schema (e.g. JSON), and likewise from a record value with or without a schema.

The transformation defines the following configurations:

- `key.fields` - The comma-separated names of the fields in the record key that should be extracted, or `*` to use the entire key.
- `value.fields` - The comma-separated names of the fields to add into the record value, in the same order as `key.fields`.

Any empty or missing value field name defaults to the name of the corresponding key field. If `*` is specified as the key field, its default value field name is `_key`.
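For instance, a minimal configuration (hypothetical, and assuming `value.fields` may be omitted entirely per the defaulting rule above) that copies `id` and `cost` into value fields of the same names:

```properties
transforms=keyToValue
transforms.keyToValue.type=io.aiven.kafka.connect.transforms.KeyToValue
transforms.keyToValue.key.fields=id, cost
```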
Here is an example configuration that copies the `id`, `department` and `cost` fields from the key to the value, and renames the `department` field in the value to `dept`:

```properties
transforms=keyToValue
transforms.keyToValue.type=io.aiven.kafka.connect.transforms.KeyToValue
transforms.keyToValue.key.fields=id, department, cost
transforms.keyToValue.value.fields=id, dept
```

Here is an example configuration that copies the entire key to the value, under the field `_key`:

```properties
transforms=copyKey
transforms.copyKey.type=io.aiven.kafka.connect.transforms.KeyToValue
transforms.copyKey.key.fields=*
```

## License

KeyToValue.java

Lines changed: 317 additions & 0 deletions
@@ -0,0 +1,317 @@
/*
 * Copyright 2025 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;


import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Transformation} that copies fields from the record key into the record value.
 *
 * @param <R> ConnectRecord
 */
public class KeyToValue<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final Logger LOGGER = LoggerFactory.getLogger(KeyToValue.class);

    private KeyToValueConfig config;

    /**
     * A list of the new fields that will be added to the value schema, mapped to the key fields used to populate them.
     */
    private LinkedHashMap<String, String> valueFields;

    /**
     * Whether any of the configured key fields is a named field (i.e. copies a single field from the key).
     */
    private boolean keyFieldsHasNamedFields;

    /**
     * Whether any of the configured key fields is the wildcard {@code *} (i.e. copies the entire key).
     */
    private boolean keyFieldsHasWildcard;

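    /**
     * A cache of merged output schemas, keyed by the (key schema, value schema) pair that produced them.
     */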
    private Cache<List<Object>, Schema> schemaCache;

    @Override
    public ConfigDef config() {
        return KeyToValueConfig.config();
    }

    @Override
    public void configure(final Map<String, ?> settings) {
        this.config = new KeyToValueConfig(settings);
        // Construct the mapping from the lists in the config
        final List<String> keyFieldsList = config.keyFields();
        final List<String> valueFieldsList = config.valueFields();
        valueFields = new LinkedHashMap<>();

        for (int i = 0; i < keyFieldsList.size(); i++) {
            final String kf = keyFieldsList.get(i);
            final String vfIfPresent = i < valueFieldsList.size() ? valueFieldsList.get(i) : kf;
            final String vf = "*".equals(vfIfPresent) ? KeyToValueConfig.DEFAULT_WHOLE_KEY_FIELD : vfIfPresent;

            if (valueFields.containsKey(vf)) {
                throw new DataException(
                        String.format("More than one key field is copied to the value field name '%s'", vf));
            }
            valueFields.put(vf, kf);
        }

        keyFieldsHasNamedFields = valueFields.values().stream().anyMatch(kf -> !kf.equals("*"));
        keyFieldsHasWildcard = valueFields.containsValue("*");

        schemaCache = new SynchronizedCache<>(new LRUCache<>(16));
    }

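    /**
     * Ensures that any named key fields can actually be extracted from the record key.
     *
     * @param record The incoming record to be validated.
     */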
    private void validateUnextractableKeyFields(final R record) {
        // If the key fields have names, but the key value doesn't support names
        if (keyFieldsHasNamedFields && record.keySchema() != null && record.keySchema().type() != Schema.Type.STRUCT) {
            throw new DataException(
                    String.format("Named key fields %s cannot be copied from the key schema: %s", valueFields.values(),
                            record.keySchema().type()));
        }

        if (keyFieldsHasNamedFields && record.keySchema() == null
                && !(record.key() instanceof Map || record.key() instanceof Struct)) {
            throw new DataException(
                    String.format("Named key fields %s cannot be copied from the key class: %s", valueFields.values(),
                            record.key().getClass().getName()));
        }
    }

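    /**
     * Ensures that a schemaless key is not copied wholesale into a value that requires a schema.
     *
     * @param record The incoming record to be validated.
     */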
    private void validateKeyFieldSchemaRequired(final R record) {
        if (keyFieldsHasWildcard && record.keySchema() == null && record.key() instanceof Map) {
            if (record.valueSchema() != null && record.valueSchema().type() == Schema.Type.STRUCT) {
                throw new DataException("The value requires a schema, but the key class is a schemaless Map");
            }
        }
    }

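    /**
     * Ensures that the record value is structured (a Struct or a Map), so that fields can be appended to it.
     *
     * @param record The incoming record to be validated.
     */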
    private void validateStructuredValue(final R record) {
        if ((record.valueSchema() == null && !(record.value() instanceof Map))
                || (record.valueSchema() != null && record.valueSchema().type() != Schema.Type.STRUCT)) {
            throw new DataException("The value needs to be a Struct or Map in order to append fields");
        }
    }

    @Override
    public R apply(final R record) {
        validateUnextractableKeyFields(record);
        validateKeyFieldSchemaRequired(record);
        validateStructuredValue(record);

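        // Struct values carry a schema, so the key and value schemas must be merged; for a schemaless
        // key, a schema is first inferred from the primitive key. Map values are updated in place.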
        if (record.value() instanceof Struct) {
            if (record.keySchema() != null) {
                return applyToStruct(record, record.keySchema());
            } else {
                return applyToStruct(record, inferSchemaFromPrimitive(record.key()));
            }
        } else {
            return applyToMap(record);
        }
    }

    @Override
    public void close() {
        schemaCache = null;
    }

    /**
     * Applies the transformation to a record with a Struct value, merging the key and value schemas into a new
     * schema according to the configuration.
     *
     * @param record The incoming record to be transformed.
     * @param keySchema The schema to be used for the incoming key. This may have been inferred from the key value.
     * @return The transformed record with the new schema.
     */
    private R applyToStruct(final R record, final Schema keySchema) {
        final List<Object> schemaKey = List.of(keySchema, record.valueSchema());
        Schema newSchema = schemaCache.get(schemaKey);
        if (newSchema == null) {
            newSchema = mergeSchema(keySchema, record.valueSchema());
            LOGGER.debug("Merging into new schema {}", newSchema);
            schemaCache.put(schemaKey, newSchema);
        }

        final Struct value = (Struct) record.value();
        final Struct newValue = new Struct(newSchema);

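        // With a Struct key, copy named fields and/or the whole key; for a primitive key, only the
        // whole-key wildcard can apply (named fields were rejected during validation).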
        if (record.key() instanceof Struct) {
            final Struct key = (Struct) record.key();
            for (final Field f : newSchema.fields()) {
                final String kf = valueFields.get(f.name());
                if (kf == null) {
                    newValue.put(f.name(), value.get(f.name()));
                } else if (kf.equals("*")) {
                    newValue.put(f.name(), key);
                } else {
                    newValue.put(f.name(), key.get(kf));
                }
            }
        } else {
            for (final Field f : newSchema.fields()) {
                final String kf = valueFields.get(f.name());
                if (kf == null) {
                    newValue.put(f.name(), value.get(f.name()));
                } else if (kf.equals("*")) {
                    newValue.put(f.name(), record.key());
                }
            }
        }

        // Replace the value in the record
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                newValue.schema(), newValue, record.timestamp());
    }

    /**
     * Infers the schema from a primitive key.
     *
     * @param key The key value.
     * @return The inferred schema.
     */
    private Schema inferSchemaFromPrimitive(final Object key) {
        if (key instanceof String) {
            return Schema.OPTIONAL_STRING_SCHEMA;
        } else if (key instanceof Boolean) {
            return Schema.OPTIONAL_BOOLEAN_SCHEMA;
        } else if (key instanceof Byte) {
            return Schema.OPTIONAL_INT8_SCHEMA;
        } else if (key instanceof Short) {
            return Schema.OPTIONAL_INT16_SCHEMA;
        } else if (key instanceof Integer) {
            return Schema.OPTIONAL_INT32_SCHEMA;
        } else if (key instanceof Long) {
            return Schema.OPTIONAL_INT64_SCHEMA;
        } else if (key instanceof Float) {
            return Schema.OPTIONAL_FLOAT32_SCHEMA;
        } else if (key instanceof Double) {
            return Schema.OPTIONAL_FLOAT64_SCHEMA;
        } else {
            throw new DataException("Cannot infer schema for unsupported key class: " + key.getClass().getName());
        }
    }

    /**
     * Merges the key and value schemas into a new schema, according to the configuration: which key fields are
     * copied into the new value schema, and how they are renamed via valueFields.
     *
     * @param keySchema The key schema.
     * @param valueSchema The original value schema.
     * @return The merged schema with any new types copied from the key schema.
     */
    private Schema mergeSchema(final Schema keySchema, final Schema valueSchema) {

        // Build a map of all the field names and schemas for the output, starting with the ones already present
        // in the value.
        final Map<String, Schema> updatedFieldSchemas = new LinkedHashMap<>();
        for (final Field vf : valueSchema.fields()) {
            updatedFieldSchemas.put(vf.name(), vf.schema());
        }

        // Add all the value fields that are going to be extracted from the key (overwriting any old ones).
        for (final Map.Entry<String, String> names : valueFields.entrySet()) {
            final String kf = names.getValue();
            final String vf = names.getKey();
            if (kf.equals("*")) {
                updatedFieldSchemas.put(vf, keySchema);
            } else if (keySchema.field(kf) == null) {
                throw new DataException(String.format("Key field '%s' not found in key schema", kf));
            } else {
                updatedFieldSchemas.put(vf, keySchema.field(kf).schema());
            }
        }

        // Create a copy of the output schema.
        final SchemaBuilder preVsb = SchemaBuilder.struct()
                .name(valueSchema.name())
                .version(valueSchema.version())
                .doc(valueSchema.doc())
                .parameters(valueSchema.parameters() != null ? valueSchema.parameters() : Collections.emptyMap());
        final SchemaBuilder vsb = valueSchema.isOptional() ? preVsb.optional() : preVsb;

        // Apply the fields, retaining the order of the original value schema and the valueFields configuration
        for (final Map.Entry<String, Schema> entry : updatedFieldSchemas.entrySet()) {
            vsb.field(entry.getKey(), entry.getValue());
        }

        return vsb.build();
    }

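    /**
     * Applies the transformation to a record with a schemaless Map value, copying the configured key fields into a
     * new Map.
     *
     * @param record The incoming record to be transformed.
     * @return The transformed record with the updated Map value.
     */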
    @SuppressWarnings("unchecked")
    private R applyToMap(final R record) {

        final Map<String, Object> value = (Map<String, Object>) record.value();
        final Map<String, Object> newValue = new HashMap<>(value);

        if (record.key() instanceof Struct) {
            final Struct key = (Struct) record.key();
            for (final String vf : valueFields.keySet()) {
                final String kf = valueFields.get(vf);
                if (kf.equals("*")) {
                    newValue.put(vf, key);
                } else {
                    newValue.put(vf, key.get(kf));
                }
            }
        } else if (record.key() instanceof Map) {
            final Map<String, Object> key = (Map<String, Object>) record.key();
            for (final String vf : valueFields.keySet()) {
                final String kf = valueFields.get(vf);
                if (kf.equals("*")) {
                    newValue.put(vf, key);
                } else {
                    newValue.put(vf, key.get(kf));
                }
            }
        } else {
            for (final String vf : valueFields.keySet()) {
                final String kf = valueFields.get(vf);
                if (kf.equals("*")) {
                    newValue.put(vf, record.key());
                }
            }
        }

        // Replace the value in the record
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
                null, newValue, record.timestamp());
    }
}
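For reference, a minimal sketch of how the transform might be exercised (not part of this commit; it assumes the accompanying `KeyToValueConfig`, added elsewhere in this commit but not shown here, allows `value.fields` to be omitted, per the README's defaulting rule):

```java
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.kafka.connect.transforms.KeyToValue;

public class KeyToValueExample {
    public static void main(final String[] args) {
        // Schemas for a structured key and value.
        final Schema keySchema = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();
        final Schema valueSchema = SchemaBuilder.struct().field("name", Schema.STRING_SCHEMA).build();

        final Struct key = new Struct(keySchema).put("id", 42);
        final Struct value = new Struct(valueSchema).put("name", "widget");

        // Copy the key's "id" field into the value; the value field name defaults to "id"
        // (assumes KeyToValueConfig treats value.fields as optional).
        final KeyToValue<SinkRecord> transform = new KeyToValue<>();
        transform.configure(Map.of("key.fields", "id"));

        final SinkRecord record = new SinkRecord("topic", 0, keySchema, key, valueSchema, value, 0L);
        final SinkRecord result = transform.apply(record);

        // Expected output value: Struct{name=widget,id=42}
        System.out.println(result.value());
        transform.close();
    }
}
```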
