2020import java .util .Optional ;
2121import java .util .function .Predicate ;
2222
23+ import java .util .regex .Pattern ;
2324import org .apache .kafka .common .config .AbstractConfig ;
2425import org .apache .kafka .common .config .ConfigDef ;
2526import org .apache .kafka .common .config .ConfigException ;
2627import org .apache .kafka .connect .connector .ConnectRecord ;
2728import org .apache .kafka .connect .data .Field ;
2829import org .apache .kafka .connect .data .Schema ;
30+ import org .apache .kafka .connect .data .SchemaAndValue ;
2931import org .apache .kafka .connect .data .Struct ;
3032import org .apache .kafka .connect .data .Values ;
3133import org .apache .kafka .connect .transforms .Transformation ;
@@ -35,7 +37,68 @@ public abstract class FilterByFieldValue<R extends ConnectRecord<R>> implements
3537 private String fieldName ;
3638 private Optional <String > fieldExpectedValue ;
3739 private Optional <String > fieldValuePattern ;
38- private Predicate <String > filterCondition ;
40+
41+ @ Override
42+ public ConfigDef config () {
43+ return new ConfigDef ()
44+ .define ("field.name" ,
45+ ConfigDef .Type .STRING ,
46+ null ,
47+ ConfigDef .Importance .HIGH ,
48+ "The field name to filter by."
49+ + "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported."
50+ + "If empty, the whole key/value record will be filtered." )
51+ .define ("field.value" ,
52+ ConfigDef .Type .STRING ,
53+ null ,
54+ ConfigDef .Importance .HIGH ,
55+ "Expected value to match. Either define this, or a regex pattern" )
56+ .define ("field.value.pattern" ,
57+ ConfigDef .Type .STRING ,
58+ null ,
59+ ConfigDef .Importance .HIGH ,
60+ "The pattern to match. Either define this, or an expected value" )
61+ .define ("field.value.matches" ,
62+ ConfigDef .Type .BOOLEAN ,
63+ true ,
64+ ConfigDef .Importance .MEDIUM ,
65+ "The filter mode, 'true' for matching or 'false' for non-matching" );
66+ }
67+
68+ @ Override
69+ public void configure (final Map <String , ?> configs ) {
70+ final AbstractConfig config = new AbstractConfig (config (), configs );
71+ this .fieldName = config .getString ("field.name" );
72+ this .fieldExpectedValue = Optional .ofNullable (config .getString ("field.value" ));
73+ this .fieldValuePattern = Optional .ofNullable (config .getString ("field.value.pattern" ));
74+ final boolean expectedValuePresent = fieldExpectedValue .isPresent ();
75+ final boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
76+ if (expectedValuePresent == regexPatternPresent ) {
77+ throw new ConfigException (
78+ "Either field.value or field.value.pattern have to be set to apply filter transform" );
79+ }
80+ final Predicate <SchemaAndValue > matchCondition ;
81+
82+ if (expectedValuePresent ) {
83+ final SchemaAndValue expectedSchemaAndValue = Values .parseString (fieldExpectedValue .get ());
84+ matchCondition = schemaAndValue -> expectedSchemaAndValue .value ().equals (schemaAndValue .value ());
85+ } else {
86+ final String pattern = fieldValuePattern .get ();
87+ final Predicate <String > regexPredicate = Pattern .compile (pattern ).asPredicate ();
88+ matchCondition = schemaAndValue ->
89+ schemaAndValue != null
90+ && regexPredicate .test (Values .convertToString (schemaAndValue .schema (), schemaAndValue .value ()));
91+ }
92+
93+ this .filterCondition = config .getBoolean ("field.value.matches" )
94+ ? matchCondition
95+ : (result -> !matchCondition .test (result ));
96+ }
97+ private Predicate <SchemaAndValue > filterCondition ;
98+
99+ protected abstract Schema operatingSchema (R record );
100+
101+ protected abstract Object operatingValue (R record );
39102
40103 @ Override
41104 public R apply (final R record ) {
@@ -50,102 +113,46 @@ public R apply(final R record) {
50113 }
51114 }
52115
53-
54- protected abstract Schema operatingSchema (R record );
55-
56- protected abstract Object operatingValue (R record );
57-
58116 private R applyWithSchema (final R record ) {
59117 final Struct struct = (Struct ) operatingValue (record );
60- final String fieldValue = getStructFieldValue (struct , fieldName ).orElse (null );
61- return filterCondition .test (fieldValue ) ? record : null ;
118+ final SchemaAndValue schemaAndValue = getStructFieldValue (struct , fieldName ).orElse (null );
119+ return filterCondition .test (schemaAndValue ) ? record : null ;
62120 }
63121
64- private Optional <String > getStructFieldValue (final Struct struct , final String fieldName ) {
122+ private Optional <SchemaAndValue > getStructFieldValue (final Struct struct , final String fieldName ) {
65123 final Schema schema = struct .schema ();
66124 final Field field = schema .field (fieldName );
67125 final Object fieldValue = struct .get (field );
68- return Optional .ofNullable (Values .convertToString (field .schema (), fieldValue ));
126+ if (fieldValue == null ) {
127+ return Optional .empty ();
128+ } else {
129+ return Optional .of (new SchemaAndValue (field .schema (), struct .get (field )));
130+ }
69131 }
70132
71133 @ SuppressWarnings ("unchecked" )
72134 private R applySchemaless (final R record ) {
73135 if (fieldName == null || fieldName .isEmpty ()) {
74- final String value = getSchemalessFieldValue (operatingValue (record )).orElse (null );
75- return filterCondition .test (value ) ? record : null ;
136+ final SchemaAndValue schemaAndValue = getSchemalessFieldValue (operatingValue (record )).orElse (null );
137+ return filterCondition .test (schemaAndValue ) ? record : null ;
76138 } else {
77139 final Map <String , Object > map = (Map <String , Object >) operatingValue (record );
78- final String fieldValue = getSchemalessFieldValue (map .get (fieldName )).orElse (null );
79- return filterCondition .test (fieldValue ) ? record : null ;
140+ final SchemaAndValue schemaAndValue = getSchemalessFieldValue (map .get (fieldName )).orElse (null );
141+ return filterCondition .test (schemaAndValue ) ? record : null ;
80142 }
81143 }
82144
83- private Optional <String > getSchemalessFieldValue (final Object fieldValue ) {
145+ private Optional <SchemaAndValue > getSchemalessFieldValue (final Object fieldValue ) {
84146 if (fieldValue == null ) {
85147 return Optional .empty ();
86148 }
87- return Optional .ofNullable (Values .convertToString (null , fieldValue ));
88- }
89-
90- @ Override
91- public ConfigDef config () {
92- return new ConfigDef ()
93- .define ("field.name" ,
94- ConfigDef .Type .STRING ,
95- null ,
96- ConfigDef .Importance .HIGH ,
97- "The field name to filter by."
98- + "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported."
99- + "If empty, the whole key/value record will be filtered." )
100- .define ("field.value" ,
101- ConfigDef .Type .STRING ,
102- null ,
103- ConfigDef .Importance .HIGH ,
104- "Expected value to match. Either define this, or a regex pattern" )
105- .define ("field.value.pattern" ,
106- ConfigDef .Type .STRING ,
107- null ,
108- ConfigDef .Importance .HIGH ,
109- "The pattern to match. Either define this, or an expected value" )
110- .define ("field.value.matches" ,
111- ConfigDef .Type .BOOLEAN ,
112- true ,
113- ConfigDef .Importance .MEDIUM ,
114- "The filter mode, 'true' for matching or 'false' for non-matching" );
149+ return Optional .of (new SchemaAndValue (Values .inferSchema (fieldValue ), fieldValue ));
115150 }
116151
117152 @ Override
118153 public void close () {
119154 }
120155
121- @ Override
122- public void configure (final Map <String , ?> configs ) {
123- final AbstractConfig config = new AbstractConfig (config (), configs );
124- this .fieldName = config .getString ("field.name" );
125- this .fieldExpectedValue = Optional .ofNullable (config .getString ("field.value" ));
126- this .fieldValuePattern = Optional .ofNullable (config .getString ("field.value.pattern" ));
127- final boolean expectedValuePresent = fieldExpectedValue .map (s -> !s .isEmpty ()).orElse (false );
128- final boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
129- if (expectedValuePresent == regexPatternPresent ) {
130- throw new ConfigException (
131- "Either field.value or field.value.pattern have to be set to apply filter transform" );
132- }
133- final Predicate <String > matchCondition ;
134-
135- if (expectedValuePresent ) {
136- final String expectedValue = fieldExpectedValue .get ();
137- matchCondition = fieldValue -> fieldValue != null && fieldValue .equals (expectedValue );
138- } else {
139- final String pattern = fieldValuePattern .get ();
140- matchCondition = fieldValue -> fieldValue != null && fieldValue .matches (pattern );
141- }
142-
143- this .filterCondition = config .getBoolean ("field.value.matches" )
144- ? matchCondition
145- : (result -> !matchCondition .test (result ));
146- }
147-
148-
149156 public static final class Key <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
150157
151158 @ Override
0 commit comments