@@ -42,6 +42,8 @@ abstract class HashTest {
4242 private static final String EMPTY_FIELD_VALUE = "" ;
4343 private static final String NON_EMPTY_FIELD_VALUE = "jerry@all_your_bases.com" ;
4444 private static final String DEFAULT_HASH_FUNCTION = HashConfig .HashFunction .SHA256 .toString ();
45+ private static final String UNAFFECTED_FIELD = "name" ;
46+ private static final String UNAFFECTED_FIELD_VALUE = "jerry" ;
4547
4648 @ Test
4749 void noFieldName_NullValue_NoSkip () {
@@ -132,8 +134,12 @@ void fieldName_MissingValue_NoSkip() {
132134 void fieldName_NullValue_Skip () {
133135 final Schema schema = SchemaBuilder .struct ()
134136 .field (FIELD , SchemaBuilder .OPTIONAL_STRING_SCHEMA )
137+ .field (UNAFFECTED_FIELD , SchemaBuilder .STRING_SCHEMA )
135138 .schema ();
136- final SinkRecord originalRecord = record (schema , new Struct (schema ).put (FIELD , null ));
139+ final Struct originalStruct = new Struct (schema )
140+ .put (FIELD , null )
141+ .put (UNAFFECTED_FIELD , UNAFFECTED_FIELD_VALUE );
142+ final SinkRecord originalRecord = record (schema , originalStruct );
137143 final Hash <SinkRecord > transform = transformation (FIELD , true , DEFAULT_HASH_FUNCTION );
138144 final SinkRecord result = transform .apply (originalRecord );
139145 // No changes.
@@ -193,11 +199,17 @@ void fieldName_UnsupportedTypeInField() {
193199 void fieldName_NormalStringValue (final String hashFunction ) {
194200 final Schema schema = SchemaBuilder .struct ()
195201 .field (FIELD , SchemaBuilder .STRING_SCHEMA )
202+ .field (UNAFFECTED_FIELD , SchemaBuilder .STRING_SCHEMA )
196203 .schema ();
197- final SinkRecord originalRecord = record (schema , new Struct (schema ).put (FIELD , NON_EMPTY_FIELD_VALUE ));
204+ final Struct originalStruct = new Struct (schema )
205+ .put (FIELD , NON_EMPTY_FIELD_VALUE )
206+ .put (UNAFFECTED_FIELD , UNAFFECTED_FIELD_VALUE );
207+ final SinkRecord originalRecord = record (schema , originalStruct );
198208 final Hash <SinkRecord > transform = transformation (FIELD , true , hashFunction );
199209 final SinkRecord result = transform .apply (originalRecord );
200- final String newValue = hash (hashFunction , NON_EMPTY_FIELD_VALUE );
210+ final Struct newValue = new Struct (schema )
211+ .put (FIELD , hash (hashFunction , NON_EMPTY_FIELD_VALUE ))
212+ .put (UNAFFECTED_FIELD , UNAFFECTED_FIELD_VALUE );
201213 assertEquals (setNewValue (originalRecord , newValue ), result );
202214 }
203215
@@ -206,11 +218,17 @@ void fieldName_NormalStringValue(final String hashFunction) {
206218 void fieldName_EmptyStringValue (final String hashFunction ) {
207219 final Schema schema = SchemaBuilder .struct ()
208220 .field (FIELD , SchemaBuilder .STRING_SCHEMA )
221+ .field (UNAFFECTED_FIELD , SchemaBuilder .STRING_SCHEMA )
209222 .schema ();
210- final SinkRecord originalRecord = record (schema , new Struct (schema ).put (FIELD , EMPTY_FIELD_VALUE ));
223+ final Struct originalStruct = new Struct (schema )
224+ .put (FIELD , EMPTY_FIELD_VALUE )
225+ .put (UNAFFECTED_FIELD , UNAFFECTED_FIELD_VALUE );
226+ final SinkRecord originalRecord = record (schema , originalStruct );
211227 final Hash <SinkRecord > transform = transformation (FIELD , true , hashFunction );
212228 final SinkRecord result = transform .apply (originalRecord );
213- final String newValue = hash (hashFunction , EMPTY_FIELD_VALUE );
229+ final Struct newValue = new Struct (schema )
230+ .put (FIELD , hash (hashFunction , EMPTY_FIELD_VALUE ))
231+ .put (UNAFFECTED_FIELD , UNAFFECTED_FIELD_VALUE );
214232 assertEquals (setNewValue (originalRecord , newValue ), result );
215233 }
216234
@@ -246,7 +264,7 @@ protected SinkRecord record(final Schema keySchema,
246264 456L , TimestampType .CREATE_TIME );
247265 }
248266
249- private SinkRecord setNewValue (final SinkRecord record , final String newValue ) {
267+ private SinkRecord setNewValue (final SinkRecord record , final Object newValue ) {
250268 return record .newRecord (record .topic (),
251269 record .kafkaPartition (),
252270 record .keySchema (),
0 commit comments