Skip to content
This repository was archived by the owner on Mar 21, 2023. It is now read-only.

Commit 6f7e5c1

Browse files
authored
Add simulation details (#36)
* Add listener parameter to pipeline interpreter Makes it possible to hook into different pipeline processing steps. * Include simulation trace when simulating process In that way it's more straightforward to see what happened during the process if something unexpected happened. * Rename SimulationPreview to SimulationResults * Adapt UI to response with simulation trace * Add time that took processing the message * Move simulated messages to their own component * Add dropdown to select simulation results view This dropdown will let the user switch among: - Preview of processed messages - Changes summary - Simulation trace * First version of simulator trace * Generate simulation view options from a function * Add changes summary view option to simulator Provides a summary of changes done to the message, including added, removed, and mutated fields in the original message.
1 parent 6b35cec commit 6f7e5c1

20 files changed

+804
-100
lines changed

src/main/java/org/graylog/plugins/pipelineprocessor/ast/Pipeline.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,11 @@ public abstract static class Builder {
5454

5555
public abstract Builder stages(SortedSet<Stage> stages);
5656
}
57+
58+
public String toString() {
59+
final StringBuilder sb = new StringBuilder("Pipeline ");
60+
sb.append("'").append(name()).append("'");
61+
sb.append(" (").append(id()).append(")");
62+
return sb.toString();
63+
}
5764
}

src/main/java/org/graylog/plugins/pipelineprocessor/ast/Rule.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,11 @@ public abstract static class Builder {
6262
public abstract Rule build();
6363
}
6464

65+
66+
public String toString() {
67+
final StringBuilder sb = new StringBuilder("Rule ");
68+
sb.append("'").append(name()).append("'");
69+
sb.append(" (").append(id()).append(")");
70+
return sb.toString();
71+
}
6572
}

src/main/java/org/graylog/plugins/pipelineprocessor/ast/Stage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,8 @@ public abstract static class Builder {
5757

5858
public abstract Builder ruleReferences(List<String> ruleRefs);
5959
}
60+
61+
public String toString() {
62+
return "Stage " + stage();
63+
}
6064
}

