Skip to content
This repository was archived by the owner on Mar 21, 2023. It is now read-only.

Commit 2038504

Browse files
dennisoelkerskroepke
authored andcommitted
Add sample Decorator preset. (#52)
* Providing a message decorator that uses pipelines. * Making decorator configurable. * Allow adding new messages by pipeline decorator. * Adding changes related due to introduced listener. * Adapt to naming changes, using easier forEach idiom. * Changing decorator to work on SearchResponse instead of message list. * Adding decoration stats for pipeline processor decorator. * Add uppercase decorator using pipelines interpreter with preset. * Decorators don't need to generate decoration stats on their own anymore.
1 parent 3fa6e5b commit 2038504

File tree

4 files changed

+133
-5
lines changed

4 files changed

+133
-5
lines changed

src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.graylog.plugins.pipelineprocessor;
1818

1919
import com.google.common.base.Strings;
20+
import com.google.common.collect.ImmutableMap;
2021
import com.google.common.collect.ImmutableMultimap;
2122
import com.google.common.collect.ImmutableSet;
2223
import com.google.inject.assistedinject.Assisted;
@@ -82,7 +83,7 @@ public ConfigurationRequest getRequestedConfiguration() {
8283

8384
public static class Descriptor extends SearchResponseDecorator.Descriptor {
8485
public Descriptor() {
85-
super("Pipeline Processor Decorator", false, "http://docs.graylog.org/en/2.0/pages/pipelines.html", "Pipeline Processor Decorator");
86+
super("Pipeline Processor Decorator", "http://docs.graylog.org/en/2.0/pages/pipelines.html", "Pipeline Processor Decorator");
8687
}
8788
}
8889

@@ -114,7 +115,11 @@ public SearchResponse apply(SearchResponse searchResponse) {
114115
results.add(ResultMessageSummary.create(inMessage.highlightRanges(), message.getFields(), inMessage.index()));
115116
additionalCreatedMessages.forEach((additionalMessage) -> {
116117
// TODO: pass proper highlight ranges. Need to rebuild them for new messages.
117-
results.add(ResultMessageSummary.create(ImmutableMultimap.of(), additionalMessage.getFields(), "[created from decorator]"));
118+
results.add(ResultMessageSummary.create(
119+
ImmutableMultimap.of(),
120+
additionalMessage.getFields(),
121+
"[created from decorator]"
122+
));
118123
});
119124
});
120125

src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,8 @@ protected void configure() {
5050
installSearchResponseDecorator(searchResponseDecoratorBinder(),
5151
PipelineProcessorMessageDecorator.class,
5252
PipelineProcessorMessageDecorator.Factory.class);
53+
installSearchResponseDecorator(searchResponseDecoratorBinder(),
54+
UpperCaseDecorator.class,
55+
UpperCaseDecorator.Factory.class);
5356
}
5457
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package org.graylog.plugins.pipelineprocessor;
2+
3+
import com.google.common.collect.ImmutableList;
4+
import com.google.common.collect.ImmutableMap;
5+
import com.google.common.collect.ImmutableMultimap;
6+
import com.google.inject.assistedinject.Assisted;
7+
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
8+
import org.graylog.plugins.pipelineprocessor.ast.Rule;
9+
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
10+
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
11+
import org.graylog.plugins.pipelineprocessor.processors.listeners.NoopInterpreterListener;
12+
import org.graylog2.decorators.Decorator;
13+
import org.graylog2.plugin.Message;
14+
import org.graylog2.plugin.configuration.ConfigurationRequest;
15+
import org.graylog2.plugin.configuration.fields.TextField;
16+
import org.graylog2.plugin.decorators.SearchResponseDecorator;
17+
import org.graylog2.rest.models.messages.responses.ResultMessageSummary;
18+
import org.graylog2.rest.resources.search.responses.SearchResponse;
19+
20+
import javax.inject.Inject;
21+
import java.util.ArrayList;
22+
import java.util.HashSet;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
public class UpperCaseDecorator implements SearchResponseDecorator {
27+
private static final String CK_FIELD_NAME = "fieldName";
28+
private static final String CK_PIPELINE_DEFINITION = "pipeline \"Uppercase decorator\"\nstage 0 match either\nrule \"Uppercase field\"\nend";
29+
30+
private final List<Pipeline> pipelines;
31+
private final PipelineInterpreter pipelineInterpreter;
32+
private final Decorator decorator;
33+
34+
public interface Factory extends SearchResponseDecorator.Factory {
35+
@Override
36+
UpperCaseDecorator create(Decorator decorator);
37+
38+
@Override
39+
Config getConfig();
40+
41+
@Override
42+
Descriptor getDescriptor();
43+
}
44+
45+
public static class Config implements SearchResponseDecorator.Config {
46+
@Inject
47+
public Config() {
48+
}
49+
50+
@Override
51+
public ConfigurationRequest getRequestedConfiguration() {
52+
return new ConfigurationRequest() {{
53+
addField(new TextField(CK_FIELD_NAME, "Field Name", "", "The Name of the field which should be uppercased"));
54+
}};
55+
};
56+
}
57+
58+
public static class Descriptor extends SearchResponseDecorator.Descriptor {
59+
public Descriptor() {
60+
super("Uppercase Decorator", "http://docs.graylog.org/en/2.0/pages/pipelines.html", "Uppercase Decorator");
61+
}
62+
}
63+
64+
@Inject
65+
public UpperCaseDecorator(PipelineInterpreter pipelineInterpreter,
66+
PipelineRuleParser pipelineRuleParser,
67+
@Assisted Decorator decorator) {
68+
this.pipelineInterpreter = pipelineInterpreter;
69+
this.decorator = decorator;
70+
final String fieldName = (String)decorator.config().get(CK_FIELD_NAME);
71+
72+
this.pipelines = pipelineRuleParser.parsePipelines(CK_PIPELINE_DEFINITION);
73+
final List<Rule> rules = ImmutableList.of(pipelineRuleParser.parseRule(getRuleForField(fieldName), true));
74+
this.pipelines.forEach(pipeline -> {
75+
pipeline.stages().forEach(stage -> stage.setRules(rules));
76+
});
77+
}
78+
79+
@Override
80+
public SearchResponse apply(SearchResponse searchResponse) {
81+
final List<ResultMessageSummary> results = new ArrayList<>();
82+
searchResponse.messages().forEach((inMessage) -> {
83+
final Map<String, Object> originalMessage = ImmutableMap.copyOf(inMessage.message());
84+
final Message message = new Message(inMessage.message());
85+
final List<Message> additionalCreatedMessages = pipelineInterpreter.processForResolvedPipelines(message,
86+
message.getId(),
87+
new HashSet<>(this.pipelines),
88+
new NoopInterpreterListener());
89+
90+
results.add(ResultMessageSummary.create(inMessage.highlightRanges(), message.getFields(), inMessage.index()));
91+
additionalCreatedMessages.forEach((additionalMessage) -> {
92+
// TODO: pass proper highlight ranges. Need to rebuild them for new messages.
93+
results.add(ResultMessageSummary.create(
94+
ImmutableMultimap.of(),
95+
additionalMessage.getFields(),
96+
"[created from decorator]"
97+
));
98+
});
99+
});
100+
101+
return searchResponse.toBuilder().messages(results).build();
102+
}
103+
104+
private String getRuleForField(String fieldName) {
105+
return "rule \"Uppercase field\"\n" +
106+
"when\n" +
107+
"has_field(\"" + fieldName + "\")\n" +
108+
"then\n" +
109+
"set_field(\"" + fieldName + "\", uppercase(to_string($message." + fieldName + ")));\n" +
110+
"end";
111+
}
112+
}

src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,20 @@ public Messages process(Messages messages, InterpreterListener interpreterListen
272272
}
273273

274274
public List<Message> processForPipelines(Message message, String msgId, Set<String> pipelines, InterpreterListener interpreterListener) {
275-
final ImmutableSet<Pipeline> pipelinesToRun = ImmutableSet.copyOf(pipelines.stream().map(pipelineId -> this.currentPipelines.get().get(pipelineId)).collect(Collectors.toSet()));
275+
final ImmutableSet<Pipeline> pipelinesToRun = ImmutableSet.copyOf(pipelines
276+
.stream()
277+
.map(pipelineId -> this.currentPipelines.get().get(pipelineId))
278+
.collect(Collectors.toSet()));
279+
280+
return processForResolvedPipelines(message, msgId, pipelinesToRun, interpreterListener);
281+
}
282+
283+
public List<Message> processForResolvedPipelines(Message message, String msgId, Set<Pipeline> pipelines, InterpreterListener interpreterListener) {
276284
final List<Message> result = new ArrayList<>();
277285
// record execution of pipeline in metrics
278-
pipelinesToRun.stream().forEach(pipeline -> metricRegistry.counter(name(Pipeline.class, pipeline.id(), "executed")).inc());
286+
pipelines.forEach(pipeline -> metricRegistry.counter(name(Pipeline.class, pipeline.id(), "executed")).inc());
279287

280-
final StageIterator stages = new StageIterator(pipelinesToRun);
288+
final StageIterator stages = new StageIterator(pipelines);
281289
final Set<Pipeline> pipelinesToSkip = Sets.newHashSet();
282290

283291
// iterate through all stages for all matching pipelines, per "stage slice" instead of per pipeline.

0 commit comments

Comments
 (0)