Skip to content

Commit ee45ab8

Browse files
Added AppSec post-processing
1 parent a7d6537 commit ee45ab8

File tree

9 files changed

+223
-2
lines changed

9 files changed

+223
-2
lines changed

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static datadog.trace.api.UserIdCollectionMode.DISABLED;
1111
import static datadog.trace.api.UserIdCollectionMode.SDK;
1212
import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY;
13+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
1314
import static datadog.trace.util.Strings.toHexString;
1415

1516
import com.datadog.appsec.AppSecSystem;
@@ -163,6 +164,7 @@ public void init() {
163164
subscriptionService.registerCallback(EVENTS.shellCmd(), this::onShellCmd);
164165
subscriptionService.registerCallback(EVENTS.user(), this::onUser);
165166
subscriptionService.registerCallback(EVENTS.loginEvent(), this::onLoginEvent);
167+
subscriptionService.registerCallback(EVENTS.postProcessing(), this::onPostProcessing);
166168

167169
if (additionalIGEvents.contains(EVENTS.requestPathParams())) {
168170
subscriptionService.registerCallback(EVENTS.requestPathParams(), this::onRequestPathParams);
@@ -890,6 +892,10 @@ private void onRequestHeader(RequestContext ctx_, String name, String value) {
890892
}
891893
}
892894

895+
private void onPostProcessing(RequestContext ctx_) {
896+
// Do AppSec post-processing
897+
}
898+
893899
public void stop() {
894900
subscriptionService.reset();
895901
}
@@ -1015,6 +1021,7 @@ private Flow<Void> maybePublishRequestData(AppSecRequestContext ctx) {
10151021

10161022
try {
10171023
GatewayContext gwCtx = new GatewayContext(false);
1024+
activeSpan().getLocalRootSpan().setRequiresPostProcessing(true);
10181025
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
10191026
} catch (ExpiredSubscriberInfoException e) {
10201027
this.initialReqDataSubInfo = null;

dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import datadog.trace.test.util.DDSpecification
2727

2828
import java.util.function.BiConsumer
2929
import java.util.function.BiFunction
30+
import java.util.function.Consumer
3031
import java.util.function.Function
3132
import java.util.function.Supplier
3233

@@ -105,6 +106,7 @@ class GatewayBridgeSpecification extends DDSpecification {
105106
BiFunction<RequestContext, String[], Flow<Void>> execCmdCB
106107
BiFunction<RequestContext, String, Flow<Void>> shellCmdCB
107108
TriFunction<RequestContext, UserIdCollectionMode, String, Flow<Void>> userCB
109+
Consumer<RequestContext> postProcessingCB
108110
LoginEventCallback loginEventCB
109111

110112
void setup() {
@@ -446,6 +448,7 @@ class GatewayBridgeSpecification extends DDSpecification {
446448
1 * ig.registerCallback(EVENTS.shellCmd(), _) >> { shellCmdCB = it[1]; null }
447449
1 * ig.registerCallback(EVENTS.user(), _) >> { userCB = it[1]; null }
448450
1 * ig.registerCallback(EVENTS.loginEvent(), _) >> { loginEventCB = it[1]; null }
451+
1 * ig.registerCallback(EVENTS.postProcessing(), _) >> { postProcessingCB = it[1]; null }
449452
0 * ig.registerCallback(_, _)
450453

451454
bridge.init()

dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import datadog.trace.common.writer.ddagent.DDAgentMapperDiscovery;
1515
import datadog.trace.common.writer.ddagent.Prioritization;
1616
import datadog.trace.core.monitor.HealthMetrics;
17+
import datadog.trace.core.postprocessor.AppSecSpanPostProcessor;
18+
import datadog.trace.core.postprocessor.SpanPostProcessor;
1719
import java.util.concurrent.TimeUnit;
1820
import okhttp3.HttpUrl;
1921
import okhttp3.OkHttpClient;
@@ -153,6 +155,7 @@ public DDAgentWriter build() {
153155
final DDAgentMapperDiscovery mapperDiscovery = new DDAgentMapperDiscovery(featureDiscovery);
154156
final PayloadDispatcher dispatcher =
155157
new PayloadDispatcherImpl(mapperDiscovery, agentApi, healthMetrics, monitoring);
158+
final SpanPostProcessor spanPostProcessor = new AppSecSpanPostProcessor();
156159
final TraceProcessingWorker traceProcessingWorker =
157160
new TraceProcessingWorker(
158161
traceBufferSize,
@@ -163,7 +166,7 @@ public DDAgentWriter build() {
163166
flushIntervalMilliseconds,
164167
TimeUnit.MILLISECONDS,
165168
singleSpanSampler,
166-
null);
169+
spanPostProcessor);
167170

168171
return new DDAgentWriter(
169172
traceProcessingWorker,

dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import datadog.trace.common.writer.ddagent.Prioritization;
1010
import datadog.trace.common.writer.ddintake.DDIntakeMapperDiscovery;
1111
import datadog.trace.core.monitor.HealthMetrics;
12+
import datadog.trace.core.postprocessor.AppSecSpanPostProcessor;
13+
import datadog.trace.core.postprocessor.SpanPostProcessor;
1214
import java.util.EnumMap;
1315
import java.util.Map;
1416
import java.util.concurrent.TimeUnit;
@@ -113,6 +115,8 @@ public DDIntakeWriter build() {
113115
dispatcher = new CompositePayloadDispatcher(dispatchers);
114116
}
115117

118+
SpanPostProcessor spanPostProcessor = new AppSecSpanPostProcessor();
119+
116120
final TraceProcessingWorker traceProcessingWorker =
117121
new TraceProcessingWorker(
118122
traceBufferSize,
@@ -123,7 +127,7 @@ public DDIntakeWriter build() {
123127
flushIntervalMilliseconds,
124128
TimeUnit.MILLISECONDS,
125129
singleSpanSampler,
126-
null);
130+
spanPostProcessor);
127131

128132
return new DDIntakeWriter(
129133
traceProcessingWorker,
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package datadog.trace.core.postprocessor;
2+
3+
import static datadog.trace.api.gateway.Events.EVENTS;
4+
5+
import datadog.trace.api.gateway.CallbackProvider;
6+
import datadog.trace.api.gateway.RequestContext;
7+
import datadog.trace.api.gateway.RequestContextSlot;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
9+
import datadog.trace.core.DDSpan;
10+
import java.util.function.BooleanSupplier;
11+
import java.util.function.Consumer;
12+
13+
public class AppSecSpanPostProcessor implements SpanPostProcessor {
14+
15+
// For testing purpose
16+
protected AgentTracer.TracerAPI tracer() {
17+
return AgentTracer.get();
18+
}
19+
20+
@Override
21+
public boolean process(DDSpan span, BooleanSupplier timeoutCheck) {
22+
CallbackProvider cbp = tracer().getCallbackProvider(RequestContextSlot.APPSEC);
23+
if (cbp == null) {
24+
return false;
25+
}
26+
27+
RequestContext ctx = span.getRequestContext();
28+
if (ctx == null) {
29+
return false;
30+
}
31+
32+
Consumer<RequestContext> postProcessingCallback = cbp.getCallback(EVENTS.postProcessing());
33+
if (postProcessingCallback == null) {
34+
return false;
35+
}
36+
37+
postProcessingCallback.accept(ctx);
38+
return true;
39+
}
40+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package datadog.trace.core.postprocessor
2+
3+
import datadog.trace.api.gateway.CallbackProvider
4+
import datadog.trace.api.gateway.RequestContext
5+
import datadog.trace.api.gateway.RequestContextSlot
6+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer
7+
import datadog.trace.core.DDSpan
8+
import datadog.trace.core.DDSpanContext
9+
import datadog.trace.core.PendingTrace
10+
import datadog.trace.test.util.DDSpecification
11+
12+
import java.util.function.Consumer
13+
import java.util.function.BooleanSupplier
14+
import static datadog.trace.api.gateway.Events.EVENTS
15+
16+
class AppSecSpanPostProcessorTest extends DDSpecification {
17+
def "process returns false if span context is null"() {
18+
given:
19+
def processor = new AppSecSpanPostProcessor()
20+
def span = Mock(DDSpan)
21+
def timeoutCheck = Mock(BooleanSupplier)
22+
(span.context()) >> null
23+
24+
expect:
25+
!processor.process(span, timeoutCheck)
26+
}
27+
28+
def "process returns false if callback provider is null"() {
29+
given:
30+
AgentTracer.TracerAPI tracer = Mock(AgentTracer.TracerAPI)
31+
tracer.getCallbackProvider(RequestContextSlot.APPSEC) >> null
32+
def processor = new AppSecSpanPostProcessor() {
33+
@Override
34+
protected AgentTracer.TracerAPI tracer() {
35+
return tracer
36+
}
37+
}
38+
def span = Mock(DDSpan) {
39+
context() >> Mock(DDSpanContext)
40+
}
41+
def timeoutCheck = Mock(BooleanSupplier)
42+
43+
expect:
44+
!processor.process(span, timeoutCheck)
45+
}
46+
47+
def "process returns false if request context is null"() {
48+
given:
49+
AgentTracer.TracerAPI tracer = Mock(AgentTracer.TracerAPI)
50+
def cbp = Mock(CallbackProvider)
51+
tracer.getCallbackProvider(RequestContextSlot.APPSEC) >> cbp
52+
def processor = new AppSecSpanPostProcessor() {
53+
@Override
54+
protected AgentTracer.TracerAPI tracer() {
55+
return tracer
56+
}
57+
}
58+
def span = Mock(DDSpan) {
59+
context() >> Mock(DDSpanContext)
60+
getRequestContext() >> null
61+
}
62+
def timeoutCheck = Mock(BooleanSupplier)
63+
64+
expect:
65+
!processor.process(span, timeoutCheck)
66+
}
67+
68+
def "process returns false if post-processing callback is null"() {
69+
given:
70+
AgentTracer.TracerAPI tracer = Mock(AgentTracer.TracerAPI)
71+
def cbp = Mock(CallbackProvider)
72+
tracer.getCallbackProvider(RequestContextSlot.APPSEC) >> cbp
73+
cbp.getCallback(EVENTS.postProcessing()) >> null
74+
def processor = new AppSecSpanPostProcessor() {
75+
@Override
76+
protected AgentTracer.TracerAPI tracer() {
77+
return tracer
78+
}
79+
}
80+
def span = Mock(DDSpan) {
81+
context() >> Mock(DDSpanContext)
82+
getRequestContext() >> Mock(RequestContext)
83+
}
84+
def timeoutCheck = Mock(BooleanSupplier)
85+
86+
expect:
87+
!processor.process(span, timeoutCheck)
88+
}
89+
90+
def "process returns true when all components are properly configured"() {
91+
given:
92+
def callback = Mock(Consumer)
93+
AgentTracer.TracerAPI tracer = Mock(AgentTracer.TracerAPI)
94+
def cbp = Mock(CallbackProvider)
95+
tracer.getCallbackProvider(RequestContextSlot.APPSEC) >> cbp
96+
cbp.getCallback(EVENTS.postProcessing()) >> callback
97+
def processor = new AppSecSpanPostProcessor() {
98+
@Override
99+
protected AgentTracer.TracerAPI tracer() {
100+
return tracer
101+
}
102+
}
103+
def span = DDSpan.create("test", 0, Mock(DDSpanContext) {
104+
isRequiresPostProcessing() >> true
105+
getTraceCollector() >> Mock(PendingTrace) {
106+
getCurrentTimeNano() >> 0
107+
}
108+
getRequestContext() >> Mock(RequestContext)
109+
}, [])
110+
def timeoutCheck = Mock(BooleanSupplier)
111+
112+
when:
113+
boolean result = processor.process(span, timeoutCheck)
114+
115+
then:
116+
result
117+
1 * callback.accept(_)
118+
}
119+
}

internal-api/src/main/java/datadog/trace/api/gateway/Events.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.atomic.AtomicInteger;
1111
import java.util.function.BiConsumer;
1212
import java.util.function.BiFunction;
13+
import java.util.function.Consumer;
1314
import java.util.function.Function;
1415
import java.util.function.Supplier;
1516

@@ -314,6 +315,18 @@ public EventType<BiFunction<RequestContext, String, Flow<Void>>> shellCmd() {
314315
return (EventType<BiFunction<RequestContext, String, Flow<Void>>>) SHELL_CMD;
315316
}
316317

318+
static final int POST_PROCESSING_ID = 26;
319+
320+
@SuppressWarnings("rawtypes")
321+
private static final EventType POST_PROCESSING =
322+
new ET<>("trace.post.processing", POST_PROCESSING_ID);
323+
324+
/** The span post-processing */
325+
@SuppressWarnings("unchecked")
326+
public EventType<Consumer<RequestContext>> postProcessing() {
327+
return (EventType<Consumer<RequestContext>>) POST_PROCESSING;
328+
}
329+
317330
static final int MAX_EVENTS = nextId.get();
318331

319332
private static final class ET<T> extends EventType<T> {

internal-api/src/main/java/datadog/trace/api/gateway/InstrumentationGateway.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static datadog.trace.api.gateway.Events.LOGIN_EVENT_ID;
1111
import static datadog.trace.api.gateway.Events.MAX_EVENTS;
1212
import static datadog.trace.api.gateway.Events.NETWORK_CONNECTION_ID;
13+
import static datadog.trace.api.gateway.Events.POST_PROCESSING_ID;
1314
import static datadog.trace.api.gateway.Events.REQUEST_BODY_CONVERTED_ID;
1415
import static datadog.trace.api.gateway.Events.REQUEST_BODY_DONE_ID;
1516
import static datadog.trace.api.gateway.Events.REQUEST_BODY_START_ID;
@@ -38,6 +39,7 @@
3839
import java.util.concurrent.atomic.AtomicReferenceArray;
3940
import java.util.function.BiConsumer;
4041
import java.util.function.BiFunction;
42+
import java.util.function.Consumer;
4143
import java.util.function.Function;
4244
import java.util.function.Supplier;
4345
import org.slf4j.Logger;
@@ -468,6 +470,18 @@ public Flow<Void> apply(RequestContext ctx, String[] arg) {
468470
}
469471
}
470472
};
473+
case POST_PROCESSING_ID:
474+
return (C)
475+
new Consumer<RequestContext>() {
476+
@Override
477+
public void accept(RequestContext ctx) {
478+
try {
479+
((Consumer<RequestContext>) callback).accept(ctx);
480+
} catch (Throwable t) {
481+
log.warn("Callback for {} threw.", eventType, t);
482+
}
483+
}
484+
};
471485
default:
472486
log.warn("Unwrapped callback for {}", eventType);
473487
return callback;

0 commit comments

Comments
 (0)