3434
3535import static org .apache .kafka .connect .data .Schema .Type .STRING ;
3636
37- public class FilterByFieldValue <R extends ConnectRecord <R >> implements Transformation <R > {
37+ public abstract class FilterByFieldValue <R extends ConnectRecord <R >> implements Transformation <R > {
3838
3939 private String fieldName ;
4040 private Optional <String > fieldExpectedValue ;
@@ -43,15 +43,23 @@ public class FilterByFieldValue<R extends ConnectRecord<R>> implements Transform
4343
4444 @ Override
4545 public R apply (final R record ) {
46- if (record .value () instanceof Struct ) {
47- return handleStruct (record );
48- } else if (record .value () instanceof Map ) {
49- return handleMap (record );
46+ if (operatingValue (record ) == null ) {
47+ return record ;
48+ }
49+
50+ if (operatingSchema (record ) == null ) {
51+ return applySchemaless (record );
52+ } else {
53+ return applyWithSchema (record );
5054 }
51- return record ; // if record is other than map or struct, pass-by
5255 }
5356
54- private R handleStruct (final R record ) {
57+
58+ protected abstract Schema operatingSchema (R record );
59+
60+ protected abstract Object operatingValue (R record );
61+
62+ private R applyWithSchema (final R record ) {
5563 final Struct struct = (Struct ) record .value ();
5664 final Optional <String > fieldValue = extractStructFieldValue (struct , fieldName );
5765 return filterCondition .test (fieldValue ) ? record : null ;
@@ -72,19 +80,19 @@ private Optional<String> extractStructFieldValue(final Struct struct, final Stri
7280 }
7381
7482 @ SuppressWarnings ("unchecked" )
75- private R handleMap (final R record ) {
76- final Map <String , Object > map = (Map <String , Object >) record .value ();
77- final Optional <String > fieldValue = extractMapFieldValue (map , fieldName );
78- return filterCondition .test (fieldValue ) ? record : null ;
79- }
80-
81- private Optional <String > extractMapFieldValue (final Map <String , Object > map , final String fieldName ) {
82- if (!map .containsKey (fieldName )) {
83- return Optional .empty ();
83+ private R applySchemaless (final R record ) {
84+ if (fieldName .isEmpty ()) {
85+ final Optional <String > value = extractSchemalessFieldValue (operatingValue (record ));
86+ return filterCondition .test (value ) ? record : null ;
87+ } else {
88+ final Map <String , Object > map = (Map <String , Object >) record .value ();
89+ final Optional <String > fieldValue = extractSchemalessFieldValue (map .get (fieldName ));
90+ return filterCondition .test (fieldValue ) ? record : null ;
8491 }
92+ }
8593
86- final Object fieldValue = map . get ( fieldName );
87-
94+ private Optional < String > extractSchemalessFieldValue ( final Object fieldValue ) {
95+ if ( fieldValue == null ) return Optional . empty ();
8896 Optional <String > text = Optional .empty ();
8997 if (isSupportedType (fieldValue )) {
9098 text = Optional .of (fieldValue .toString ());
@@ -150,4 +158,31 @@ public void configure(final Map<String, ?> configs) {
150158 ? matchCondition
151159 : (result -> !matchCondition .test (result ));
152160 }
161+
162+
163+ public static final class Key <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
164+
165+ @ Override
166+ protected Schema operatingSchema (R record ) {
167+ return record .keySchema ();
168+ }
169+
170+ @ Override
171+ protected Object operatingValue (R record ) {
172+ return record .key ();
173+ }
174+ }
175+
176+ public static final class Value <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
177+
178+ @ Override
179+ protected Schema operatingSchema (R record ) {
180+ return record .valueSchema ();
181+ }
182+
183+ @ Override
184+ protected Object operatingValue (R record ) {
185+ return record .value ();
186+ }
187+ }
153188}
0 commit comments