
Commit c98fcfc

feat: add transform tests for different scenarios and docs

1 parent ca93bd0 commit c98fcfc

File tree

3 files changed: +175 -85 lines changed

README.md (5 additions, 1 deletion)

@@ -143,7 +143,7 @@ transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
 
 ### `FilterByFieldValue`
 
-This transformation allows filtering records based on a specific field value or a given regex pattern.
+This transformation allows filtering records based either on a specific field or on the whole value, matched against an expected value or a regex pattern.
 
 Here is an example of this transformation configuration:
 
@@ -156,8 +156,12 @@ transforms.Filter.field.value.pattern=<regex_pattern>
 transforms.Filter.field.value.matches=<true|false>
 ```
 
+If `field.name` is empty, the whole value is considered for filtering.
+
 Either `field.value` or `field.value.pattern` must be defined to apply the filter.
 
+Only `string`, `numeric`, and `boolean` types are considered for matching purposes; other types are ignored.
+
 ## License
 
 This project is licensed under the [Apache License, Version 2.0](LICENSE).
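The two new README notes together describe a whole-value filtering mode. A minimal sketch of such a configuration, following the properties format of the README's existing example; the `$Value` suffix is inferred from the inner classes in the Java diff below, and the pattern value is purely illustrative:

```properties
# Hypothetical example: field.name is omitted, so the whole record value
# (not a named field) is compared against the regex.
transforms=Filter
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByFieldValue$Value
transforms.Filter.field.value.pattern=.*DEBUG.*
# matches=false keeps only records that do NOT match the pattern.
transforms.Filter.field.value.matches=false
```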

src/main/java/io/aiven/kafka/connect/transforms/FilterByFieldValue.java (18 additions, 12 deletions)

@@ -60,12 +60,12 @@ public R apply(final R record) {
     protected abstract Object operatingValue(R record);
 
     private R applyWithSchema(final R record) {
-        final Struct struct = (Struct) record.value();
-        final Optional<String> fieldValue = extractStructFieldValue(struct, fieldName);
+        final Struct struct = (Struct) operatingValue(record);
+        final Optional<String> fieldValue = getStructFieldValue(struct, fieldName);
         return filterCondition.test(fieldValue) ? record : null;
     }
 
-    private Optional<String> extractStructFieldValue(final Struct struct, final String fieldName) {
+    private Optional<String> getStructFieldValue(final Struct struct, final String fieldName) {
         final Schema schema = struct.schema();
         final Field field = schema.field(fieldName);
         final Object fieldValue = struct.get(field);
@@ -81,17 +81,17 @@ private Optional<String> extractStructFieldValue(final Struct struct, final Stri
 
     @SuppressWarnings("unchecked")
     private R applySchemaless(final R record) {
-        if (fieldName.isEmpty()) {
-            final Optional<String> value = extractSchemalessFieldValue(operatingValue(record));
+        if (fieldName == null || fieldName.isEmpty()) {
+            final Optional<String> value = getSchemalessFieldValue(operatingValue(record));
             return filterCondition.test(value) ? record : null;
         } else {
-            final Map<String, Object> map = (Map<String, Object>) record.value();
-            final Optional<String> fieldValue = extractSchemalessFieldValue(map.get(fieldName));
+            final Map<String, Object> map = (Map<String, Object>) operatingValue(record);
+            final Optional<String> fieldValue = getSchemalessFieldValue(map.get(fieldName));
             return filterCondition.test(fieldValue) ? record : null;
         }
     }
 
-    private Optional<String> extractSchemalessFieldValue(final Object fieldValue) {
+    private Optional<String> getSchemalessFieldValue(final Object fieldValue) {
         if (fieldValue == null) return Optional.empty();
         Optional<String> text = Optional.empty();
         if (isSupportedType(fieldValue)) {
@@ -116,18 +116,24 @@ public ConfigDef config() {
         return new ConfigDef()
             .define("field.name",
                 ConfigDef.Type.STRING,
+                null,
                 ConfigDef.Importance.HIGH,
-                "The field name to filter by")
+                "The field name to filter by. " +
+                    "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported. " +
+                    "If empty, the whole key/value record will be filtered.")
             .define("field.value",
-                ConfigDef.Type.STRING, null,
+                ConfigDef.Type.STRING,
+                null,
                 ConfigDef.Importance.HIGH,
                 "Expected value to match. Either define this, or a regex pattern")
             .define("field.value.pattern",
-                ConfigDef.Type.STRING, null,
+                ConfigDef.Type.STRING,
+                null,
                 ConfigDef.Importance.HIGH,
                 "The pattern to match. Either define this, or an expected value")
             .define("field.value.matches",
-                ConfigDef.Type.BOOLEAN, true,
+                ConfigDef.Type.BOOLEAN,
+                true,
                 ConfigDef.Importance.MEDIUM,
                 "The filter mode, 'true' for matching or 'false' for non-matching");
     }
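The core of this refactor is that `applyWithSchema` and `applySchemaless` now call `operatingValue(record)` instead of hard-coding `record.value()`, which is what lets the same logic serve both key and value filtering. The concrete `Key` and `Value` subclasses are not part of this diff; below is a minimal sketch of how they plausibly implement `operatingValue`, following the usual Kafka Connect SMT `$Key`/`$Value` pattern (hypothetical bodies, not the project's code):

```java
import org.apache.kafka.connect.connector.ConnectRecord;

// Sketch only: the real subclasses live in FilterByFieldValue.java but are
// not shown in this commit's diff.
public abstract class FilterByFieldValueSketch<R extends ConnectRecord<R>> {

    protected abstract Object operatingValue(R record);

    public static class Key<R extends ConnectRecord<R>> extends FilterByFieldValueSketch<R> {
        @Override
        protected Object operatingValue(final R record) {
            return record.key();   // filter on the record key
        }
    }

    public static class Value<R extends ConnectRecord<R>> extends FilterByFieldValueSketch<R> {
        @Override
        protected Object operatingValue(final R record) {
            return record.value(); // filter on the record value
        }
    }
}
```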

src/test/java/io/aiven/kafka/connect/transforms/FilterByFieldValueTest.java (152 additions, 72 deletions)

@@ -18,6 +18,8 @@
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -26,120 +28,198 @@
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.*;
 
 class FilterByFieldValueTest {
 
-    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
-        .field("before", Schema.OPTIONAL_STRING_SCHEMA)
-        .field("after", SchemaBuilder.struct()
-            .field("pk", Schema.STRING_SCHEMA)
-            .field("value", Schema.STRING_SCHEMA)
-            .build())
-        .field("source", SchemaBuilder.struct().optional())
-        .field("op", Schema.STRING_SCHEMA)
-        .field("ts_ms", Schema.STRING_SCHEMA)
-        .field("transaction", Schema.OPTIONAL_STRING_SCHEMA)
-        .build();
 
     @Test
-    void shouldFilterOutRecordsEqualsToReadEvents() {
+    void shouldFilterOutValueRecordsEqualsToReadEvents() {
         final FilterByFieldValue<SourceRecord> filter = new FilterByFieldValue.Value<>();
         filter.configure(Map.of(
             "field.name", "op",
             "field.value", "r",
             "field.value.matches", "false"
         ));
 
-        final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
-            .put("pk", "1")
-            .put("value", "New data");
-
-        final Struct value = new Struct(VALUE_SCHEMA)
-            .put("before", null)
-            .put("after", after)
-            .put("source", null)
-            .put("op", "r")
-            .put("ts_ms", "1620393591654")
-            .put("transaction", null);
-
-        final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
-
-        final var actual = filter.apply(record);
-        assertNull(actual, "Record with op 'r' should be filtered out");
+        assertNull(filter.apply(
+                prepareStructRecord(
+                    struct -> { },
+                    struct -> struct.put("op", "r")
+                )),
+            "Record with op 'r' should be filtered out");
+        SourceRecord record = prepareStructRecord(
+            struct -> { },
+            struct -> struct.put("op", "u")
+        );
+        assertEquals(record, filter.apply(record),
+            "Record with op not 'r' should not be filtered out");
     }
 
     @Test
-    void shouldKeepRecordsNotEqualsToReadEvents() {
-        final FilterByFieldValue<SourceRecord> filter = new FilterByFieldValue.Value<>();
+    void shouldFilterOutKeyRecordsEqualsToId() {
+        final FilterByFieldValue<SourceRecord> filter = new FilterByFieldValue.Key<>();
         filter.configure(Map.of(
-            "field.name", "op",
-            "field.value", "r",
+            "field.name", "id",
+            "field.value", "123",
             "field.value.matches", "false"
         ));
 
-        final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
-            .put("pk", "1")
-            .put("value", "New data");
-
-        final Struct value = new Struct(VALUE_SCHEMA)
-            .put("before", null)
-            .put("after", after)
-            .put("source", null)
-            .put("op", "u")
-            .put("ts_ms", "1620393591654")
-            .put("transaction", null);
-
-        final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
-
-        final var actual = filter.apply(record);
-        assertEquals(record, actual, "Record with op not equal to 'r' should be kept");
+        assertNull(filter.apply(prepareStructRecord(
+            struct -> struct.put("id", "123"),
+            struct -> { })), "Record with id '123' should be filtered out");
+        SourceRecord record = prepareStructRecord(
+            struct -> struct.put("id", "111"),
+            struct -> { });
+        assertEquals(record, filter.apply(record), "Record with id not '123' should not be filtered out");
     }
 
     @Test
-    void shouldFilterOutRecordsNotEqualsReadEvents() {
+    void shouldFilterOutValueRecordsNotEqualsReadEvents() {
         final FilterByFieldValue<SourceRecord> filter = new FilterByFieldValue.Value<>();
         filter.configure(Map.of(
             "field.name", "op",
             "field.value", "r",
             "field.value.matches", "true"
         ));
 
-        final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
-            .put("pk", "1")
-            .put("value", "New data");
-
-        final Struct value = new Struct(VALUE_SCHEMA)
-            .put("before", null)
-            .put("after", after)
-            .put("source", null)
-            .put("op", "u")
-            .put("ts_ms", "1620393591654")
-            .put("transaction", null);
-
-        final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
-
-        final var actual = filter.apply(record);
-        assertNull(actual, "Record with op not equal to 'r' should be filtered out");
+        assertNull(filter.apply(prepareStructRecord(
+                struct -> { },
+                struct -> struct.put("op", "u")
+            )),
+            "Record with op not equal to 'r' should be filtered out");
+        SourceRecord record = prepareStructRecord(
+            struct -> { },
+            struct -> struct.put("op", "r")
+        );
+        assertEquals(record, filter.apply(record), "Record with op equal to 'r' should not be filtered out");
+    }
+
+    @Test
+    void shouldFilterOutKeyRecordsNotEqualsToId() {
+        final FilterByFieldValue<SourceRecord> filter = new FilterByFieldValue.Key<>();
+        filter.configure(Map.of(
+            "field.name", "id",
+            "field.value", "123",
+            "field.value.matches", "true"
+        ));
+
+        assertNull(filter.apply(prepareStructRecord(
+            struct -> struct.put("id", "111"),
+            struct -> { }
+        )), "Record with id not equal to '123' should be filtered out");
+        SourceRecord record = prepareStructRecord(
+            struct -> struct.put("id", "123"),
+            struct -> { }
+        );
+        assertEquals(record, filter.apply(record), "Record with id equal to '123' should not be filtered out");
     }
 
     @Test
-    void shouldFilterOutMapFieldValue() {
+    void shouldFilterOutMapValueRecordsWithRegex() {
         final FilterByFieldValue<SourceRecord> filterByFieldValue = new FilterByFieldValue.Value<>();
         final Map<String, String> configs = new HashMap<>();
         configs.put("field.name", "language");
         configs.put("field.value.pattern", ".*Java.*");
         configs.put("field.value.matches", "false");
         filterByFieldValue.configure(configs);
 
-        final Map<String, Object> value = new HashMap<>();
-        value.put("name", "John Doe");
-        value.put("language", "Java");
-
-        final var record = new SourceRecord(null, null, "some_topic", null, value);
-
-        final var actual = filterByFieldValue.apply(record);
-        assertNull(actual, "The record should be filtered out");
+        assertNull(filterByFieldValue.apply(prepareRecord(() -> "42", () -> Map.of("language", "Javascript"))), "The record should be filtered out");
+        SourceRecord record = prepareRecord(() -> "42", () -> Map.of("language", "Rust"));
+        assertEquals(record, filterByFieldValue.apply(record), "The record should not be filtered out");
+    }
+
+    @Test
+    void shouldFilterOutMapKeyRecordsWithRegex() {
+        final FilterByFieldValue<SourceRecord> filterByFieldValue = new FilterByFieldValue.Key<>();
+        final Map<String, String> configs = new HashMap<>();
+        configs.put("field.name", "language");
+        configs.put("field.value.pattern", ".*Java.*");
+        configs.put("field.value.matches", "false");
+        filterByFieldValue.configure(configs);
+
+        assertNull(filterByFieldValue.apply(prepareRecord(() -> Map.of("language", "Javascript"), () -> "42")), "The record should be filtered out");
+        SourceRecord record = prepareRecord(() -> Map.of("language", "Rust"), () -> "42");
+        assertEquals(record, filterByFieldValue.apply(record), "The record should not be filtered out");
+    }
+
+    @Test
+    void shouldFilterOutRawKeyRecords() {
+        final FilterByFieldValue<SourceRecord> filterByFieldValue = new FilterByFieldValue.Key<>();
+        final Map<String, String> configs = new HashMap<>();
+        configs.put("field.value", "42");
+        configs.put("field.value.matches", "false");
+        filterByFieldValue.configure(configs);
+
+        assertNull(filterByFieldValue.apply(prepareRecord(() -> "42", () -> Map.of("language", "Javascript"))), "The record should be filtered out");
+        SourceRecord record = prepareRecord(() -> "43", () -> Map.of("language", "Rust"));
+        assertEquals(record, filterByFieldValue.apply(record), "The record should not be filtered out");
+    }
+
+    @Test
+    void shouldFilterOutRawValueRecords() {
+        final FilterByFieldValue<SourceRecord> filterByFieldValue = new FilterByFieldValue.Value<>();
+        final Map<String, String> configs = new HashMap<>();
+        configs.put("field.value", "42");
+        configs.put("field.value.matches", "false");
+        filterByFieldValue.configure(configs);
+
+        assertNull(filterByFieldValue.apply(prepareRecord(() -> Map.of("language", "Javascript"), () -> "42")), "The record should be filtered out");
+        SourceRecord record = prepareRecord(() -> Map.of("language", "Rust"), () -> "43");
+        assertEquals(record, filterByFieldValue.apply(record), "The record should not be filtered out");
+    }
+
+    private SourceRecord prepareRecord(Supplier<Object> keySupplier, Supplier<Object> valueSupplier) {
+        return new SourceRecord(null, null, "some_topic",
+            null, keySupplier.get(),
+            null, valueSupplier.get());
+    }
+
+    private SourceRecord prepareStructRecord(Consumer<Struct> keyChanges, Consumer<Struct> valueChanges) {
+        final Schema KEY_VALUE = SchemaBuilder.struct()
+            .field("id", Schema.STRING_SCHEMA)
+            .build();
+        final Schema VALUE_SCHEMA = SchemaBuilder.struct()
+            .field("before", Schema.OPTIONAL_STRING_SCHEMA)
+            .field("after", SchemaBuilder.struct()
+                .field("pk", Schema.STRING_SCHEMA)
+                .field("value", Schema.STRING_SCHEMA)
+                .build())
+            .field("source", SchemaBuilder.struct().optional())
+            .field("op", Schema.STRING_SCHEMA)
+            .field("ts_ms", Schema.STRING_SCHEMA)
+            .field("transaction", Schema.OPTIONAL_STRING_SCHEMA)
+            .build();
+
+        final Struct key = new Struct(KEY_VALUE)
+            .put("id", "123");
+        keyChanges.accept(key);
+
+        final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
+            .put("pk", "1")
+            .put("value", "New data");
+
+        final Struct value = new Struct(VALUE_SCHEMA)
+            .put("before", null)
+            .put("after", after)
+            .put("source", null)
+            .put("op", "u")
+            .put("ts_ms", "1620393591654")
+            .put("transaction", null);
+        valueChanges.accept(value);
+
+        return new SourceRecord(
+            null, null, "some_topic",
+            KEY_VALUE, key,
+            VALUE_SCHEMA, value
+        );
     }
 }
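The tests cover exact-value and regex matching in both polarities, but the diff never shows how `filterCondition` is assembled from `field.value`, `field.value.pattern`, and `field.value.matches`. A standalone sketch of predicate logic consistent with the assertions above (hypothetical helper, not the project's code):

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.regex.Pattern;

final class FilterConditionSketch {

    // Exact match when an expected value is configured, regex match otherwise;
    // matches=false negates the predicate, so matching records are dropped
    // instead of kept.
    static Predicate<Optional<String>> build(final String expectedValue,
                                             final String pattern,
                                             final boolean matches) {
        final Predicate<Optional<String>> matcher;
        if (expectedValue != null) {
            matcher = v -> v.map(expectedValue::equals).orElse(false);
        } else {
            final Pattern compiled = Pattern.compile(pattern);
            matcher = v -> v.map(s -> compiled.matcher(s).matches()).orElse(false);
        }
        return matches ? matcher : matcher.negate();
    }
}
```

Under this reading, `shouldFilterOutRawKeyRecords` passes because `build("42", null, false)` rejects the key `"42"` and accepts `"43"`, matching the `assertNull`/`assertEquals` pair above.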
