16
16
17
17
package io .aiven .kafka .connect .transforms ;
18
18
19
+ import java .util .Arrays ;
20
+ import java .util .HashSet ;
19
21
import java .util .Map ;
20
22
import java .util .Optional ;
23
+ import java .util .Set ;
21
24
import java .util .function .Predicate ;
22
25
23
26
import org .apache .kafka .common .config .AbstractConfig ;
@@ -49,12 +52,12 @@ public R apply(final R record) {
49
52
}
50
53
51
54
private R handleStruct (final R record ) {
52
- Struct struct = (Struct ) record .value ();
55
+ final Struct struct = (Struct ) record .value ();
53
56
final Optional <String > fieldValue = extractStructFieldValue (struct , fieldName );
54
57
return filterCondition .test (fieldValue ) ? record : null ;
55
58
}
56
59
57
- private Optional <String > extractStructFieldValue (Struct struct , String fieldName ) {
60
+ private Optional <String > extractStructFieldValue (final Struct struct , final String fieldName ) {
58
61
final Schema schema = struct .schema ();
59
62
final Field field = schema .field (fieldName );
60
63
final Object fieldValue = struct .get (field );
@@ -75,39 +78,50 @@ private R handleMap(final R record) {
75
78
return filterCondition .test (fieldValue ) ? record : null ;
76
79
}
77
80
78
- private Optional <String > extractMapFieldValue (Map <String , Object > map , String fieldName ) {
79
- if (!map .containsKey (fieldName )) return Optional .empty ();
81
+ private Optional <String > extractMapFieldValue (final Map <String , Object > map , final String fieldName ) {
82
+ if (!map .containsKey (fieldName )) {
83
+ return Optional .empty ();
84
+ }
80
85
81
86
final Object fieldValue = map .get (fieldName );
82
87
83
88
Optional <String > text = Optional .empty ();
84
- if (fieldValue instanceof String
85
- || fieldValue instanceof Long
86
- || fieldValue instanceof Integer
87
- || fieldValue instanceof Short
88
- || fieldValue instanceof Double
89
- || fieldValue instanceof Float
90
- || fieldValue instanceof Boolean ) {
89
+ if (isSupportedType (fieldValue )) {
91
90
text = Optional .of (fieldValue .toString ());
92
91
}
93
92
return text ;
94
93
}
95
94
95
+ private boolean isSupportedType (final Object fieldValue ) {
96
+ final Set <Class <?>> supportedTypes = new HashSet <>(
97
+ Arrays .asList (
98
+ String .class , Long .class , Integer .class , Short .class ,
99
+ Double .class , Float .class , Boolean .class
100
+ )
101
+ );
102
+
103
+ return supportedTypes .contains (fieldValue .getClass ());
104
+ }
105
+
96
106
@ Override
97
107
public ConfigDef config () {
98
108
return new ConfigDef ()
99
109
.define ("field.name" ,
100
110
ConfigDef .Type .STRING ,
101
- ConfigDef .Importance .HIGH , "The field name to filter by" )
111
+ ConfigDef .Importance .HIGH ,
112
+ "The field name to filter by" )
102
113
.define ("field.value" ,
103
114
ConfigDef .Type .STRING , null ,
104
- ConfigDef .Importance .HIGH , "Expected value to match. Either define this, or a regex pattern" )
115
+ ConfigDef .Importance .HIGH ,
116
+ "Expected value to match. Either define this, or a regex pattern" )
105
117
.define ("field.value.pattern" ,
106
118
ConfigDef .Type .STRING , null ,
107
- ConfigDef .Importance .HIGH , "The pattern to match. Either define this, or an expected value" )
119
+ ConfigDef .Importance .HIGH ,
120
+ "The pattern to match. Either define this, or an expected value" )
108
121
.define ("field.value.matches" ,
109
122
ConfigDef .Type .BOOLEAN , true ,
110
- ConfigDef .Importance .MEDIUM , "The filter mode, 'true' for matching or 'false' for non-matching" );
123
+ ConfigDef .Importance .MEDIUM ,
124
+ "The filter mode, 'true' for matching or 'false' for non-matching" );
111
125
}
112
126
113
127
@ Override
@@ -120,19 +134,20 @@ public void configure(final Map<String, ?> configs) {
120
134
this .fieldName = config .getString ("field.name" );
121
135
this .fieldExpectedValue = Optional .ofNullable (config .getString ("field.value" ));
122
136
this .fieldValuePattern = Optional .ofNullable (config .getString ("field.value.pattern" ));
123
- boolean expectedValuePresent = fieldExpectedValue .map (s -> !s .isEmpty ()).orElse (false );
124
- boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
137
+ final boolean expectedValuePresent = fieldExpectedValue .map (s -> !s .isEmpty ()).orElse (false );
138
+ final boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
125
139
if ((expectedValuePresent && regexPatternPresent )
126
140
|| (!expectedValuePresent && !regexPatternPresent )) {
127
- throw new ConfigException ("Either field.value or field.value.pattern have to be set to apply filter transform" );
141
+ throw new ConfigException (
142
+ "Either field.value or field.value.pattern have to be set to apply filter transform" );
128
143
}
129
- Predicate <Optional <String >> matchCondition = fieldValue -> fieldValue
130
- .filter (value -> expectedValuePresent ?
131
- fieldExpectedValue .get ().equals (value ) :
132
- value .matches (fieldValuePattern .get ()))
144
+ final Predicate <Optional <String >> matchCondition = fieldValue -> fieldValue
145
+ .filter (value -> expectedValuePresent
146
+ ? fieldExpectedValue .get ().equals (value )
147
+ : value .matches (fieldValuePattern .get ()))
133
148
.isPresent ();
134
149
this .filterCondition = config .getBoolean ("field.value.matches" )
135
- ? ( matchCondition )
150
+ ? matchCondition
136
151
: (result -> !matchCondition .test (result ));
137
152
}
138
153
}
0 commit comments