3434
3535import static org .apache .kafka .connect .data .Schema .Type .STRING ;
3636
37- public class FilterByFieldValue <R extends ConnectRecord <R >> implements Transformation <R > {
37+ public abstract class FilterByFieldValue <R extends ConnectRecord <R >> implements Transformation <R > {
3838
3939 private String fieldName ;
4040 private Optional <String > fieldExpectedValue ;
@@ -43,21 +43,29 @@ public class FilterByFieldValue<R extends ConnectRecord<R>> implements Transform
4343
4444 @ Override
4545 public R apply (final R record ) {
46- if (record .value () instanceof Struct ) {
47- return handleStruct (record );
48- } else if (record .value () instanceof Map ) {
49- return handleMap (record );
46+ if (operatingValue (record ) == null ) {
47+ return record ;
48+ }
49+
50+ if (operatingSchema (record ) == null ) {
51+ return applySchemaless (record );
52+ } else {
53+ return applyWithSchema (record );
5054 }
51- return record ; // if record is other than map or struct, pass-by
5255 }
5356
54- private R handleStruct (final R record ) {
55- final Struct struct = (Struct ) record .value ();
56- final Optional <String > fieldValue = extractStructFieldValue (struct , fieldName );
57+
58+ protected abstract Schema operatingSchema (R record );
59+
60+ protected abstract Object operatingValue (R record );
61+
62+ private R applyWithSchema (final R record ) {
63+ final Struct struct = (Struct ) operatingValue (record );
64+ final Optional <String > fieldValue = getStructFieldValue (struct , fieldName );
5765 return filterCondition .test (fieldValue ) ? record : null ;
5866 }
5967
60- private Optional <String > extractStructFieldValue (final Struct struct , final String fieldName ) {
68+ private Optional <String > getStructFieldValue (final Struct struct , final String fieldName ) {
6169 final Schema schema = struct .schema ();
6270 final Field field = schema .field (fieldName );
6371 final Object fieldValue = struct .get (field );
@@ -72,19 +80,19 @@ private Optional<String> extractStructFieldValue(final Struct struct, final Stri
7280 }
7381
7482 @ SuppressWarnings ("unchecked" )
75- private R handleMap (final R record ) {
76- final Map <String , Object > map = (Map <String , Object >) record .value ();
77- final Optional <String > fieldValue = extractMapFieldValue (map , fieldName );
78- return filterCondition .test (fieldValue ) ? record : null ;
79- }
80-
81- private Optional <String > extractMapFieldValue (final Map <String , Object > map , final String fieldName ) {
82- if (!map .containsKey (fieldName )) {
83- return Optional .empty ();
83+ private R applySchemaless (final R record ) {
84+ if (fieldName == null || fieldName .isEmpty ()) {
85+ final Optional <String > value = getSchemalessFieldValue (operatingValue (record ));
86+ return filterCondition .test (value ) ? record : null ;
87+ } else {
88+ final Map <String , Object > map = (Map <String , Object >) operatingValue (record );
89+ final Optional <String > fieldValue = getSchemalessFieldValue (map .get (fieldName ));
90+ return filterCondition .test (fieldValue ) ? record : null ;
8491 }
92+ }
8593
86- final Object fieldValue = map . get ( fieldName );
87-
94+ private Optional < String > getSchemalessFieldValue ( final Object fieldValue ) {
95+ if ( fieldValue == null ) return Optional . empty ();
8896 Optional <String > text = Optional .empty ();
8997 if (isSupportedType (fieldValue )) {
9098 text = Optional .of (fieldValue .toString ());
@@ -108,18 +116,24 @@ public ConfigDef config() {
108116 return new ConfigDef ()
109117 .define ("field.name" ,
110118 ConfigDef .Type .STRING ,
119+ null ,
111120 ConfigDef .Importance .HIGH ,
112- "The field name to filter by" )
121+ "The field name to filter by." +
122+ "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported." +
123+ "If empty, the whole key/value record will be filtered." )
113124 .define ("field.value" ,
114- ConfigDef .Type .STRING , null ,
125+ ConfigDef .Type .STRING ,
126+ null ,
115127 ConfigDef .Importance .HIGH ,
116128 "Expected value to match. Either define this, or a regex pattern" )
117129 .define ("field.value.pattern" ,
118- ConfigDef .Type .STRING , null ,
130+ ConfigDef .Type .STRING ,
131+ null ,
119132 ConfigDef .Importance .HIGH ,
120133 "The pattern to match. Either define this, or an expected value" )
121134 .define ("field.value.matches" ,
122- ConfigDef .Type .BOOLEAN , true ,
135+ ConfigDef .Type .BOOLEAN ,
136+ true ,
123137 ConfigDef .Importance .MEDIUM ,
124138 "The filter mode, 'true' for matching or 'false' for non-matching" );
125139 }
@@ -150,4 +164,31 @@ public void configure(final Map<String, ?> configs) {
150164 ? matchCondition
151165 : (result -> !matchCondition .test (result ));
152166 }
167+
168+
169+ public static final class Key <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
170+
171+ @ Override
172+ protected Schema operatingSchema (R record ) {
173+ return record .keySchema ();
174+ }
175+
176+ @ Override
177+ protected Object operatingValue (R record ) {
178+ return record .key ();
179+ }
180+ }
181+
182+ public static final class Value <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
183+
184+ @ Override
185+ protected Schema operatingSchema (R record ) {
186+ return record .valueSchema ();
187+ }
188+
189+ @ Override
190+ protected Object operatingValue (R record ) {
191+ return record .value ();
192+ }
193+ }
153194}
0 commit comments