
Commit 5bca71c

feat(filters): support GrokFilter target and array input
1 parent fcfaa19 · commit 5bca71c

4 files changed (+215 −14 lines)


connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/config/GrokFilterConfig.java

Lines changed: 19 additions & 1 deletion
@@ -14,6 +14,9 @@ public class GrokFilterConfig extends CommonFilterConfig {
 
     private final GrokConfig grok;
     public static final String GROK_FILTER = "GROK_FILTER";
+
+    public static final String GROK_TARGET_CONFIG = "target";
+    public static final String GROK_TARGET_DOC = "The target field to put the extracted Grok data (optional)";
 
     /**
      * Creates a new {@link GrokFilterConfig} instance.
@@ -29,11 +32,26 @@ public GrokConfig grok() {
         return grok;
     }
 
+    public String target() {
+        return getString(GROK_TARGET_CONFIG);
+    }
+
     public static ConfigDef configDef() {
         int filterGroupCounter = 0;
         final ConfigDef def = new ConfigDef(CommonFilterConfig.configDef())
                 .define(getSourceConfigKey(GROK_FILTER, filterGroupCounter++))
-                .define(getOverwriteConfigKey(GROK_FILTER, filterGroupCounter++));
+                .define(getOverwriteConfigKey(GROK_FILTER, filterGroupCounter++))
+                .define(
+                        GROK_TARGET_CONFIG,
+                        ConfigDef.Type.STRING,
+                        null,
+                        ConfigDef.Importance.HIGH,
+                        GROK_TARGET_DOC,
+                        GROK_FILTER,
+                        filterGroupCounter++,
+                        ConfigDef.Width.NONE,
+                        GROK_TARGET_CONFIG
+                );
         for (ConfigDef.ConfigKey configKey : GrokConfig.configDef().configKeys().values()) {
             def.define(new ConfigDef.ConfigKey(
                     configKey.name,

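For orientation, here is a minimal sketch of how the new `target` key can be read back through the extended `configDef()`. The property values are illustrative, it assumes `pattern` is the only key without a default (as the documentation table updated below suggests), and `ConfigDef.parse` is the standard Kafka Connect API.

```java
import io.streamthoughts.kafka.connect.filepulse.config.GrokFilterConfig;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;

public class GrokTargetConfigSketch {
    public static void main(String[] args) {
        // Illustrative values; 'pattern', 'source' and 'target' are the keys
        // documented in the table updated by this commit.
        Map<String, String> props = Map.of(
                "pattern", "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}",
                "source", "message",
                "target", "logData");

        // The extended ConfigDef validates the optional 'target' key (default: null).
        ConfigDef def = GrokFilterConfig.configDef();
        Map<String, Object> parsed = def.parse(props);
        System.out.println(parsed.get(GrokFilterConfig.GROK_TARGET_CONFIG)); // -> logData
    }
}
```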
connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilter.java

Lines changed: 74 additions & 5 deletions
@@ -6,7 +6,6 @@
  */
 package io.streamthoughts.kafka.connect.filepulse.filter;
 
-import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
 import io.streamthoughts.kafka.connect.filepulse.config.GrokFilterConfig;
 import io.streamthoughts.kafka.connect.filepulse.data.Type;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
@@ -17,6 +16,7 @@
 import io.streamthoughts.kafka.connect.transform.pattern.GrokPatternResolver;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +58,7 @@ public void configure(final Map<String, ?> props) {
      */
     @Override
     public ConfigDef configDef() {
-        return CommonFilterConfig.configDef();
+        return GrokFilterConfig.configDef();
     }
 
     /**
@@ -68,10 +68,56 @@ public ConfigDef configDef() {
     protected RecordsIterable<TypedStruct> apply(final FilterContext context,
                                                  final TypedStruct record) throws FilterException {
 
-        final String value = record.getString(config.source());
+        final TypedValue value = record.find(config.source());
+
+        if (value == null) {
+            throw new FilterException("Invalid field '" + config.source() + "', field does not exist");
+        }
+
+        final List<String> valuesToProcess = resolveValues(value);
+        final List<TypedStruct> extractedResults = new ArrayList<>();
+
+        for (String valueToProcess : valuesToProcess) {
+            extractedResults.add(applyFilterOnValue(valueToProcess));
+        }
+
+        return buildResult(extractedResults);
+    }
+
+    /**
+     * Normalizes the configured source field into the list of strings to run through the Grok patterns.
+     *
+     * @param value the typed value obtained from the record for the configured source path.
+     * @return list of string entries to parse.
+     */
+    private List<String> resolveValues(final TypedValue value) {
+        if (value.type() == Type.STRING) {
+            return List.of(value.getString());
+        }
+
+        if (value.type() == Type.ARRAY) {
+            final Collection<Object> array = value.getArray();
+            final List<String> values = new ArrayList<>(array.size());
+            for (Object item : array) {
+                if (!(item instanceof String)) {
+                    throw new FilterException(
+                        "Array contains non-string element of type: " + item.getClass().getName());
+                }
+                values.add((String) item);
+            }
+            return values;
+        }
 
-        if (value == null) return null;
+        throw new FilterException("Source field must be either STRING or ARRAY type, got: " + value.type());
+    }
 
+    /**
+     * Applies all configured Grok patterns on the given value and returns the resulting struct.
+     *
+     * @param value the string payload to match.
+     * @return the struct built from captured groups.
+     */
+    private TypedStruct applyFilterOnValue(final String value) {
         final byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
 
         List<SchemaAndNamedCaptured> allNamedCaptured = new ArrayList<>(matchPatterns.size());
@@ -89,7 +135,30 @@ protected RecordsIterable<TypedStruct> apply(final FilterContext context,
         }
 
         final Schema schema = mergeToSchema(allNamedCaptured);
-        return RecordsIterable.of(mergeToStruct(allNamedCaptured, schema));
+        return mergeToStruct(allNamedCaptured, schema);
+    }
+
+    /**
+     * Builds the output payload by honoring the target field and the number of extracted results.
+     *
+     * @param results list of extracted structs for each processed value.
+     * @return iterable wrapping the final record to merge.
+     */
+    private RecordsIterable<TypedStruct> buildResult(final List<TypedStruct> results) {
+        final boolean hasTarget = config.target() != null;
+        final String targetField = hasTarget ? config.target() : config.source();
+
+        if (results.size() == 1 && !hasTarget) {
+            return RecordsIterable.of(results.get(0));
+        }
+
+        final TypedStruct result = TypedStruct.create();
+        if (results.size() == 1) {
+            result.insert(targetField, results.get(0));
+        } else {
+            result.insert(targetField, TypedValue.array(results, Type.STRUCT));
+        }
+        return RecordsIterable.of(result);
     }
 
     /**

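Before the tests below, a short usage sketch of the new array handling: each string element of the source field is matched individually (`applyFilterOnValue`), and `buildResult` nests the extracted structs as an array under the configured target. The Grok pattern and field names here are illustrative; the API calls mirror the test file that follows.

```java
import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter;
import io.streamthoughts.kafka.connect.transform.GrokConfig;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GrokArraySourceSketch {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // Illustrative pattern (the tests use their own GROK_NAMED_CAPTURED_PATTERN constant).
        configs.put(GrokConfig.GROK_PATTERN_CONFIG,
                "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
        configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logs");
        configs.put("target", "parsed");

        GrokFilter filter = new GrokFilter();
        filter.configure(configs, alias -> null);

        // An array-valued source field: every element is run through the Grok patterns.
        TypedStruct record = TypedStruct.create().put("logs", Arrays.asList(
                "1970-01-01 00:00:00,000 INFO first log message",
                "1970-01-01 00:00:01,000 WARN second log message"));

        // A single record comes back, with the extracted structs nested as an
        // array under the 'parsed' target field.
        List<TypedStruct> results = filter.apply(null, record, false).collect();
        System.out.println(results.get(0).getArray("parsed").size()); // -> 2
    }
}
```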
connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/GrokFilterTest.java

Lines changed: 113 additions & 0 deletions
@@ -9,6 +9,7 @@
 import io.streamthoughts.kafka.connect.filepulse.config.CommonFilterConfig;
 import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
 import io.streamthoughts.kafka.connect.transform.GrokConfig;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,4 +87,116 @@ public void testGivenPatternWithNoGroupWhenCapturedNameOnlyIsFalse() {
         Assert.assertEquals("INFO", struct.getString("LOGLEVEL"));
         Assert.assertEquals("a dummy log message", struct.getString("GREEDYDATA"));
     }
+
+    @Test
+    public void testGivenTargetField() {
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put("target", "logData");
+        filter.configure(configs, alias -> null);
+        List<TypedStruct> results = filter.apply(null, DATA, false).collect();
+
+        Assert.assertEquals(1, results.size());
+        TypedStruct struct = results.get(0);
+
+        // Original message field should still exist
+        Assert.assertEquals(INPUT, struct.getString("message"));
+
+        // Extracted data should be in the target field
+        Assert.assertTrue(struct.exists("logData"));
+        TypedStruct logData = struct.getStruct("logData");
+        Assert.assertNotNull(logData);
+        Assert.assertEquals("1970-01-01 00:00:00,000", logData.getString("timestamp"));
+        Assert.assertEquals("INFO", logData.getString("level"));
+        // Inside the target struct, message is just the extracted value (no merging with original)
+        Assert.assertEquals("a dummy log message", logData.getString("message"));
+    }
+
+    @Test
+    public void testGivenTargetFieldWithOverwrite() {
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put("target", "parsed");
+        configs.put(CommonFilterConfig.FILTER_OVERWRITE_CONFIG, "message");
+        filter.configure(configs, alias -> null);
+        List<TypedStruct> results = filter.apply(null, DATA, false).collect();
+
+        Assert.assertEquals(1, results.size());
+        TypedStruct struct = results.get(0);
+
+        // Original message field should still exist
+        Assert.assertEquals(INPUT, struct.getString("message"));
+
+        // Extracted data should be in the target field with overwrite applied
+        Assert.assertTrue(struct.exists("parsed"));
+        TypedStruct parsed = struct.getStruct("parsed");
+        Assert.assertNotNull(parsed);
+        Assert.assertEquals("1970-01-01 00:00:00,000", parsed.getString("timestamp"));
+        Assert.assertEquals("INFO", parsed.getString("level"));
+        // With overwrite, message should be a string, not an array
+        Assert.assertEquals("a dummy log message", parsed.getString("message"));
+    }
+
+    @Test
+    public void testGivenArraySourceWithNestedTarget() {
+        TypedStruct record = TypedStruct.create();
+        record.put(
+                "logData",
+                Arrays.asList(
+                        "1970-01-01 00:00:00,000 INFO first log message",
+                        "1970-01-01 00:00:01,000 WARN second log message",
+                        "1970-01-01 00:00:02,000 ERROR third log message"));
+
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logData");
+        configs.put("target", "parsed.records");
+        filter.configure(configs, alias -> null);
+
+        List<TypedStruct> results = filter.apply(null, record, false).collect();
+        Assert.assertEquals(1, results.size());
+        TypedStruct struct = results.get(0);
+
+        Assert.assertTrue(struct.exists("parsed"));
+        TypedStruct parsed = struct.getStruct("parsed");
+        Assert.assertNotNull(parsed);
+        Assert.assertTrue(parsed.exists("records"));
+        List<Object> records = parsed.getArray("records");
+        Assert.assertEquals(3, records.size());
+
+        TypedStruct first = (TypedStruct) records.get(0);
+        Assert.assertEquals("1970-01-01 00:00:00,000", first.getString("timestamp"));
+        Assert.assertEquals("INFO", first.getString("level"));
+        Assert.assertEquals("first log message", first.getString("message"));
+
+        TypedStruct last = (TypedStruct) records.get(2);
+        Assert.assertEquals("1970-01-01 00:00:02,000", last.getString("timestamp"));
+        Assert.assertEquals("ERROR", last.getString("level"));
+        Assert.assertEquals("third log message", last.getString("message"));
+    }
+
+    @Test(expected = FilterException.class)
+    public void testGivenArraySourceWithInvalidElementTypeShouldFail() {
+        TypedStruct record = TypedStruct.create().put(
+                "logs",
+                Arrays.asList("1970-01-01 00:00:00,000 INFO first log message", 1));
+
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logs");
+        filter.configure(configs, alias -> null);
+
+        filter.apply(null, record, false);
+    }
+
+    @Test(expected = FilterException.class)
+    public void testGivenArraySourceWithNonMatchingEntryShouldFail() {
+        TypedStruct record = TypedStruct.create().put(
+                "logs",
+                Arrays.asList(
+                        "1970-01-01 00:00:00,000 INFO first log message",
+                        "INVALID"));
+
+        configs.put(GrokConfig.GROK_PATTERN_CONFIG, GROK_NAMED_CAPTURED_PATTERN);
+        configs.put(CommonFilterConfig.FILTER_SOURCE_FIELD_CONFIG, "logs");
+        filter.configure(configs, alias -> null);
+
+        filter.apply(null, record, false);
+    }
 }

docs/content/en/docs/Developer Guide/filters.md

Lines changed: 9 additions & 8 deletions
@@ -632,14 +632,15 @@ The `GrokFilter`is based on: https://github.com/streamthoughts/kafka-connect-tra
 
 ### Configuration
 
-| Configuration        | Description                                    | Type    | Default   | Importance |
-|----------------------|------------------------------------------------|---------|-----------|------------|
-| `namedCapturesOnly`  | If true, only store named captures from grok.  | boolean | *true*    | high       |
-| `pattern`            | The Grok pattern to match.                     | string  | *-*       | high       |
-| `overwrite`          | The fields to overwrite.                       | list    | medium    |
-| `patternDefinitions` | Custom pattern definitions.                    | list    | *-*       | low        |
-| `patternsDir`        | List of user-defined pattern directories       | string  | *-*       | low        |
-| `source`             | The input field on which to apply the filter   | string  | *message* | medium     |
+| Configuration        | Description                                                | Type                   | Default   | Importance |
+|----------------------|------------------------------------------------------------|------------------------|-----------|------------|
+| `namedCapturesOnly`  | If true, only store named captures from grok.              | boolean                | *true*    | high       |
+| `pattern`            | The Grok pattern to match.                                 | string                 | *-*       | high       |
+| `overwrite`          | The fields to overwrite.                                   | list                   |           | medium     |
+| `patternDefinitions` | Custom pattern definitions.                                | list                   | *-*       | low        |
+| `patternsDir`        | List of user-defined pattern directories.                  | string                 | *-*       | low        |
+| `source`             | The input field on which to apply the filter               | string / array<string> | *message* | medium     |
+| `target`             | (Optional) Destination field receiving extracted values.   | string                 | *-*       | medium     |
 
 ### Examples
 

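To round out the new table row, a short sketch of how `target` changes the output shape for a plain string source: with `target` unset the captured fields are merged into the record as before, while with `target` set they are nested under that field and the source field is preserved (mirroring `testGivenTargetField` above; the pattern and field names are illustrative, and `source` is assumed to keep its `message` default).

```java
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter;
import io.streamthoughts.kafka.connect.transform.GrokConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GrokTargetOutputSketch {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(GrokConfig.GROK_PATTERN_CONFIG,
                "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}");
        configs.put("target", "logData"); // omit to keep the flat, pre-existing output shape

        GrokFilter filter = new GrokFilter();
        filter.configure(configs, alias -> null);

        // 'source' keeps its default value of 'message'.
        TypedStruct record = TypedStruct.create()
                .put("message", "1970-01-01 00:00:00,000 INFO a dummy log message");
        List<TypedStruct> results = filter.apply(null, record, false).collect();

        TypedStruct out = results.get(0);
        System.out.println(out.getString("message"));                    // original field is preserved
        System.out.println(out.getStruct("logData").getString("level")); // -> INFO
    }
}
```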