@@ -92,7 +92,9 @@ private R applySchemaless(final R record) {
92
92
}
93
93
94
94
private Optional <String > getSchemalessFieldValue (final Object fieldValue ) {
95
- if (fieldValue == null ) return Optional .empty ();
95
+ if (fieldValue == null ) {
96
+ return Optional .empty ();
97
+ }
96
98
Optional <String > text = Optional .empty ();
97
99
if (isSupportedType (fieldValue )) {
98
100
text = Optional .of (fieldValue .toString ());
@@ -102,10 +104,10 @@ private Optional<String> getSchemalessFieldValue(final Object fieldValue) {
102
104
103
105
private boolean isSupportedType (final Object fieldValue ) {
104
106
final Set <Class <?>> supportedTypes = new HashSet <>(
105
- Arrays .asList (
106
- String .class , Long .class , Integer .class , Short .class ,
107
- Double .class , Float .class , Boolean .class
108
- )
107
+ Arrays .asList (
108
+ String .class , Long .class , Integer .class , Short .class ,
109
+ Double .class , Float .class , Boolean .class
110
+ )
109
111
);
110
112
111
113
return supportedTypes .contains (fieldValue .getClass ());
@@ -114,28 +116,28 @@ private boolean isSupportedType(final Object fieldValue) {
114
116
@ Override
115
117
public ConfigDef config () {
116
118
return new ConfigDef ()
117
- .define ("field.name" ,
118
- ConfigDef .Type .STRING ,
119
- null ,
120
- ConfigDef .Importance .HIGH ,
121
- "The field name to filter by." +
122
- "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported." +
123
- "If empty, the whole key/value record will be filtered." )
124
- .define ("field.value" ,
125
- ConfigDef .Type .STRING ,
126
- null ,
127
- ConfigDef .Importance .HIGH ,
128
- "Expected value to match. Either define this, or a regex pattern" )
129
- .define ("field.value.pattern" ,
130
- ConfigDef .Type .STRING ,
131
- null ,
132
- ConfigDef .Importance .HIGH ,
133
- "The pattern to match. Either define this, or an expected value" )
134
- .define ("field.value.matches" ,
135
- ConfigDef .Type .BOOLEAN ,
136
- true ,
137
- ConfigDef .Importance .MEDIUM ,
138
- "The filter mode, 'true' for matching or 'false' for non-matching" );
119
+ .define ("field.name" ,
120
+ ConfigDef .Type .STRING ,
121
+ null ,
122
+ ConfigDef .Importance .HIGH ,
123
+ "The field name to filter by."
124
+ + "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported."
125
+ + "If empty, the whole key/value record will be filtered." )
126
+ .define ("field.value" ,
127
+ ConfigDef .Type .STRING ,
128
+ null ,
129
+ ConfigDef .Importance .HIGH ,
130
+ "Expected value to match. Either define this, or a regex pattern" )
131
+ .define ("field.value.pattern" ,
132
+ ConfigDef .Type .STRING ,
133
+ null ,
134
+ ConfigDef .Importance .HIGH ,
135
+ "The pattern to match. Either define this, or an expected value" )
136
+ .define ("field.value.matches" ,
137
+ ConfigDef .Type .BOOLEAN ,
138
+ true ,
139
+ ConfigDef .Importance .MEDIUM ,
140
+ "The filter mode, 'true' for matching or 'false' for non-matching" );
139
141
}
140
142
141
143
@ Override
@@ -151,43 +153,43 @@ public void configure(final Map<String, ?> configs) {
151
153
final boolean expectedValuePresent = fieldExpectedValue .map (s -> !s .isEmpty ()).orElse (false );
152
154
final boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
153
155
if ((expectedValuePresent && regexPatternPresent )
154
- || (!expectedValuePresent && !regexPatternPresent )) {
156
+ || (!expectedValuePresent && !regexPatternPresent )) {
155
157
throw new ConfigException (
156
- "Either field.value or field.value.pattern have to be set to apply filter transform" );
158
+ "Either field.value or field.value.pattern have to be set to apply filter transform" );
157
159
}
158
160
final Predicate <Optional <String >> matchCondition = fieldValue -> fieldValue
159
- .filter (value -> expectedValuePresent
160
- ? fieldExpectedValue .get ().equals (value )
161
- : value .matches (fieldValuePattern .get ()))
162
- .isPresent ();
161
+ .filter (value -> expectedValuePresent
162
+ ? fieldExpectedValue .get ().equals (value )
163
+ : value .matches (fieldValuePattern .get ()))
164
+ .isPresent ();
163
165
this .filterCondition = config .getBoolean ("field.value.matches" )
164
- ? matchCondition
165
- : (result -> !matchCondition .test (result ));
166
+ ? matchCondition
167
+ : (result -> !matchCondition .test (result ));
166
168
}
167
169
168
170
169
171
public static final class Key <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
170
172
171
173
@ Override
172
- protected Schema operatingSchema (R record ) {
174
+ protected Schema operatingSchema (final R record ) {
173
175
return record .keySchema ();
174
176
}
175
177
176
178
@ Override
177
- protected Object operatingValue (R record ) {
179
+ protected Object operatingValue (final R record ) {
178
180
return record .key ();
179
181
}
180
182
}
181
183
182
184
public static final class Value <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
183
185
184
186
@ Override
185
- protected Schema operatingSchema (R record ) {
187
+ protected Schema operatingSchema (final R record ) {
186
188
return record .valueSchema ();
187
189
}
188
190
189
191
@ Override
190
- protected Object operatingValue (R record ) {
192
+ protected Object operatingValue (final R record ) {
191
193
return record .value ();
192
194
}
193
195
}
0 commit comments