Skip to content

Commit 5abe1de

Browse files
committed
transforms: add FilterByFieldValue transformation
Implement FilterByFieldValue transformation to filter records based on field value or regex pattern
1 parent 07f0733 commit 5abe1de

File tree

2 files changed

+357
-0
lines changed

2 files changed

+357
-0
lines changed
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.function.Predicate;
23+
import java.util.regex.Matcher;
24+
import java.util.regex.Pattern;
25+
26+
import org.apache.kafka.common.config.AbstractConfig;
27+
import org.apache.kafka.common.config.ConfigDef;
28+
import org.apache.kafka.connect.connector.ConnectRecord;
29+
import org.apache.kafka.connect.data.Schema;
30+
import org.apache.kafka.connect.data.Struct;
31+
import org.apache.kafka.connect.transforms.Transformation;
32+
33+
public class FilterByValueRegex<R extends ConnectRecord<R>> implements Transformation<R> {
34+
35+
private String fieldName;
36+
private String pattern;
37+
private Pattern fieldValuePattern;
38+
private boolean matches;
39+
private Predicate<Boolean> recordFilterCondition;
40+
41+
public FilterByValueRegex() {
42+
this.fieldName = "";
43+
this.pattern = "";
44+
this.fieldValuePattern = Pattern.compile("");
45+
this.matches = true;
46+
}
47+
48+
@Override
49+
public R apply(final R record) {
50+
if (record.value() instanceof Struct) {
51+
return handleStruct(record);
52+
} else if (record.value() instanceof Map) {
53+
return handleMap(record);
54+
} else if (record.value() instanceof List || record.value().getClass().isArray()) {
55+
return handleListOrArray(record);
56+
}
57+
return null;
58+
}
59+
60+
private R handleStruct(final R record) {
61+
final Struct value = (Struct) record.value();
62+
final Object fieldValueObj = value.get(fieldName);
63+
64+
if (fieldValueObj instanceof List || fieldValueObj instanceof Object[]) {
65+
if (handleArrayOrListField(fieldValueObj)) {
66+
return record;
67+
}
68+
} else {
69+
return checkAndReturnRecord(value.schema().field(fieldName).schema(), fieldValueObj, record);
70+
}
71+
72+
return null;
73+
}
74+
75+
private R handleMap(final R record) {
76+
final Map<String, Object> value = (Map<String, Object>) record.value();
77+
final Object fieldValueObj = value.get(fieldName);
78+
79+
return checkAndReturnRecord(null, fieldValueObj, record);
80+
}
81+
82+
private R handleListOrArray(final R record) {
83+
if (handleArrayOrListField(record.value())) {
84+
return record;
85+
}
86+
return null;
87+
}
88+
89+
private R checkAndReturnRecord(final Schema schema, final Object fieldValueObj, final R record) {
90+
if (fieldValueObj != null) {
91+
final String fieldValueStr = schema != null
92+
? convertToString(schema, fieldValueObj) : fieldValueObj.toString();
93+
94+
if (fieldValueStr != null) {
95+
final Matcher matcher = fieldValuePattern.matcher(fieldValueStr);
96+
final boolean matches = matcher.matches();
97+
98+
if (recordFilterCondition.test(matches)) {
99+
return record;
100+
}
101+
}
102+
}
103+
return null;
104+
}
105+
106+
private String convertToString(final Schema schema, final Object fieldValueObj) {
107+
if (schema.type() == Schema.Type.STRING) {
108+
return (String) fieldValueObj;
109+
} else if (schema.type().isPrimitive()) {
110+
return fieldValueObj.toString();
111+
}
112+
return null;
113+
}
114+
115+
private boolean handleArrayOrListField(final Object fieldValueObj) {
116+
final List<?> valueList = fieldValueObj
117+
instanceof List ? (List<?>) fieldValueObj : Arrays.asList((Object[]) fieldValueObj);
118+
boolean foundMatchingFieldValue = false;
119+
120+
for (final Object value : valueList) {
121+
final String fieldValueStr = value.toString();
122+
final Matcher matcher = fieldValuePattern.matcher(fieldValueStr);
123+
final boolean matches = matcher.matches();
124+
125+
if (recordFilterCondition.test(matches)) {
126+
foundMatchingFieldValue = true;
127+
break;
128+
}
129+
}
130+
131+
return foundMatchingFieldValue;
132+
}
133+
134+
@Override
135+
public ConfigDef config() {
136+
return new ConfigDef()
137+
.define("fieldName",
138+
ConfigDef.Type.STRING,
139+
ConfigDef.Importance.HIGH, "The field name to filter by")
140+
.define("pattern",
141+
ConfigDef.Type.STRING,
142+
ConfigDef.Importance.HIGH, "The pattern to match")
143+
.define("matches",
144+
ConfigDef.Type.BOOLEAN, true,
145+
ConfigDef.Importance.MEDIUM, "The filter mode, 'true' for matching or 'false' for non-matching");
146+
}
147+
148+
@Override
149+
public void close() {
150+
}
151+
152+
@Override
153+
public void configure(final Map<String, ?> configs) {
154+
final AbstractConfig config = new AbstractConfig(config(), configs);
155+
this.fieldName = config.getString("fieldName");
156+
this.pattern = config.getString("pattern");
157+
this.fieldValuePattern = Pattern.compile(this.pattern);
158+
this.matches = config.getBoolean("matches");
159+
recordFilterCondition = this.matches
160+
? (result -> result)
161+
: (result -> !result);
162+
}
163+
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.connect.data.Schema;
25+
import org.apache.kafka.connect.data.SchemaBuilder;
26+
import org.apache.kafka.connect.data.Struct;
27+
import org.apache.kafka.connect.source.SourceRecord;
28+
29+
import org.junit.jupiter.api.Test;
30+
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertNotNull;
33+
import static org.junit.jupiter.api.Assertions.assertNull;
34+
35+
class FilterByValueRegexTest {
36+
37+
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
38+
.field("before", Schema.OPTIONAL_STRING_SCHEMA)
39+
.field("after", SchemaBuilder.struct()
40+
.field("pk", Schema.STRING_SCHEMA)
41+
.field("value", Schema.STRING_SCHEMA)
42+
.build())
43+
.field("source", SchemaBuilder.struct().optional())
44+
.field("op", Schema.STRING_SCHEMA)
45+
.field("ts_ms", Schema.STRING_SCHEMA)
46+
.field("transaction", Schema.OPTIONAL_STRING_SCHEMA)
47+
.build();
48+
49+
@Test
50+
void shouldFilterOutRecordsEqualsToReadEvents() {
51+
final FilterByValueRegex<SourceRecord> filter = new FilterByValueRegex<>();
52+
filter.configure(Map.of(
53+
"fieldName", "op",
54+
"pattern", "r",
55+
"matches", "false"
56+
));
57+
58+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
59+
.put("pk", "1")
60+
.put("value", "New data");
61+
62+
final Struct value = new Struct(VALUE_SCHEMA)
63+
.put("before", null)
64+
.put("after", after)
65+
.put("source", null)
66+
.put("op", "r")
67+
.put("ts_ms", "1620393591654")
68+
.put("transaction", null);
69+
70+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
71+
72+
final var actual = filter.apply(record);
73+
assertNull(actual, "Record with op 'r' should be filtered out");
74+
}
75+
76+
@Test
77+
void shouldKeepRecordsNotEqualsToReadEvents() {
78+
final FilterByValueRegex<SourceRecord> filter = new FilterByValueRegex<>();
79+
filter.configure(Map.of(
80+
"fieldName", "op",
81+
"pattern", "r",
82+
"matches", "false"
83+
));
84+
85+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
86+
.put("pk", "1")
87+
.put("value", "New data");
88+
89+
final Struct value = new Struct(VALUE_SCHEMA)
90+
.put("before", null)
91+
.put("after", after)
92+
.put("source", null)
93+
.put("op", "u")
94+
.put("ts_ms", "1620393591654")
95+
.put("transaction", null);
96+
97+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
98+
99+
final var actual = filter.apply(record);
100+
assertEquals(record, actual, "Record with op not equal to 'r' should be kept");
101+
}
102+
103+
@Test
104+
void shouldFilterOutRecordsNotEqualsReadEvents() {
105+
final FilterByValueRegex<SourceRecord> filter = new FilterByValueRegex<>();
106+
filter.configure(Map.of(
107+
"fieldName", "op",
108+
"pattern", "r",
109+
"matches", "true"
110+
));
111+
112+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
113+
.put("pk", "1")
114+
.put("value", "New data");
115+
116+
final Struct value = new Struct(VALUE_SCHEMA)
117+
.put("before", null)
118+
.put("after", after)
119+
.put("source", null)
120+
.put("op", "u")
121+
.put("ts_ms", "1620393591654")
122+
.put("transaction", null);
123+
124+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
125+
126+
final var actual = filter.apply(record);
127+
assertNull(actual, "Record with op not equal to 'r' should be filtered out");
128+
}
129+
130+
@Test
131+
void shouldFilterMatchingArrayFieldValue() {
132+
final FilterByValueRegex<SourceRecord> filterByValueRegex = new FilterByValueRegex<>();
133+
final Map<String, String> configs = new HashMap<>();
134+
configs.put("fieldName", "tags");
135+
configs.put("pattern", ".*apple.*");
136+
configs.put("matches", "true");
137+
filterByValueRegex.configure(configs);
138+
139+
final Schema schema = SchemaBuilder.struct()
140+
.field("name", Schema.STRING_SCHEMA)
141+
.field("tags", SchemaBuilder.array(Schema.STRING_SCHEMA))
142+
.build();
143+
final List<String> tags = Arrays.asList("apple", "orange", "mango");
144+
final Struct value = new Struct(schema)
145+
.put("name", "John Doe")
146+
.put("tags", tags);
147+
148+
final var record = new SourceRecord(null, null, "some_topic", schema, value);
149+
150+
final var actual = filterByValueRegex.apply(record);
151+
assertEquals(record, actual, "The record contains the matching pattern");
152+
}
153+
154+
@Test
155+
void shouldFilterOutMapFieldValue() {
156+
final FilterByValueRegex<SourceRecord> filterByValueRegex = new FilterByValueRegex<>();
157+
final Map<String, String> configs = new HashMap<>();
158+
configs.put("fieldName", "language");
159+
configs.put("pattern", ".*Java.*");
160+
configs.put("matches", "false");
161+
filterByValueRegex.configure(configs);
162+
163+
final Map<String, Object> value = new HashMap<>();
164+
value.put("name", "John Doe");
165+
value.put("language", "Java");
166+
167+
final var record = new SourceRecord(null, null, "some_topic", Schema.STRING_SCHEMA, value);
168+
169+
final var actual = filterByValueRegex.apply(record);
170+
assertNull(actual, "The record should be filtered out");
171+
}
172+
173+
@Test
174+
void shouldFilterArrayFieldValue() {
175+
final FilterByValueRegex<SourceRecord> filterByValueRegex = new FilterByValueRegex<>();
176+
final Map<String, String> configs = new HashMap<>();
177+
configs.put("fieldName", "tags");
178+
configs.put("pattern", ".*apple.*");
179+
configs.put("matches", "true");
180+
filterByValueRegex.configure(configs);
181+
182+
// Test with a list
183+
final List<String> tagsList = Arrays.asList("apple", "orange", "mango");
184+
final var listRecord = new SourceRecord(null, null, "some_topic", null, tagsList);
185+
final var listActual = filterByValueRegex.apply(listRecord);
186+
assertNotNull(listActual, "The record should not be filtered out and return the record itself");
187+
188+
// Test with an array
189+
final String[] tagsArray = {"apple", "orange", "mango"};
190+
final var arrayRecord = new SourceRecord(null, null, "some_topic", null, tagsArray);
191+
final var arrayActual = filterByValueRegex.apply(arrayRecord);
192+
assertNotNull(arrayActual, "The record should not be filtered out and return the record itself");
193+
}
194+
}

0 commit comments

Comments
 (0)