src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.graylog.plugins.pipelineprocessor.events.RulesChangedEvent;
4444
import org.graylog.plugins.pipelineprocessor.parser.ParseException;
4545
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
46+
import org.graylog.plugins.pipelineprocessor.processors.listeners.InterpreterListener;
47+
import org.graylog.plugins.pipelineprocessor.processors.listeners.NoopInterpreterListener;
4648
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections;
4749
import org.graylog2.plugin.Message;
4850
import org.graylog2.plugin.MessageCollection;
@@ -177,6 +179,11 @@ private synchronized void reload() {
177179
*/
178180
@Override
179181
public Messages process(Messages messages) {
182+
return process(messages, new NoopInterpreterListener());
183+
}
184+
185+
public Messages process(Messages messages, InterpreterListener interpreterListener) {
186+
interpreterListener.startProcessing();
180187
// message id + stream id
181188
final Set<Tuple2<String, String>> processingBlacklist = Sets.newHashSet();
182189

@@ -208,6 +215,7 @@ public Messages process(Messages messages) {
208215
} else {
209216
// get the default stream pipeline connections for this message
210217
pipelinesToRun = streamConnection.get("default");
218+
interpreterListener.processDefaultStream(message, pipelinesToRun);
211219
if (log.isDebugEnabled()) {
212220
log.debug("[{}] running default stream pipelines: [{}]",
213221
msgId,
@@ -223,6 +231,7 @@ public Messages process(Messages messages) {
223231
pipelinesToRun = ImmutableSet.copyOf(streamsIds.stream()
224232
.flatMap(streamId -> streamConnection.get(streamId).stream())
225233
.collect(Collectors.toSet()));
234+
interpreterListener.processStreams(message, pipelinesToRun, streamsIds);
226235
log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds);
227236
}
228237

@@ -246,6 +255,7 @@ public Messages process(Messages messages) {
246255
continue;
247256
}
248257
metricRegistry.counter(name(Pipeline.class, pipeline.id(), "stage", String.valueOf(stage.stage()), "executed")).inc();
258+
interpreterListener.enterStage(stage);
249259
log.debug("[{}] evaluating rule conditions in stage {}: match {}",
250260
msgId,
251261
stage.stage(),
@@ -258,34 +268,40 @@ public Messages process(Messages messages) {
258268
final ArrayList<Rule> rulesToRun = Lists.newArrayListWithCapacity(stage.getRules().size());
259269
boolean anyRulesMatched = false;
260270
for (Rule rule : stage.getRules()) {
271+
interpreterListener.evaluateRule(rule, pipeline);
261272
if (rule.when().evaluateBool(context)) {
262273
anyRulesMatched = true;
263274
countRuleExecution(rule, pipeline, stage, "matched");
264275

265276
if (context.hasEvaluationErrors()) {
266277
final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
267278
appendProcessingError(rule, message, lastError.toString());
279+
interpreterListener.failEvaluateRule(rule, pipeline);
268280
log.debug("Encountered evaluation error during condition, skipping rule actions: {}",
269281
lastError);
270282
continue;
271283
}
284+
interpreterListener.satisfyRule(rule, pipeline);
272285
log.debug("[{}] rule `{}` matches, scheduling to run", msgId, rule.name());
273286
rulesToRun.add(rule);
274287
} else {
275288
countRuleExecution(rule, pipeline, stage, "not-matched");
289+
interpreterListener.dissatisfyRule(rule, pipeline);
276290
log.debug("[{}] rule `{}` does not match", msgId, rule.name());
277291
}
278292
}
279293
RULES:
280294
for (Rule rule : rulesToRun) {
281295
countRuleExecution(rule, pipeline, stage, "executed");
296+
interpreterListener.executeRule(rule, pipeline);
282297
log.debug("[{}] rule `{}` matched running actions", msgId, rule.name());
283298
for (Statement statement : rule.then()) {
284299
statement.evaluate(context);
285300
if (context.hasEvaluationErrors()) {
286301
// if the last statement resulted in an error, do not continue to execute this rules
287302
final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
288303
appendProcessingError(rule, message, lastError.toString());
304+
interpreterListener.failExecuteRule(rule, pipeline);
289305
log.debug("Encountered evaluation error, skipping rest of the rule: {}",
290306
lastError);
291307
countRuleExecution(rule, pipeline, stage, "failed");
@@ -300,10 +316,12 @@ public Messages process(Messages messages) {
300316
// record that it is ok to proceed with the pipeline
301317
if ((stage.matchAll() && (rulesToRun.size() == stage.getRules().size()))
302318
|| (rulesToRun.size() > 0 && anyRulesMatched)) {
319+
interpreterListener.continuePipelineExecution(pipeline, stage);
303320
log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage",
304321
msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
305322
} else {
306323
// no longer execute stages from this pipeline, the guard prevents it
324+
interpreterListener.stopPipelineExecution(pipeline, stage);
307325
log.debug("[{}] stage {} for pipeline `{}` required match: {}, NOT ok to proceed with next stage",
308326
msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
309327
pipelinesToSkip.add(pipeline);
@@ -315,6 +333,7 @@ public Messages process(Messages messages) {
315333
// 4a. also add all new messages from the context to the toProcess work list
316334
Iterables.addAll(toProcess, context.createdMessages());
317335
context.clearCreatedMessages();
336+
interpreterListener.exitStage(stage);
318337
}
319338

320339
}
@@ -346,6 +365,8 @@ public Messages process(Messages messages) {
346365
}
347366
}
348367
}
368+
369+
interpreterListener.finishProcessing();
349370
// 7. return the processed messages
350371
return new MessageCollection(fullyProcessed);
351372
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.graylog.plugins.pipelineprocessor.processors.listeners;
2+
3+
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
4+
import org.graylog.plugins.pipelineprocessor.ast.Rule;
5+
import org.graylog.plugins.pipelineprocessor.ast.Stage;
6+
import org.graylog2.plugin.Message;
7+
8+
import java.util.Set;
9+
10+
public interface InterpreterListener {
11+
void startProcessing();
12+
void finishProcessing();
13+
void processDefaultStream(Message message, Set<Pipeline> pipelines);
14+
void processStreams(Message message, Set<Pipeline> pipelines, Set<String> streams);
15+
void enterStage(Stage stage);
16+
void exitStage(Stage stage);
17+
void evaluateRule(Rule rule, Pipeline pipeline);
18+
void failEvaluateRule(Rule rule, Pipeline pipeline);
19+
void satisfyRule(Rule rule, Pipeline pipeline);
20+
void dissatisfyRule(Rule rule, Pipeline pipeline);
21+
void executeRule(Rule rule, Pipeline pipeline);
22+
void failExecuteRule(Rule rule, Pipeline pipeline);
23+
void continuePipelineExecution(Pipeline pipeline, Stage stage);
24+
void stopPipelineExecution(Pipeline pipeline, Stage stage);
25+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package org.graylog.plugins.pipelineprocessor.processors.listeners;
2+
3+
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
4+
import org.graylog.plugins.pipelineprocessor.ast.Rule;
5+
import org.graylog.plugins.pipelineprocessor.ast.Stage;
6+
import org.graylog2.plugin.Message;
7+
8+
import java.util.Set;
9+
10+
public class NoopInterpreterListener implements InterpreterListener {
11+
@Override
12+
public void startProcessing() {
13+
14+
}
15+
16+
@Override
17+
public void finishProcessing() {
18+
19+
}
20+
21+
@Override
22+
public void processDefaultStream(Message messageId, Set<Pipeline> pipelines) {
23+
24+
}
25+
26+
@Override
27+
public void processStreams(Message messageId, Set<Pipeline> pipelines, Set<String> streams) {
28+
29+
}
30+
31+
@Override
32+
public void enterStage(Stage stage) {
33+
34+
}
35+
36+
@Override
37+
public void exitStage(Stage stage) {
38+
39+
}
40+
41+
@Override
42+
public void evaluateRule(Rule rule, Pipeline pipeline) {
43+
44+
}
45+
46+
@Override
47+
public void failEvaluateRule(Rule rule, Pipeline pipeline) {
48+
49+
}
50+
51+
@Override
52+
public void satisfyRule(Rule rule, Pipeline pipeline) {
53+
54+
}
55+
56+
@Override
57+
public void dissatisfyRule(Rule rule, Pipeline pipeline) {
58+
59+
}
60+
61+
@Override
62+
public void executeRule(Rule rule, Pipeline pipeline) {
63+
64+
}
65+
66+
@Override
67+
public void failExecuteRule(Rule rule, Pipeline pipeline) {
68+
69+
}
70+
71+
@Override
72+
public void continuePipelineExecution(Pipeline pipeline, Stage stage) {
73+
74+
}
75+
76+
@Override
77+
public void stopPipelineExecution(Pipeline pipeline, Stage stage) {
78+
79+
}
80+
}

src/main/java/org/graylog/plugins/pipelineprocessor/rest/SimulationResponse.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.fasterxml.jackson.annotation.JsonCreator;
55
import com.fasterxml.jackson.annotation.JsonProperty;
66
import com.google.auto.value.AutoValue;
7+
import org.graylog.plugins.pipelineprocessor.simulator.PipelineInterpreterTrace;
78
import org.graylog2.rest.models.messages.responses.ResultMessageSummary;
89

910
import java.util.List;
@@ -14,14 +15,24 @@ public abstract class SimulationResponse {
1415
@JsonProperty
1516
public abstract List<ResultMessageSummary> messages();
1617

18+
@JsonProperty
19+
public abstract List<PipelineInterpreterTrace> simulationTrace();
20+
21+
@JsonProperty
22+
public abstract long tookMicroseconds();
23+
1724
public static SimulationResponse.Builder builder() {
1825
return new AutoValue_SimulationResponse.Builder();
1926
}
2027

2128
@JsonCreator
22-
public static SimulationResponse create (@JsonProperty("messages") List<ResultMessageSummary> messages) {
29+
public static SimulationResponse create (@JsonProperty("messages") List<ResultMessageSummary> messages,
30+
@JsonProperty("simulation_trace") List<PipelineInterpreterTrace> simulationTrace,
31+
@JsonProperty("took_microseconds") long tookMicroseconds) {
2332
return builder()
2433
.messages(messages)
34+
.simulationTrace(simulationTrace)
35+
.tookMicroseconds(tookMicroseconds)
2536
.build();
2637
}
2738

@@ -30,5 +41,9 @@ public abstract static class Builder {
3041
public abstract SimulationResponse build();
3142

3243
public abstract SimulationResponse.Builder messages(List<ResultMessageSummary> messages);
44+
45+
public abstract SimulationResponse.Builder simulationTrace(List<PipelineInterpreterTrace> trace);
46+
47+
public abstract SimulationResponse.Builder tookMicroseconds(long tookMicroseconds);
3348
}
3449
}

src/main/java/org/graylog/plugins/pipelineprocessor/rest/SimulatorResource.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.apache.shiro.authz.annotation.RequiresAuthentication;
77
import org.apache.shiro.authz.annotation.RequiresPermissions;
88
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
9+
import org.graylog.plugins.pipelineprocessor.simulator.PipelineInterpreterTracer;
910
import org.graylog2.database.NotFoundException;
1011
import org.graylog2.indexer.messages.DocumentNotFoundException;
1112
import org.graylog2.indexer.messages.Messages;
@@ -61,18 +62,19 @@ public SimulationResponse simulate(@ApiParam(name = "simulation", required = tru
6162
message.addStream(stream);
6263
}
6364

64-
List<ResultMessageSummary> simulationResults = new ArrayList<>();
65+
final List<ResultMessageSummary> simulationResults = new ArrayList<>();
66+
final PipelineInterpreterTracer pipelineInterpreterTracer = new PipelineInterpreterTracer();
6567

6668
for (MessageProcessor messageProcessor : orderedMessageProcessors) {
6769
if (messageProcessor instanceof PipelineInterpreter) {
68-
org.graylog2.plugin.Messages processedMessages = messageProcessor.process(message);
70+
org.graylog2.plugin.Messages processedMessages = ((PipelineInterpreter)messageProcessor).process(message, pipelineInterpreterTracer.getSimulatorInterpreterListener());
6971
for (Message processedMessage : processedMessages) {
7072
simulationResults.add(ResultMessageSummary.create(null, processedMessage.getFields(), ""));
7173
}
7274
}
7375
}
7476

75-
return SimulationResponse.create(simulationResults);
77+
return SimulationResponse.create(simulationResults, pipelineInterpreterTracer.getExecutionTrace(), pipelineInterpreterTracer.took());
7678
} catch (DocumentNotFoundException e) {
7779
throw new NotFoundException(e);
7880
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.graylog.plugins.pipelineprocessor.simulator;
2+
3+
import com.fasterxml.jackson.annotation.JsonAutoDetect;
4+
import com.fasterxml.jackson.annotation.JsonCreator;
5+
import com.fasterxml.jackson.annotation.JsonProperty;
6+
import com.google.auto.value.AutoValue;
7+
8+
@AutoValue
9+
@JsonAutoDetect
10+
public abstract class PipelineInterpreterTrace {
11+
@JsonProperty
12+
public abstract long time();
13+
14+
@JsonProperty
15+
public abstract String message();
16+
17+
@JsonCreator
18+
public static PipelineInterpreterTrace create (@JsonProperty("time") long time,
19+
@JsonProperty("message") String message) {
20+
return new AutoValue_PipelineInterpreterTrace(time, message);
21+
}
22+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.graylog.plugins.pipelineprocessor.simulator;
2+
3+
import com.google.common.base.Stopwatch;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.concurrent.TimeUnit;
8+
9+
public class PipelineInterpreterTracer {
10+
private final List<PipelineInterpreterTrace> executionTrace;
11+
private final Stopwatch timer;
12+
private final SimulatorInterpreterListener simulatorInterpreterListener;
13+
14+
public PipelineInterpreterTracer() {
15+
executionTrace = new ArrayList<>();
16+
timer = Stopwatch.createUnstarted();
17+
simulatorInterpreterListener = new SimulatorInterpreterListener(this);
18+
}
19+
20+
public SimulatorInterpreterListener getSimulatorInterpreterListener() {
21+
return simulatorInterpreterListener;
22+
}
23+
24+
public List<PipelineInterpreterTrace> getExecutionTrace() {
25+
return executionTrace;
26+
}
27+
28+
public long took() {
29+
return timer.elapsed(TimeUnit.MICROSECONDS);
30+
}
31+
32+
public void addTrace(String message) {
33+
executionTrace.add(PipelineInterpreterTrace.create(timer.elapsed(TimeUnit.MICROSECONDS), message));
34+
}
35+
36+
public void startProcessing(String message) {
37+
timer.start();
38+
addTrace(message);
39+
}
40+
41+
public void finishProcessing(String message) {
42+
timer.stop();
43+
addTrace(message);
44+
}
45+
}

0 commit comments

Comments
 (0)