16
16
17
17
package io .aiven .kafka .connect .transforms ;
18
18
19
- import java .util .Arrays ;
20
- import java .util .HashSet ;
21
19
import java .util .Map ;
22
20
import java .util .Optional ;
23
- import java .util .Set ;
24
21
import java .util .function .Predicate ;
25
22
26
23
import org .apache .kafka .common .config .AbstractConfig ;
30
27
import org .apache .kafka .connect .data .Field ;
31
28
import org .apache .kafka .connect .data .Schema ;
32
29
import org .apache .kafka .connect .data .Struct ;
30
+ import org .apache .kafka .connect .data .Values ;
33
31
import org .apache .kafka .connect .transforms .Transformation ;
34
32
35
- import static org .apache .kafka .connect .data .Schema .Type .STRING ;
36
-
37
33
public abstract class FilterByFieldValue <R extends ConnectRecord <R >> implements Transformation <R > {
38
34
39
35
private String fieldName ;
40
36
private Optional <String > fieldExpectedValue ;
41
37
private Optional <String > fieldValuePattern ;
42
- private Predicate <Optional < String > > filterCondition ;
38
+ private Predicate <String > filterCondition ;
43
39
44
40
@ Override
45
41
public R apply (final R record ) {
@@ -61,32 +57,25 @@ public R apply(final R record) {
61
57
62
58
private R applyWithSchema (final R record ) {
63
59
final Struct struct = (Struct ) operatingValue (record );
64
- final Optional < String > fieldValue = getStructFieldValue (struct , fieldName );
60
+ final String fieldValue = getStructFieldValue (struct , fieldName ). orElse ( null );
65
61
return filterCondition .test (fieldValue ) ? record : null ;
66
62
}
67
63
68
64
private Optional <String > getStructFieldValue (final Struct struct , final String fieldName ) {
69
65
final Schema schema = struct .schema ();
70
66
final Field field = schema .field (fieldName );
71
67
final Object fieldValue = struct .get (field );
72
-
73
- Optional <String > text = Optional .empty ();
74
- if (STRING .equals (field .schema ().type ())) {
75
- text = Optional .of ((String ) fieldValue );
76
- } else if (schema .type ().isPrimitive ()) {
77
- text = Optional .of (fieldValue .toString ());
78
- }
79
- return text ;
68
+ return Optional .ofNullable (Values .convertToString (field .schema (), fieldValue ));
80
69
}
81
70
82
71
@ SuppressWarnings ("unchecked" )
83
72
private R applySchemaless (final R record ) {
84
73
if (fieldName == null || fieldName .isEmpty ()) {
85
- final Optional < String > value = getSchemalessFieldValue (operatingValue (record ));
74
+ final String value = getSchemalessFieldValue (operatingValue (record )). orElse ( null );
86
75
return filterCondition .test (value ) ? record : null ;
87
76
} else {
88
77
final Map <String , Object > map = (Map <String , Object >) operatingValue (record );
89
- final Optional < String > fieldValue = getSchemalessFieldValue (map .get (fieldName ));
78
+ final String fieldValue = getSchemalessFieldValue (map .get (fieldName )). orElse ( null );
90
79
return filterCondition .test (fieldValue ) ? record : null ;
91
80
}
92
81
}
@@ -95,49 +84,34 @@ private Optional<String> getSchemalessFieldValue(final Object fieldValue) {
95
84
if (fieldValue == null ) {
96
85
return Optional .empty ();
97
86
}
98
- Optional <String > text = Optional .empty ();
99
- if (isSupportedType (fieldValue )) {
100
- text = Optional .of (fieldValue .toString ());
101
- }
102
- return text ;
103
- }
104
-
105
- private boolean isSupportedType (final Object fieldValue ) {
106
- final Set <Class <?>> supportedTypes = new HashSet <>(
107
- Arrays .asList (
108
- String .class , Long .class , Integer .class , Short .class ,
109
- Double .class , Float .class , Boolean .class
110
- )
111
- );
112
-
113
- return supportedTypes .contains (fieldValue .getClass ());
87
+ return Optional .ofNullable (Values .convertToString (null , fieldValue ));
114
88
}
115
89
116
90
@ Override
117
91
public ConfigDef config () {
118
92
return new ConfigDef ()
119
- .define ("field.name" ,
120
- ConfigDef .Type .STRING ,
121
- null ,
122
- ConfigDef .Importance .HIGH ,
123
- "The field name to filter by."
124
- + "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported."
125
- + "If empty, the whole key/value record will be filtered." )
126
- .define ("field.value" ,
127
- ConfigDef .Type .STRING ,
128
- null ,
129
- ConfigDef .Importance .HIGH ,
130
- "Expected value to match. Either define this, or a regex pattern" )
131
- .define ("field.value.pattern" ,
132
- ConfigDef .Type .STRING ,
133
- null ,
134
- ConfigDef .Importance .HIGH ,
135
- "The pattern to match. Either define this, or an expected value" )
136
- .define ("field.value.matches" ,
137
- ConfigDef .Type .BOOLEAN ,
138
- true ,
139
- ConfigDef .Importance .MEDIUM ,
140
- "The filter mode, 'true' for matching or 'false' for non-matching" );
93
+ .define ("field.name" ,
94
+ ConfigDef .Type .STRING ,
95
+ null ,
96
+ ConfigDef .Importance .HIGH ,
97
+ "The field name to filter by."
98
+ + "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported."
99
+ + "If empty, the whole key/value record will be filtered." )
100
+ .define ("field.value" ,
101
+ ConfigDef .Type .STRING ,
102
+ null ,
103
+ ConfigDef .Importance .HIGH ,
104
+ "Expected value to match. Either define this, or a regex pattern" )
105
+ .define ("field.value.pattern" ,
106
+ ConfigDef .Type .STRING ,
107
+ null ,
108
+ ConfigDef .Importance .HIGH ,
109
+ "The pattern to match. Either define this, or an expected value" )
110
+ .define ("field.value.matches" ,
111
+ ConfigDef .Type .BOOLEAN ,
112
+ true ,
113
+ ConfigDef .Importance .MEDIUM ,
114
+ "The filter mode, 'true' for matching or 'false' for non-matching" );
141
115
}
142
116
143
117
@ Override
@@ -152,16 +126,20 @@ public void configure(final Map<String, ?> configs) {
152
126
this .fieldValuePattern = Optional .ofNullable (config .getString ("field.value.pattern" ));
153
127
final boolean expectedValuePresent = fieldExpectedValue .map (s -> !s .isEmpty ()).orElse (false );
154
128
final boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
155
- if ((expectedValuePresent && regexPatternPresent )
156
- || (!expectedValuePresent && !regexPatternPresent )) {
129
+ if (expectedValuePresent == regexPatternPresent ) {
157
130
throw new ConfigException (
158
- "Either field.value or field.value.pattern have to be set to apply filter transform" );
131
+ "Either field.value or field.value.pattern have to be set to apply filter transform" );
159
132
}
160
- final Predicate <Optional <String >> matchCondition = fieldValue -> fieldValue
161
- .filter (value -> expectedValuePresent
162
- ? fieldExpectedValue .get ().equals (value )
163
- : value .matches (fieldValuePattern .get ()))
164
- .isPresent ();
133
+ final Predicate <String > matchCondition ;
134
+
135
+ if (expectedValuePresent ) {
136
+ final String expectedValue = fieldExpectedValue .get ();
137
+ matchCondition = fieldValue -> fieldValue != null && fieldValue .equals (expectedValue );
138
+ } else {
139
+ final String pattern = fieldValuePattern .get ();
140
+ matchCondition = fieldValue -> fieldValue != null && fieldValue .matches (pattern );
141
+ }
142
+
165
143
this .filterCondition = config .getBoolean ("field.value.matches" )
166
144
? matchCondition
167
145
: (result -> !matchCondition .test (result ));
0 commit comments