Skip to content

Commit 747b4d6

Browse files
authored
Merge pull request #17 from brbrown25/bugfix/ISSUE-15
FIX: Hash hashing whole structure when field name set
2 parents b520266 + c94d6a9 commit 747b4d6

File tree

2 files changed

+30
-10
lines changed
  • src

2 files changed

+30
-10
lines changed

src/main/java/io/aiven/kafka/connect/transforms/Hash.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public R apply(final R record) {
7777
throw new DataException(dataPlace() + " schema can't be null: " + record);
7878
}
7979

80-
final Optional<String> newValue;
80+
final Optional<Object> newValue;
8181
if (config.fieldName().isPresent()) {
8282
newValue = getNewValueForNamedField(
8383
record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
@@ -110,7 +110,7 @@ public void close() {
110110

111111
protected abstract SchemaAndValue getSchemaAndValue(final R record);
112112

113-
private Optional<String> getNewValueForNamedField(final String recordStr,
113+
private Optional<Object> getNewValueForNamedField(final String recordStr,
114114
final Schema schema,
115115
final Object value,
116116
final String fieldName) {
@@ -149,11 +149,13 @@ private Optional<String> getNewValueForNamedField(final String recordStr,
149149
throw new DataException(fieldName + " in " + dataPlace() + " can't be null: " + recordStr);
150150
}
151151
} else {
152-
return Optional.of(hashString(stringValue));
152+
final String updatedValue = hashString(stringValue);
153+
final Struct updatedRecord = struct.put(fieldName, updatedValue);
154+
return Optional.ofNullable(updatedRecord);
153155
}
154156
}
155157

156-
private Optional<String> getNewValueWithoutFieldName(final String recordStr,
158+
private Optional<Object> getNewValueWithoutFieldName(final String recordStr,
157159
final Schema schema,
158160
final Object value) {
159161
if (schema.type() != Schema.Type.STRING) {

src/test/java/io/aiven/kafka/connect/transforms/HashTest.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ abstract class HashTest {
4242
private static final String EMPTY_FIELD_VALUE = "";
4343
private static final String NON_EMPTY_FIELD_VALUE = "jerry@all_your_bases.com";
4444
private static final String DEFAULT_HASH_FUNCTION = HashConfig.HashFunction.SHA256.toString();
45+
private static final String UNAFFECTED_FIELD = "name";
46+
private static final String UNAFFECTED_FIELD_VALUE = "jerry";
4547

4648
@Test
4749
void noFieldName_NullValue_NoSkip() {
@@ -132,8 +134,12 @@ void fieldName_MissingValue_NoSkip() {
132134
void fieldName_NullValue_Skip() {
133135
final Schema schema = SchemaBuilder.struct()
134136
.field(FIELD, SchemaBuilder.OPTIONAL_STRING_SCHEMA)
137+
.field(UNAFFECTED_FIELD, SchemaBuilder.STRING_SCHEMA)
135138
.schema();
136-
final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, null));
139+
final Struct originalStruct = new Struct(schema)
140+
.put(FIELD, null)
141+
.put(UNAFFECTED_FIELD, UNAFFECTED_FIELD_VALUE);
142+
final SinkRecord originalRecord = record(schema, originalStruct);
137143
final Hash<SinkRecord> transform = transformation(FIELD, true, DEFAULT_HASH_FUNCTION);
138144
final SinkRecord result = transform.apply(originalRecord);
139145
// No changes.
@@ -193,11 +199,17 @@ void fieldName_UnsupportedTypeInField() {
193199
void fieldName_NormalStringValue(final String hashFunction) {
194200
final Schema schema = SchemaBuilder.struct()
195201
.field(FIELD, SchemaBuilder.STRING_SCHEMA)
202+
.field(UNAFFECTED_FIELD, SchemaBuilder.STRING_SCHEMA)
196203
.schema();
197-
final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, NON_EMPTY_FIELD_VALUE));
204+
final Struct originalStruct = new Struct(schema)
205+
.put(FIELD, NON_EMPTY_FIELD_VALUE)
206+
.put(UNAFFECTED_FIELD, UNAFFECTED_FIELD_VALUE);
207+
final SinkRecord originalRecord = record(schema, originalStruct);
198208
final Hash<SinkRecord> transform = transformation(FIELD, true, hashFunction);
199209
final SinkRecord result = transform.apply(originalRecord);
200-
final String newValue = hash(hashFunction, NON_EMPTY_FIELD_VALUE);
210+
final Struct newValue = new Struct(schema)
211+
.put(FIELD, hash(hashFunction, NON_EMPTY_FIELD_VALUE))
212+
.put(UNAFFECTED_FIELD, UNAFFECTED_FIELD_VALUE);
201213
assertEquals(setNewValue(originalRecord, newValue), result);
202214
}
203215

@@ -206,11 +218,17 @@ void fieldName_NormalStringValue(final String hashFunction) {
206218
void fieldName_EmptyStringValue(final String hashFunction) {
207219
final Schema schema = SchemaBuilder.struct()
208220
.field(FIELD, SchemaBuilder.STRING_SCHEMA)
221+
.field(UNAFFECTED_FIELD, SchemaBuilder.STRING_SCHEMA)
209222
.schema();
210-
final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, EMPTY_FIELD_VALUE));
223+
final Struct originalStruct = new Struct(schema)
224+
.put(FIELD, EMPTY_FIELD_VALUE)
225+
.put(UNAFFECTED_FIELD, UNAFFECTED_FIELD_VALUE);
226+
final SinkRecord originalRecord = record(schema, originalStruct);
211227
final Hash<SinkRecord> transform = transformation(FIELD, true, hashFunction);
212228
final SinkRecord result = transform.apply(originalRecord);
213-
final String newValue = hash(hashFunction, EMPTY_FIELD_VALUE);
229+
final Struct newValue = new Struct(schema)
230+
.put(FIELD, hash(hashFunction, EMPTY_FIELD_VALUE))
231+
.put(UNAFFECTED_FIELD, UNAFFECTED_FIELD_VALUE);
214232
assertEquals(setNewValue(originalRecord, newValue), result);
215233
}
216234

@@ -246,7 +264,7 @@ protected SinkRecord record(final Schema keySchema,
246264
456L, TimestampType.CREATE_TIME);
247265
}
248266

249-
private SinkRecord setNewValue(final SinkRecord record, final String newValue) {
267+
private SinkRecord setNewValue(final SinkRecord record, final Object newValue) {
250268
return record.newRecord(record.topic(),
251269
record.kafkaPartition(),
252270
record.keySchema(),

0 commit comments

Comments
 (0)