Skip to content

Commit 2e308f0

Browse files
author
Brandon Brown
committed
pr feedback
1 parent d8aa4cb commit 2e308f0

File tree

5 files changed

+109
-87
lines changed

5 files changed

+109
-87
lines changed

src/main/java/io/aiven/kafka/connect/transforms/HashField.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717
package io.aiven.kafka.connect.transforms;
1818

1919
import java.security.MessageDigest;
20-
import java.security.NoSuchAlgorithmException;
2120
import java.util.Arrays;
2221
import java.util.Base64;
2322
import java.util.List;
2423
import java.util.Map;
2524
import java.util.Optional;
26-
import java.util.stream.Collectors;
2725

2826
import org.apache.kafka.common.config.ConfigDef;
2927
import org.apache.kafka.connect.connector.ConnectRecord;
@@ -68,8 +66,12 @@ public R apply(final R record) {
6866
newValue = getNewValueForNamedField(
6967
record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
7068
} else {
71-
newValue = getNewValueWithoutFieldName(
72-
record.toString(), schemaAndValue.schema(), schemaAndValue.value());
69+
if (config.skipMissingOrNull()) {
70+
newValue = getNewValueWithoutFieldName(
71+
record.toString(), schemaAndValue.schema(), schemaAndValue.value());
72+
} else {
73+
throw new DataException(dataPlace() + " can't be null or empty: " + record);
74+
}
7375
}
7476

7577
if (newValue.isPresent()) {
@@ -111,7 +113,11 @@ private Optional<String> getNewValueForNamedField(final String recordStr,
111113

112114
final Field field = schema.field(fieldName);
113115
if (field == null) {
114-
return Optional.empty();
116+
if (config.skipMissingOrNull()) {
117+
return Optional.empty();
118+
} else {
119+
throw new DataException(fieldName + " in " + dataPlace() + " schema can't be missing: " + recordStr);
120+
}
115121
}
116122

117123
if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
@@ -123,27 +129,22 @@ private Optional<String> getNewValueForNamedField(final String recordStr,
123129
final Struct struct = (Struct) value;
124130

125131
final Optional<String> result = Optional.ofNullable(struct.get(fieldName))
126-
.map(Object::toString).flatMap(s -> config.hashFunction().map(alg -> hashString(alg, s)));
132+
.map(Object::toString).map(s -> hashString(config.hashFunction(), s));
133+
127134
if (result.isPresent() && !result.get().equals("")) {
128135
return result;
129136
} else {
130-
return Optional.empty();
137+
if (config.skipMissingOrNull()) {
138+
return Optional.empty();
139+
} else {
140+
throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
141+
}
131142
}
132143
}
133144

134-
static final String hashString(final String hashAlg, final String input) {
135-
try {
136-
final MessageDigest md = MessageDigest.getInstance(hashAlg);
137-
final byte[] digest = md.digest(input.getBytes());
138-
return Base64.getEncoder().encodeToString(digest);
139-
} catch (final NoSuchAlgorithmException e) {
140-
System.out.println("EXCEPTION: " + e.getMessage());
141-
throw new DataException("Hash function " + hashAlg
142-
+ " must be "
143-
+ HashFieldConfig.HashFunction.stringValues
144-
.stream()
145-
.collect(Collectors.joining("|", "[", "]")));
146-
}
145+
static final String hashString(final MessageDigest md, final String input) {
146+
final byte[] digest = md.digest(input.getBytes());
147+
return Base64.getEncoder().encodeToString(digest);
147148
}
148149

149150
private Optional<String> getNewValueWithoutFieldName(final String recordStr,
@@ -157,7 +158,7 @@ private Optional<String> getNewValueWithoutFieldName(final String recordStr,
157158
}
158159

159160
final Optional<String> result = Optional.ofNullable(value)
160-
.map(Object::toString).flatMap(s -> config.hashFunction().map(alg -> hashString(alg, s)));
161+
.map(Object::toString).map(s -> hashString(config.hashFunction(), s));
161162

162163
if (result.isPresent() && !result.get().equals("")) {
163164
return result;
@@ -189,4 +190,8 @@ protected String dataPlace() {
189190
return "value";
190191
}
191192
}
193+
194+
protected HashFieldConfig getConfig() {
195+
return this.config;
196+
}
192197
}

src/main/java/io/aiven/kafka/connect/transforms/HashFieldConfig.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,30 @@
1616

1717
package io.aiven.kafka.connect.transforms;
1818

19-
import java.util.List;
19+
import java.security.MessageDigest;
20+
import java.security.NoSuchAlgorithmException;
2021
import java.util.Map;
2122
import java.util.Optional;
2223
import java.util.stream.Collectors;
2324
import java.util.stream.Stream;
2425

2526
import org.apache.kafka.common.config.AbstractConfig;
2627
import org.apache.kafka.common.config.ConfigDef;
28+
import org.apache.kafka.connect.errors.DataException;
2729

2830
class HashFieldConfig extends AbstractConfig {
2931
public static final String FIELD_NAME_CONFIG = "field.name";
3032
private static final String FIELD_NAME_DOC =
3133
"The name of the field which value should be hashed. If null or empty, "
3234
+ "the entire key or value is used (and assumed to be a string). By default is null.";
35+
public static final String SKIP_MISSING_OR_NULL_CONFIG = "skip.missing.or.null";
36+
private static final String SKIP_MISSING_OR_NULL_DOC =
37+
"In case the source of the new topic name is null or missing, "
38+
+ "should a record be silently passed without transformation.";
3339
public static final String FUNCTION_CONFIG = "function";
3440
private static final String FUNCTION_DOC =
3541
"The name of the hash function to use. The supported values are: md5, sha1, sha256.";
42+
private MessageDigest md = null;
3643

3744
HashFieldConfig(final Map<?, ?> originals) {
3845
super(config(), originals);
@@ -46,11 +53,20 @@ static ConfigDef config() {
4653
null,
4754
ConfigDef.Importance.HIGH,
4855
FIELD_NAME_DOC)
56+
.define(
57+
SKIP_MISSING_OR_NULL_CONFIG,
58+
ConfigDef.Type.BOOLEAN,
59+
false,
60+
ConfigDef.Importance.LOW,
61+
SKIP_MISSING_OR_NULL_DOC)
4962
.define(
5063
FUNCTION_CONFIG,
5164
ConfigDef.Type.STRING,
52-
null,
53-
HashFunctionValidator.INSTANCE,
65+
ConfigDef.NO_DEFAULT_VALUE,
66+
ConfigDef.ValidString.in(
67+
HashFunction.MD5.toString(),
68+
HashFunction.SHA1.toString(),
69+
HashFunction.SHA256.toString()),
5470
ConfigDef.Importance.HIGH,
5571
FUNCTION_DOC);
5672
}
@@ -67,9 +83,9 @@ public String toString() {
6783
return "SHA-256";
6884
}
6985
};
70-
static List<String> stringValues = Stream.of(HashFunction.values())
86+
static String stringValues = Stream.of(HashFunction.values())
7187
.map(Enum::toString)
72-
.collect(Collectors.toList());
88+
.collect(Collectors.joining("|", "[", "]"));
7389
}
7490

7591
Optional<String> fieldName() {
@@ -80,12 +96,22 @@ Optional<String> fieldName() {
8096
return Optional.of(rawFieldName);
8197
}
8298

83-
Optional<String> hashFunction() {
84-
final String rawFieldName = getString(FUNCTION_CONFIG);
85-
if (null == rawFieldName || "".equals(rawFieldName)) {
86-
return Optional.empty();
99+
boolean skipMissingOrNull() {
100+
return getBoolean(SKIP_MISSING_OR_NULL_CONFIG);
101+
}
102+
103+
MessageDigest hashFunction() {
104+
final String hashAlg = getString(FUNCTION_CONFIG);
105+
try {
106+
if (md == null) {
107+
md = MessageDigest.getInstance(hashAlg);
108+
}
109+
} catch (final NoSuchAlgorithmException e) {
110+
throw new DataException("Hash function " + hashAlg
111+
+ " must be "
112+
+ HashFieldConfig.HashFunction.stringValues);
87113
}
88-
return Optional.of(rawFieldName);
114+
return md;
89115
}
90116
}
91117

src/main/java/io/aiven/kafka/connect/transforms/HashFunctionValidator.java

Lines changed: 0 additions & 44 deletions
This file was deleted.

src/test/java/io/aiven/kafka/connect/transforms/HashFieldConfigTest.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,33 @@
2020
import java.util.Map;
2121
import java.util.Optional;
2222

23+
import org.apache.kafka.common.config.ConfigException;
24+
2325
import org.junit.jupiter.api.Test;
2426
import org.junit.jupiter.params.ParameterizedTest;
2527
import org.junit.jupiter.params.provider.ValueSource;
2628

2729
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertThrows;
2831

2932
class HashFieldConfigTest {
3033
@Test
3134
void defaults() {
3235
final Map<String, String> props = new HashMap<>();
36+
final Throwable e = assertThrows(ConfigException.class,
37+
() -> new HashFieldConfig(props));
38+
assertEquals("Missing required configuration \"function\" which has no default value.",
39+
e.getMessage());
40+
}
41+
42+
@ParameterizedTest
43+
@ValueSource(booleans = {true, false})
44+
void skipMissingOrNull(final boolean skipMissingOrNull) {
45+
final Map<String, String> props = new HashMap<>();
46+
props.put(HashFieldConfig.SKIP_MISSING_OR_NULL_CONFIG, Boolean.toString(skipMissingOrNull));
47+
props.put(HashFieldConfig.FUNCTION_CONFIG, HashFieldConfig.HashFunction.SHA256.toString());
3348
final HashFieldConfig config = new HashFieldConfig(props);
34-
assertEquals(Optional.empty(), config.fieldName());
35-
assertEquals(Optional.empty(), config.hashFunction());
49+
assertEquals(skipMissingOrNull, config.skipMissingOrNull());
3650
}
3751

3852
@ParameterizedTest
@@ -41,13 +55,14 @@ void hashFunction(final String hashFunction) {
4155
final Map<String, String> props = new HashMap<>();
4256
props.put(HashFieldConfig.FUNCTION_CONFIG, hashFunction);
4357
final HashFieldConfig config = new HashFieldConfig(props);
44-
assertEquals(Optional.of(hashFunction), config.hashFunction());
58+
assertEquals(hashFunction, config.hashFunction().getAlgorithm());
4559
}
4660

4761
@Test
4862
void emptyFieldName() {
4963
final Map<String, String> props = new HashMap<>();
5064
props.put(HashFieldConfig.FIELD_NAME_CONFIG, "");
65+
props.put(HashFieldConfig.FUNCTION_CONFIG, HashFieldConfig.HashFunction.SHA256.toString());
5166
final HashFieldConfig config = new HashFieldConfig(props);
5267
assertEquals(Optional.empty(), config.fieldName());
5368
}
@@ -56,7 +71,9 @@ void emptyFieldName() {
5671
void definedFieldName() {
5772
final Map<String, String> props = new HashMap<>();
5873
props.put(HashFieldConfig.FIELD_NAME_CONFIG, "test");
74+
props.put(HashFieldConfig.FUNCTION_CONFIG, HashFieldConfig.HashFunction.SHA256.toString());
5975
final HashFieldConfig config = new HashFieldConfig(props);
6076
assertEquals(Optional.of("test"), config.fieldName());
77+
assertEquals(HashFieldConfig.HashFunction.SHA256.toString(), config.hashFunction().getAlgorithm());
6178
}
6279
}

src/test/java/io/aiven/kafka/connect/transforms/HashFieldTest.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,25 @@ abstract class HashFieldTest {
3838

3939
private static final String FIELD = "email";
4040
private static final String FIELD_VALUE = "jerry@all_your_bases.com";
41+
private static final String DEFAULT_HASH_FUNCTION = HashFieldConfig.HashFunction.SHA256.toString();
42+
43+
@ParameterizedTest
44+
@NullAndEmptySource
45+
void noFieldName_NullOrEmptyValue_NoSkip(final String value) {
46+
final Schema schema = SchemaBuilder.STRING_SCHEMA;
47+
final SinkRecord originalRecord = record(schema, value);
48+
final Throwable e = assertThrows(DataException.class,
49+
() -> transformation(null, false, DEFAULT_HASH_FUNCTION).apply(originalRecord));
50+
assertEquals(dataPlace() + " can't be null or empty: " + originalRecord,
51+
e.getMessage());
52+
}
4153

4254
@ParameterizedTest
4355
@ValueSource(strings = {"MD5", "SHA-1", "SHA-256"})
4456
void nullSchema(final String hashFunction) {
4557
final SinkRecord originalRecord = record(null, null);
4658
final Throwable e = assertThrows(DataException.class,
47-
() -> transformation(FIELD, hashFunction).apply(originalRecord));
59+
() -> transformation(FIELD, true, hashFunction).apply(originalRecord));
4860
assertEquals(dataPlace() + " schema can't be null: " + originalRecord, e.getMessage());
4961
}
5062

@@ -54,7 +66,7 @@ void noFieldName_UnsupportedType(final String hashFunction) {
5466
final Schema schema = SchemaBuilder.struct().build();
5567
final SinkRecord originalRecord = record(schema, new Struct(schema));
5668
final Throwable e = assertThrows(DataException.class,
57-
() -> transformation(null, hashFunction).apply(originalRecord));
69+
() -> transformation(null, true, hashFunction).apply(originalRecord));
5870
assertEquals(dataPlace()
5971
+ " schema type must be "
6072
+ "[STRING]"
@@ -69,15 +81,15 @@ void noFieldName_NullOrEmptyValue(final String value) {
6981
final Schema schema = SchemaBuilder.STRING_SCHEMA;
7082
final SinkRecord originalRecord = record(schema, value);
7183
assertDoesNotThrow(
72-
() -> transformation(null, HashFieldConfig.HashFunction.SHA256.toString()).apply(originalRecord));
84+
() -> transformation(null, true, DEFAULT_HASH_FUNCTION).apply(originalRecord));
7385
}
7486

7587
@ParameterizedTest
7688
@ValueSource(strings = {"MD5", "SHA-1", "SHA-256"})
7789
void fieldName_NonStruct(final String hashFunction) {
7890
final SinkRecord originalRecord = record(SchemaBuilder.INT8_SCHEMA, "some");
7991
final Throwable e = assertThrows(DataException.class,
80-
() -> transformation(FIELD, hashFunction).apply(originalRecord));
92+
() -> transformation(FIELD, true, hashFunction).apply(originalRecord));
8193
assertEquals(dataPlace() + " schema type must be STRUCT if field name is specified: "
8294
+ originalRecord,
8395
e.getMessage());
@@ -91,7 +103,7 @@ void fieldName_NullStruct(final String hashFunction) {
91103
.schema();
92104
final SinkRecord originalRecord = record(schema, null);
93105
final Throwable e = assertThrows(DataException.class,
94-
() -> transformation(FIELD, hashFunction).apply(originalRecord));
106+
() -> transformation(FIELD, true, hashFunction).apply(originalRecord));
95107
assertEquals(dataPlace() + " can't be null if field name is specified: " + originalRecord,
96108
e.getMessage());
97109
}
@@ -106,7 +118,7 @@ void fieldName_UnsupportedTypeInField(final String hashFunction) {
106118
final SinkRecord originalRecord = record(
107119
schema, new Struct(schema).put(FIELD, new Struct(innerSchema)));
108120
final Throwable e = assertThrows(DataException.class,
109-
() -> transformation(FIELD, hashFunction).apply(originalRecord));
121+
() -> transformation(FIELD, true, hashFunction).apply(originalRecord));
110122
assertEquals(FIELD + " schema type in " + dataPlace() + " must be "
111123
+ "[STRING]"
112124
+ ": " + originalRecord,
@@ -120,15 +132,21 @@ void fieldName_NormalStringValue(final String hashFunction) {
120132
.field(FIELD, SchemaBuilder.STRING_SCHEMA)
121133
.schema();
122134
final SinkRecord originalRecord = record(schema, new Struct(schema).put(FIELD, FIELD_VALUE));
123-
final SinkRecord result = transformation(FIELD, hashFunction).apply(originalRecord);
124-
assertEquals(setNewValue(originalRecord, HashField.hashString(hashFunction, FIELD_VALUE)), result);
135+
final HashField<SinkRecord> transform = transformation(FIELD, true, hashFunction);
136+
final SinkRecord result = transform.apply(originalRecord);
137+
final String newValue = HashField.hashString(transform.getConfig().hashFunction(), FIELD_VALUE);
138+
assertEquals(setNewValue(originalRecord, newValue), result);
125139
}
126140

127-
private HashField<SinkRecord> transformation(final String fieldName, final String hashFunction) {
141+
private HashField<SinkRecord> transformation(
142+
final String fieldName,
143+
final boolean skipMissingOrNull,
144+
final String hashFunction) {
128145
final Map<String, String> props = new HashMap<>();
129146
if (fieldName != null) {
130147
props.put(HashFieldConfig.FIELD_NAME_CONFIG, fieldName);
131148
}
149+
props.put("skip.missing.or.null", Boolean.toString(skipMissingOrNull));
132150
props.put(HashFieldConfig.FUNCTION_CONFIG, hashFunction);
133151
final HashField<SinkRecord> transform = createTransformationObject();
134152
transform.configure(props);

0 commit comments

Comments (0)