Skip to content

Commit 3ade150

Browse files
committed
refactor: field by field value
1 parent 5cec48a commit 3ade150

File tree

4 files changed

+167
-237
lines changed

4 files changed

+167
-237
lines changed

README.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,19 +141,23 @@ transforms=MakeTombstone
141141
transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
142142
```
143143

144-
### `FilterByValueRegex`
144+
### `FilterByFieldValue`
145145

146-
This transformation allows filtering records based on a specific field value and a given regex pattern. The filter supports two modes: equality (equals) and inequality (not_equals).
146+
This transformation filters records by comparing the value of a specific field against either an exact expected value or a regex pattern.
147147

148148
Here is an example of this transformation configuration:
149+
149150
```properties
150151
transforms=Filter
151-
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByValueRegex
152-
transforms.Filter.fieldName=<field_name>
153-
transforms.Filter.pattern=<regex_pattern>
154-
transforms.Filter.matches=<true|false>
152+
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByFieldValue
153+
transforms.Filter.field.name=<field_name>
154+
transforms.Filter.field.value=<field_value>
155+
transforms.Filter.field.value.pattern=<regex_pattern>
156+
transforms.Filter.field.value.matches=<true|false>
155157
```
156158

159+
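Exactly one of `field.value` or `field.value.pattern` must be defined; the transformation fails to configure if both or neither are set. Set `field.value.matches` to `false` to keep only the records that do not match (the default is `true`).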
160+
157161
## License
158162

159163
This project is licensed under the [Apache License, Version 2.0](LICENSE).
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Map;
20+
import java.util.Optional;
21+
import java.util.function.Predicate;
22+
23+
import org.apache.kafka.common.config.AbstractConfig;
24+
import org.apache.kafka.common.config.ConfigDef;
25+
import org.apache.kafka.common.config.ConfigException;
26+
import org.apache.kafka.connect.connector.ConnectRecord;
27+
import org.apache.kafka.connect.data.Field;
28+
import org.apache.kafka.connect.data.Schema;
29+
import org.apache.kafka.connect.data.Struct;
30+
import org.apache.kafka.connect.transforms.Transformation;
31+
32+
import static org.apache.kafka.connect.data.Schema.Type.STRING;
33+
34+
public class FilterByFieldValue<R extends ConnectRecord<R>> implements Transformation<R> {
35+
36+
private String fieldName;
37+
private Optional<String> fieldExpectedValue;
38+
private Optional<String> fieldValuePattern;
39+
private Predicate<Optional<String>> filterCondition;
40+
41+
@Override
42+
public R apply(final R record) {
43+
if (record.value() instanceof Struct) {
44+
return handleStruct(record);
45+
} else if (record.value() instanceof Map) {
46+
return handleMap(record);
47+
}
48+
return record; // if record is other than map or struct, pass-by
49+
}
50+
51+
private R handleStruct(final R record) {
52+
Struct struct = (Struct) record.value();
53+
final Optional<String> fieldValue = extractStructFieldValue(struct, fieldName);
54+
return filterCondition.test(fieldValue) ? record : null;
55+
}
56+
57+
private Optional<String> extractStructFieldValue(Struct struct, String fieldName) {
58+
final Schema schema = struct.schema();
59+
final Field field = schema.field(fieldName);
60+
final Object fieldValue = struct.get(field);
61+
62+
Optional<String> text = Optional.empty();
63+
if (STRING.equals(field.schema().type())) {
64+
text = Optional.of((String) fieldValue);
65+
} else if (schema.type().isPrimitive()) {
66+
text = Optional.of(fieldValue.toString());
67+
}
68+
return text;
69+
}
70+
71+
@SuppressWarnings("unchecked")
72+
private R handleMap(final R record) {
73+
final Map<String, Object> map = (Map<String, Object>) record.value();
74+
final Optional<String> fieldValue = extractMapFieldValue(map, fieldName);
75+
return filterCondition.test(fieldValue) ? record : null;
76+
}
77+
78+
private Optional<String> extractMapFieldValue(Map<String, Object> map, String fieldName) {
79+
if (!map.containsKey(fieldName)) return Optional.empty();
80+
81+
final Object fieldValue = map.get(fieldName);
82+
83+
Optional<String> text = Optional.empty();
84+
if (fieldValue instanceof String
85+
|| fieldValue instanceof Long
86+
|| fieldValue instanceof Integer
87+
|| fieldValue instanceof Short
88+
|| fieldValue instanceof Double
89+
|| fieldValue instanceof Float
90+
|| fieldValue instanceof Boolean) {
91+
text = Optional.of(fieldValue.toString());
92+
}
93+
return text;
94+
}
95+
96+
@Override
97+
public ConfigDef config() {
98+
return new ConfigDef()
99+
.define("field.name",
100+
ConfigDef.Type.STRING,
101+
ConfigDef.Importance.HIGH, "The field name to filter by")
102+
.define("field.value",
103+
ConfigDef.Type.STRING, null,
104+
ConfigDef.Importance.HIGH, "Expected value to match. Either define this, or a regex pattern")
105+
.define("field.value.pattern",
106+
ConfigDef.Type.STRING, null,
107+
ConfigDef.Importance.HIGH, "The pattern to match. Either define this, or an expected value")
108+
.define("field.value.matches",
109+
ConfigDef.Type.BOOLEAN, true,
110+
ConfigDef.Importance.MEDIUM, "The filter mode, 'true' for matching or 'false' for non-matching");
111+
}
112+
113+
@Override
114+
public void close() {
115+
}
116+
117+
@Override
118+
public void configure(final Map<String, ?> configs) {
119+
final AbstractConfig config = new AbstractConfig(config(), configs);
120+
this.fieldName = config.getString("field.name");
121+
this.fieldExpectedValue = Optional.ofNullable(config.getString("field.value"));
122+
this.fieldValuePattern = Optional.ofNullable(config.getString("field.value.pattern"));
123+
boolean expectedValuePresent = fieldExpectedValue.map(s -> !s.isEmpty()).orElse(false);
124+
boolean regexPatternPresent = fieldValuePattern.map(s -> !s.isEmpty()).orElse(false);
125+
if ((expectedValuePresent && regexPatternPresent)
126+
|| (!expectedValuePresent && !regexPatternPresent)) {
127+
throw new ConfigException("Either field.value or field.value.pattern have to be set to apply filter transform");
128+
}
129+
Predicate<Optional<String>> matchCondition = fieldValue -> fieldValue
130+
.filter(value -> expectedValuePresent ?
131+
fieldExpectedValue.get().equals(value) :
132+
value.matches(fieldValuePattern.get()))
133+
.isPresent();
134+
this.filterCondition = config.getBoolean("field.value.matches")
135+
? (matchCondition)
136+
: (result -> !matchCondition.test(result));
137+
}
138+
}

src/main/java/io/aiven/kafka/connect/transforms/FilterByValueRegex.java

Lines changed: 0 additions & 163 deletions
This file was deleted.

0 commit comments

Comments
 (0)