Skip to content

Commit 7cc5a45

Browse files
author
Brandon Brown
committed
bug(Hash Transformer): Fixing an issue where the hash transformer tests don't run correctly when testing the key version.
1 parent 8e0de6c commit 7cc5a45

File tree

5 files changed

+83
-37
lines changed

5 files changed

+83
-37
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ targetCompatibility = JavaVersion.VERSION_11
4242

4343
ext {
4444
kafkaVersion = "2.0.1"
45-
testcontainersVersion = "1.12.1"
45+
testcontainersVersion = "1.15.1"
4646
}
4747

4848
distributions {

src/main/java/io/aiven/kafka/connect/transforms/Hash.java

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -71,36 +71,16 @@ public void configure(final Map<String, ?> configs) {
7171
}
7272
}
7373

74-
@Override
75-
public R apply(final R record) {
76-
final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
77-
if (schemaAndValue.schema() == null) {
78-
throw new DataException(dataPlace() + " schema can't be null: " + record);
79-
}
80-
74+
public final Optional<Object> getNewValue(final R record, final SchemaAndValue schemaAndValue) {
8175
final Optional<Object> newValue;
8276
if (config.fieldName().isPresent()) {
8377
newValue = getNewValueForNamedField(
84-
record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
78+
record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
8579
} else {
8680
newValue = getNewValueWithoutFieldName(
87-
record.toString(), schemaAndValue.schema(), schemaAndValue.value());
88-
}
89-
90-
if (newValue.isPresent()) {
91-
return record.newRecord(
92-
record.topic(),
93-
record.kafkaPartition(),
94-
record.keySchema(),
95-
record.key(),
96-
record.valueSchema(),
97-
newValue.get(),
98-
record.timestamp(),
99-
record.headers()
100-
);
101-
} else {
102-
return record;
81+
record.toString(), schemaAndValue.schema(), schemaAndValue.value());
10382
}
83+
return newValue;
10484
}
10585

10686
@Override
@@ -190,6 +170,31 @@ protected SchemaAndValue getSchemaAndValue(final R record) {
190170
return new SchemaAndValue(record.keySchema(), record.key());
191171
}
192172

173+
@Override
174+
public R apply(final R record) {
175+
final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
176+
if (schemaAndValue.schema() == null) {
177+
throw new DataException(dataPlace() + " schema can't be null: " + record);
178+
}
179+
180+
final Optional<Object> newValue = getNewValue(record, schemaAndValue);
181+
182+
if (newValue.isPresent()) {
183+
return record.newRecord(
184+
record.topic(),
185+
record.kafkaPartition(),
186+
record.keySchema(),
187+
newValue.get(),
188+
record.valueSchema(),
189+
record.value(),
190+
record.timestamp(),
191+
record.headers()
192+
);
193+
} else {
194+
return record;
195+
}
196+
}
197+
193198
@Override
194199
protected String dataPlace() {
195200
return "key";
@@ -202,6 +207,31 @@ protected SchemaAndValue getSchemaAndValue(final R record) {
202207
return new SchemaAndValue(record.valueSchema(), record.value());
203208
}
204209

210+
@Override
211+
public R apply(final R record) {
212+
final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
213+
if (schemaAndValue.schema() == null) {
214+
throw new DataException(dataPlace() + " schema can't be null: " + record);
215+
}
216+
217+
final Optional<Object> newValue = getNewValue(record, schemaAndValue);
218+
219+
if (newValue.isPresent()) {
220+
return record.newRecord(
221+
record.topic(),
222+
record.kafkaPartition(),
223+
record.keySchema(),
224+
record.key(),
225+
record.valueSchema(),
226+
newValue.get(),
227+
record.timestamp(),
228+
record.headers()
229+
);
230+
} else {
231+
return record;
232+
}
233+
}
234+
205235
@Override
206236
protected String dataPlace() {
207237
return "value";

src/test/java/io/aiven/kafka/connect/transforms/HashKeyTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,17 @@ protected Hash<SinkRecord> createTransformationObject() {
3535
protected SinkRecord record(final Schema schema, final Object data) {
3636
return record(schema, data, null, null);
3737
}
38+
39+
@Override
40+
protected SinkRecord setNewValue(final SinkRecord record, final Object newValue) {
41+
return record.newRecord(record.topic(),
42+
record.kafkaPartition(),
43+
record.keySchema(),
44+
newValue,
45+
record.valueSchema(),
46+
record.value(),
47+
record.timestamp(),
48+
record.headers()
49+
);
50+
}
3851
}

src/test/java/io/aiven/kafka/connect/transforms/HashTest.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,8 @@ private Hash<SinkRecord> transformation(
287287

288288
protected abstract Hash<SinkRecord> createTransformationObject();
289289

290+
protected abstract SinkRecord setNewValue(final SinkRecord record, final Object newValue);
291+
290292
protected abstract SinkRecord record(final Schema schema, final Object data);
291293

292294
protected SinkRecord record(final Schema keySchema,
@@ -300,18 +302,6 @@ protected SinkRecord record(final Schema keySchema,
300302
456L, TimestampType.CREATE_TIME);
301303
}
302304

303-
private SinkRecord setNewValue(final SinkRecord record, final Object newValue) {
304-
return record.newRecord(record.topic(),
305-
record.kafkaPartition(),
306-
record.keySchema(),
307-
record.key(),
308-
record.valueSchema(),
309-
newValue,
310-
record.timestamp(),
311-
record.headers()
312-
);
313-
}
314-
315305
private String hash(final String function, final String value) {
316306
return HASHED_VALUES.get(function).get(value);
317307
}

src/test/java/io/aiven/kafka/connect/transforms/HashValueTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,17 @@ protected Hash<SinkRecord> createTransformationObject() {
3535
protected SinkRecord record(final Schema schema, final Object data) {
3636
return record(null, null, schema, data);
3737
}
38+
39+
@Override
40+
protected SinkRecord setNewValue(final SinkRecord record, final Object newValue) {
41+
return record.newRecord(record.topic(),
42+
record.kafkaPartition(),
43+
record.keySchema(),
44+
record.key(),
45+
record.valueSchema(),
46+
newValue,
47+
record.timestamp(),
48+
record.headers()
49+
);
50+
}
3851
}

0 commit comments

Comments
 (0)