Skip to content

Commit 6aaa720

Browse files
Merge pull request #13 from aiven/ivanyu-hashfield-clean-and-readme
Improve Hash implementation and update README
2 parents 1b7ebd6 + c212d42 commit 6aaa720

File tree

8 files changed

+428
-282
lines changed

8 files changed

+428
-282
lines changed

README.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ Exists in two variants:
4444
The transformation defines the following configurations:
4545

4646
- `field.name` - The name of the field which should be used as the topic name. If `null` or empty, the entire key or value is used (and assumed to be a string). By default is `null`.
47-
- `skip.missing.or.null` - In case the source of the new topic name is `null` or missing, should a record be silently passed without transformation. By default is `false`.
47+
- `skip.missing.or.null` - In case the source of the new topic name is `null` or missing, should a record be silently passed without transformation. By default, it is `false`.
4848

4949
Here is an example of this transformation configuration:
5050

@@ -54,6 +54,32 @@ transforms.ExtractTopicFromValueField.type=io.aiven.kafka.connect.transforms.Ext
5454
transforms.ExtractTopicFromValueField.field.name=inner_field_name
5555
```
5656

57+
### `Hash`
58+
59+
This transformation replaces a string value with its hash.
60+
61+
The transformation can hash either the whole key or value (in this case, it must have `STRING` type) or a field in them (in this case, it must have `STRUCT` type and the field's value must be `STRING`).
62+
63+
Exists in two variants:
64+
65+
- `io.aiven.kafka.connect.transforms.Hash$Key` - works on keys;
66+
- `io.aiven.kafka.connect.transforms.Hash$Value` - works on values.
67+
68+
The transformation defines the following configurations:
69+
70+
- `field.name` - The name of the field whose value should be hashed. If `null` or empty, the entire key or value is used (and assumed to be a string). By default, it is `null`.
71+
- `function` - The name of the hash function to use. The supported values are: `md5`, `sha1`, and `sha256`.
72+
- `skip.missing.or.null` - In case the value to be hashed is `null` or missing, should a record be silently passed without transformation. By default, it is `false`.
73+
74+
Here is an example of this transformation configuration:
75+
76+
```
77+
transforms=HashEmail
78+
transforms.HashEmail.type=io.aiven.kafka.connect.transforms.Hash$Value
79+
transforms.HashEmail.field.name=email
80+
transforms.HashEmail.function=sha1
81+
```
82+
5783
### `TombstoneHandler`
5884

5985
This transformation manages tombstone records,

src/main/java/io/aiven/kafka/connect/transforms/HashField.java renamed to src/main/java/io/aiven/kafka/connect/transforms/Hash.java

Lines changed: 56 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
package io.aiven.kafka.connect.transforms;
1818

1919
import java.security.MessageDigest;
20-
import java.util.Arrays;
20+
import java.security.NoSuchAlgorithmException;
2121
import java.util.Base64;
22-
import java.util.List;
2322
import java.util.Map;
2423
import java.util.Optional;
2524

@@ -29,29 +28,46 @@
2928
import org.apache.kafka.connect.data.Schema;
3029
import org.apache.kafka.connect.data.SchemaAndValue;
3130
import org.apache.kafka.connect.data.Struct;
31+
import org.apache.kafka.connect.errors.ConnectException;
3232
import org.apache.kafka.connect.errors.DataException;
3333
import org.apache.kafka.connect.transforms.Transformation;
3434

3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;
3737

3838

39-
public abstract class HashField<R extends ConnectRecord<R>> implements Transformation<R> {
40-
private static final Logger log = LoggerFactory.getLogger(HashField.class);
39+
public abstract class Hash<R extends ConnectRecord<R>> implements Transformation<R> {
40+
private static final Logger log = LoggerFactory.getLogger(Hash.class);
4141

42-
private HashFieldConfig config;
43-
private static final List<Schema.Type> SUPPORTED_TYPES_TO_CONVERT_FROM = Arrays.asList(
44-
Schema.Type.STRING
45-
);
42+
private HashConfig config;
43+
private MessageDigest messageDigest;
4644

4745
@Override
4846
public ConfigDef config() {
49-
return HashFieldConfig.config();
47+
return HashConfig.config();
5048
}
5149

5250
@Override
5351
public void configure(final Map<String, ?> configs) {
54-
this.config = new HashFieldConfig(configs);
52+
this.config = new HashConfig(configs);
53+
54+
try {
55+
switch (config.hashFunction()) {
56+
case MD5:
57+
messageDigest = MessageDigest.getInstance("MD5");
58+
break;
59+
case SHA1:
60+
messageDigest = MessageDigest.getInstance("SHA1");
61+
break;
62+
case SHA256:
63+
messageDigest = MessageDigest.getInstance("SHA-256");
64+
break;
65+
default:
66+
throw new ConnectException("Unknown hash function " + config.hashFunction());
67+
}
68+
} catch (final NoSuchAlgorithmException e) {
69+
throw new ConnectException(e);
70+
}
5571
}
5672

5773
@Override
@@ -66,12 +82,8 @@ public R apply(final R record) {
6682
newValue = getNewValueForNamedField(
6783
record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
6884
} else {
69-
if (config.skipMissingOrNull()) {
70-
newValue = getNewValueWithoutFieldName(
71-
record.toString(), schemaAndValue.schema(), schemaAndValue.value());
72-
} else {
73-
throw new DataException(dataPlace() + " can't be null or empty: " + record);
74-
}
85+
newValue = getNewValueWithoutFieldName(
86+
record.toString(), schemaAndValue.schema(), schemaAndValue.value());
7587
}
7688

7789
if (newValue.isPresent()) {
@@ -114,60 +126,61 @@ private Optional<String> getNewValueForNamedField(final String recordStr,
114126
final Field field = schema.field(fieldName);
115127
if (field == null) {
116128
if (config.skipMissingOrNull()) {
129+
log.debug(fieldName + " in " + dataPlace() + " schema is missing, skipping transformation");
117130
return Optional.empty();
118131
} else {
119132
throw new DataException(fieldName + " in " + dataPlace() + " schema can't be missing: " + recordStr);
120133
}
121134
}
122135

123-
if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
136+
if (field.schema().type() != Schema.Type.STRING) {
124137
throw new DataException(fieldName + " schema type in " + dataPlace()
125-
+ " must be " + SUPPORTED_TYPES_TO_CONVERT_FROM
138+
+ " must be " + Schema.Type.STRING
126139
+ ": " + recordStr);
127140
}
128141

129142
final Struct struct = (Struct) value;
130-
131-
final Optional<String> result = Optional.ofNullable(struct.get(fieldName))
132-
.map(Object::toString).map(s -> hashString(config.hashFunction(), s));
133-
134-
if (result.isPresent() && !result.get().equals("")) {
135-
return result;
136-
} else {
143+
final String stringValue = struct.getString(fieldName);
144+
if (stringValue == null) {
137145
if (config.skipMissingOrNull()) {
146+
log.debug(fieldName + " in " + dataPlace() + " is null, skipping transformation");
138147
return Optional.empty();
139148
} else {
140-
throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
149+
throw new DataException(fieldName + " in " + dataPlace() + " can't be null: " + recordStr);
141150
}
151+
} else {
152+
return Optional.of(hashString(stringValue));
142153
}
143154
}
144155

145-
static final String hashString(final MessageDigest md, final String input) {
146-
final byte[] digest = md.digest(input.getBytes());
147-
return Base64.getEncoder().encodeToString(digest);
148-
}
149-
150156
private Optional<String> getNewValueWithoutFieldName(final String recordStr,
151157
final Schema schema,
152158
final Object value) {
153-
if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(schema.type())) {
159+
if (schema.type() != Schema.Type.STRING) {
154160
throw new DataException(dataPlace() + " schema type must be "
155-
+ SUPPORTED_TYPES_TO_CONVERT_FROM
161+
+ Schema.Type.STRING
156162
+ " if field name is not specified: "
157163
+ recordStr);
158164
}
159165

160-
final Optional<String> result = Optional.ofNullable(value)
161-
.map(Object::toString).map(s -> hashString(config.hashFunction(), s));
162-
163-
if (result.isPresent() && !result.get().equals("")) {
164-
return result;
165-
} else {
166-
return Optional.empty();
166+
if (value == null) {
167+
if (config.skipMissingOrNull()) {
168+
log.debug(dataPlace() + " is null, skipping transformation");
169+
return Optional.empty();
170+
} else {
171+
throw new DataException(dataPlace() + " can't be null: " + recordStr);
172+
}
167173
}
174+
175+
return Optional.of(hashString(value.toString()));
176+
}
177+
178+
private String hashString(final String string) {
179+
final byte[] digest = messageDigest.digest(string.getBytes());
180+
return Base64.getEncoder().encodeToString(digest);
168181
}
169182

170-
public static class Key<R extends ConnectRecord<R>> extends HashField<R> {
183+
public static class Key<R extends ConnectRecord<R>> extends Hash<R> {
171184
@Override
172185
protected SchemaAndValue getSchemaAndValue(final R record) {
173186
return new SchemaAndValue(record.keySchema(), record.key());
@@ -179,7 +192,7 @@ protected String dataPlace() {
179192
}
180193
}
181194

182-
public static class Value<R extends ConnectRecord<R>> extends HashField<R> {
195+
public static class Value<R extends ConnectRecord<R>> extends Hash<R> {
183196
@Override
184197
protected SchemaAndValue getSchemaAndValue(final R record) {
185198
return new SchemaAndValue(record.valueSchema(), record.value());
@@ -191,7 +204,7 @@ protected String dataPlace() {
191204
}
192205
}
193206

194-
protected HashFieldConfig getConfig() {
207+
protected HashConfig getConfig() {
195208
return this.config;
196209
}
197210
}

src/main/java/io/aiven/kafka/connect/transforms/HashFieldConfig.java renamed to src/main/java/io/aiven/kafka/connect/transforms/HashConfig.java

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,26 @@
1616

1717
package io.aiven.kafka.connect.transforms;
1818

19-
import java.security.MessageDigest;
20-
import java.security.NoSuchAlgorithmException;
2119
import java.util.Map;
2220
import java.util.Optional;
23-
import java.util.stream.Collectors;
24-
import java.util.stream.Stream;
2521

2622
import org.apache.kafka.common.config.AbstractConfig;
2723
import org.apache.kafka.common.config.ConfigDef;
28-
import org.apache.kafka.connect.errors.DataException;
2924

30-
class HashFieldConfig extends AbstractConfig {
31-
public static final String FIELD_NAME_CONFIG = "field.name";
25+
class HashConfig extends AbstractConfig {
26+
private static final String FIELD_NAME_CONFIG = "field.name";
3227
private static final String FIELD_NAME_DOC =
3328
"The name of the field which value should be hashed. If null or empty, "
3429
+ "the entire key or value is used (and assumed to be a string). By default is null.";
35-
public static final String SKIP_MISSING_OR_NULL_CONFIG = "skip.missing.or.null";
30+
private static final String SKIP_MISSING_OR_NULL_CONFIG = "skip.missing.or.null";
3631
private static final String SKIP_MISSING_OR_NULL_DOC =
37-
"In case the source of the new topic name is null or missing, "
32+
"In case the value to be hashed is null or missing, "
3833
+ "should a record be silently passed without transformation.";
39-
public static final String FUNCTION_CONFIG = "function";
34+
private static final String FUNCTION_CONFIG = "function";
4035
private static final String FUNCTION_DOC =
4136
"The name of the hash function to use. The supported values are: md5, sha1, sha256.";
42-
private MessageDigest md = null;
4337

44-
HashFieldConfig(final Map<?, ?> originals) {
38+
HashConfig(final Map<?, ?> originals) {
4539
super(config(), originals);
4640
}
4741

@@ -72,20 +66,25 @@ static ConfigDef config() {
7266
}
7367

7468
public enum HashFunction {
75-
MD5,
69+
MD5 {
70+
public String toString() {
71+
return "md5";
72+
}
73+
},
7674
SHA1 {
7775
public String toString() {
78-
return "SHA-1";
76+
return "sha1";
7977
}
8078
},
8179
SHA256 {
8280
public String toString() {
83-
return "SHA-256";
81+
return "sha256";
8482
}
8583
};
86-
static String stringValues = Stream.of(HashFunction.values())
87-
.map(Enum::toString)
88-
.collect(Collectors.joining("|", "[", "]"));
84+
85+
public static HashFunction fromString(final String value) {
86+
return valueOf(value.toUpperCase());
87+
}
8988
}
9089

9190
Optional<String> fieldName() {
@@ -100,18 +99,8 @@ boolean skipMissingOrNull() {
10099
return getBoolean(SKIP_MISSING_OR_NULL_CONFIG);
101100
}
102101

103-
MessageDigest hashFunction() {
104-
final String hashAlg = getString(FUNCTION_CONFIG);
105-
try {
106-
if (md == null) {
107-
md = MessageDigest.getInstance(hashAlg);
108-
}
109-
} catch (final NoSuchAlgorithmException e) {
110-
throw new DataException("Hash function " + hashAlg
111-
+ " must be "
112-
+ HashFieldConfig.HashFunction.stringValues);
113-
}
114-
return md;
102+
HashFunction hashFunction() {
103+
return HashFunction.fromString(getString(FUNCTION_CONFIG));
115104
}
116105
}
117106

0 commit comments

Comments
 (0)