
Commit c9855d7

Merge pull request #82 from aiven/sebinsunny-filter-transformation
transforms: add FilterByFieldValue transformation
2 parents: 7e876a0 + 09a5a25

3 files changed: +459, -0


README.md

Lines changed: 21 additions & 0 deletions
@@ -141,6 +141,27 @@ transforms=MakeTombstone
transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
```

### `FilterByFieldValue`

This transformation allows filtering records based on either a specific field or the whole key/value, matched against an expected value or a regex pattern.

Here is an example of this transformation configuration:

```properties
transforms=Filter
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByFieldValue
transforms.Filter.field.name=<field_name>
transforms.Filter.field.value=<field_value>
transforms.Filter.field.value.pattern=<regex_pattern>
transforms.Filter.field.value.matches=<true|false>
```

If `field.name` is empty, the whole value is considered for filtering.

Either `field.value` or `field.value.pattern` (but not both) must be defined to apply the filter.

Only `string`, `numeric`, and `boolean` types are considered for matching purposes; other types are ignored.
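
For illustration, here is a hypothetical configuration (the field name `status` and the pattern `error|failed` are invented values) that drops every record whose `status` field matches the pattern. It uses the nested `Value` class defined in the source file below, referenced with the usual `$Value` suffix for nested Kafka Connect SMT classes:

```properties
# Hypothetical example: drop records whose "status" field matches "error" or "failed".
transforms=Filter
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByFieldValue$Value
transforms.Filter.field.name=status
transforms.Filter.field.value.pattern=error|failed
transforms.Filter.field.value.matches=false
```

With `field.value.matches=false` the match is inverted: records whose field matches the pattern are dropped (the transformation returns `null`), and all other records pass through unchanged.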
## License

This project is licensed under the [Apache License, Version 2.0](LICENSE).
FilterByFieldValue.java

Lines changed: 182 additions & 0 deletions
@@ -0,0 +1,182 @@
/*
 * Copyright 2023 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.transforms;

import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.transforms.Transformation;

public abstract class FilterByFieldValue<R extends ConnectRecord<R>> implements Transformation<R> {

    private String fieldName;
    private Optional<String> fieldExpectedValue;
    private Optional<String> fieldValuePattern;

    @Override
    public ConfigDef config() {
        return new ConfigDef()
            .define("field.name",
                ConfigDef.Type.STRING,
                null,
                ConfigDef.Importance.HIGH,
                "The field name to filter by. "
                    + "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported. "
                    + "If empty, the whole key/value record will be filtered.")
            .define("field.value",
                ConfigDef.Type.STRING,
                null,
                ConfigDef.Importance.HIGH,
                "Expected value to match. Either define this, or a regex pattern")
            .define("field.value.pattern",
                ConfigDef.Type.STRING,
                null,
                ConfigDef.Importance.HIGH,
                "The pattern to match. Either define this, or an expected value")
            .define("field.value.matches",
                ConfigDef.Type.BOOLEAN,
                true,
                ConfigDef.Importance.MEDIUM,
                "The filter mode, 'true' for matching or 'false' for non-matching");
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        final AbstractConfig config = new AbstractConfig(config(), configs);
        this.fieldName = config.getString("field.name");
        this.fieldExpectedValue = Optional.ofNullable(config.getString("field.value"));
        this.fieldValuePattern = Optional.ofNullable(config.getString("field.value.pattern"));
        final boolean expectedValuePresent = fieldExpectedValue.isPresent();
        final boolean regexPatternPresent = fieldValuePattern.map(s -> !s.isEmpty()).orElse(false);
        if (expectedValuePresent == regexPatternPresent) {
            throw new ConfigException(
                "Either field.value or field.value.pattern have to be set to apply filter transform");
        }
        final Predicate<SchemaAndValue> matchCondition;

        if (expectedValuePresent) {
            final SchemaAndValue expectedSchemaAndValue = Values.parseString(fieldExpectedValue.get());
            matchCondition = schemaAndValue -> expectedSchemaAndValue.value().equals(schemaAndValue.value());
        } else {
            final String pattern = fieldValuePattern.get();
            final Predicate<String> regexPredicate = Pattern.compile(pattern).asPredicate();
            matchCondition = schemaAndValue ->
                schemaAndValue != null
                    && regexPredicate.test(Values.convertToString(schemaAndValue.schema(), schemaAndValue.value()));
        }

        this.filterCondition = config.getBoolean("field.value.matches")
            ? matchCondition
            : (result -> !matchCondition.test(result));
    }

    private Predicate<SchemaAndValue> filterCondition;

    protected abstract Schema operatingSchema(R record);

    protected abstract Object operatingValue(R record);

    @Override
    public R apply(final R record) {
        if (operatingValue(record) == null) {
            return record;
        }

        if (operatingSchema(record) == null) {
            return applySchemaless(record);
        } else {
            return applyWithSchema(record);
        }
    }

    private R applyWithSchema(final R record) {
        final Struct struct = (Struct) operatingValue(record);
        final SchemaAndValue schemaAndValue = getStructFieldValue(struct, fieldName).orElse(null);
        return filterCondition.test(schemaAndValue) ? record : null;
    }

    private Optional<SchemaAndValue> getStructFieldValue(final Struct struct, final String fieldName) {
        final Schema schema = struct.schema();
        final Field field = schema.field(fieldName);
        final Object fieldValue = struct.get(field);
        if (fieldValue == null) {
            return Optional.empty();
        } else {
            return Optional.of(new SchemaAndValue(field.schema(), struct.get(field)));
        }
    }

    @SuppressWarnings("unchecked")
    private R applySchemaless(final R record) {
        if (fieldName == null || fieldName.isEmpty()) {
            final SchemaAndValue schemaAndValue = getSchemalessFieldValue(operatingValue(record)).orElse(null);
            return filterCondition.test(schemaAndValue) ? record : null;
        } else {
            final Map<String, Object> map = (Map<String, Object>) operatingValue(record);
            final SchemaAndValue schemaAndValue = getSchemalessFieldValue(map.get(fieldName)).orElse(null);
            return filterCondition.test(schemaAndValue) ? record : null;
        }
    }

    private Optional<SchemaAndValue> getSchemalessFieldValue(final Object fieldValue) {
        if (fieldValue == null) {
            return Optional.empty();
        }
        return Optional.of(new SchemaAndValue(Values.inferSchema(fieldValue), fieldValue));
    }

    @Override
    public void close() {
    }

    public static final class Key<R extends ConnectRecord<R>> extends FilterByFieldValue<R> {

        @Override
        protected Schema operatingSchema(final R record) {
            return record.keySchema();
        }

        @Override
        protected Object operatingValue(final R record) {
            return record.key();
        }
    }

    public static final class Value<R extends ConnectRecord<R>> extends FilterByFieldValue<R> {

        @Override
        protected Schema operatingSchema(final R record) {
            return record.valueSchema();
        }

        @Override
        protected Object operatingValue(final R record) {
            return record.value();
        }
    }
}
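
To make the record-level behaviour concrete, here is a minimal, hypothetical sketch (the class name, topic, field name, and values are invented for illustration) that exercises the `Value` variant on a schemaless record, assuming `connect-api` and this transformation are on the classpath:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.kafka.connect.transforms.FilterByFieldValue;

public class FilterByFieldValueDemo {
    public static void main(final String[] args) {
        // Keep only records whose "status" field equals "active" (field.value.matches defaults to true).
        final FilterByFieldValue.Value<SourceRecord> filter = new FilterByFieldValue.Value<>();
        final Map<String, String> config = new HashMap<>();
        config.put("field.name", "status");
        config.put("field.value", "active");
        filter.configure(config);

        // Schemaless values (plain Maps, valueSchema is null), so the applySchemaless() path is taken.
        final Map<String, Object> kept = new HashMap<>();
        kept.put("status", "active");
        final Map<String, Object> dropped = new HashMap<>();
        dropped.put("status", "inactive");

        final SourceRecord matching = new SourceRecord(null, null, "topic", null, kept);
        final SourceRecord nonMatching = new SourceRecord(null, null, "topic", null, dropped);

        System.out.println(filter.apply(matching));     // the record itself: it passes the filter
        System.out.println(filter.apply(nonMatching));  // null: the record is filtered out
    }
}
```

In a real deployment the Connect runtime instantiates and configures the transformation from connector properties; this sketch only illustrates that a matching record passes through unchanged while a non-matching one becomes `null` and is therefore dropped.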
