Skip to content

Commit f050d3d

Browse files
committed
wip
1 parent 92fc013 commit f050d3d

File tree

13 files changed

+142
-287
lines changed

13 files changed

+142
-287
lines changed

dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,12 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
7070
EventDispatcher eventDispatcher = new EventDispatcher();
7171
REPLACEABLE_EVENT_PRODUCER.replaceEventProducerService(eventDispatcher);
7272

73-
ApiSecurityRequestSampler requestSampler = new ApiSecurityRequestSampler(config);
73+
ApiSecurityRequestSampler requestSampler;
7474
if (Config.get().isApiSecurityEnabled()) {
75-
SpanPostProcessor.Holder.INSTANCE = new AppSecSpanPostProcessor();
75+
requestSampler = new ApiSecurityRequestSampler();
76+
SpanPostProcessor.Holder.INSTANCE = new AppSecSpanPostProcessor(requestSampler, REPLACEABLE_EVENT_PRODUCER);
77+
} else {
78+
requestSampler = new ApiSecurityRequestSampler.NoOp();
7679
}
7780

7881
ConfigurationPoller configurationPoller = sco.configurationPoller(config);

dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiAccessTracker.java

Lines changed: 0 additions & 18 deletions
This file was deleted.

dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecurityRequestSampler.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.datadog.appsec.api.security;
22

33
import com.datadog.appsec.gateway.AppSecRequestContext;
4-
import datadog.trace.bootstrap.instrumentation.api.Tags;
54
import datadog.trace.util.NonBlockingSemaphore;
65

76
import java.util.Deque;
@@ -11,15 +10,20 @@
1110

1211
public class ApiSecurityRequestSampler {
1312

14-
private static final int MAX_POST_PROCESSING_TASKS = 8;
13+
/**
14+
* A maximum number of request contexts we'll keep open past the end of request at any given time. This will avoid
15+
* excessive memory usage in case of a high number of concurrent requests, and should also prevent memory leaks in
16+
* case of a bug.
17+
*/
18+
private static final int MAX_POST_PROCESSING_TASKS = 4;
1519
private static final int INTERVAL_SECONDS = 30;
1620
private static final int MAX_SIZE = 4096;
1721
private final Map<Long, Long> apiAccessMap; // Map<hash, timestamp>
1822
private final Deque<Long> apiAccessQueue; // hashes ordered by access time
1923
private final long expirationTimeInMs;
2024
private final int capacity;
2125

22-
private final NonBlockingSemaphore counter = NonBlockingSemaphore.withPermitCount(MAX_POST_PROCESSING_TASKS);
26+
final NonBlockingSemaphore counter = NonBlockingSemaphore.withPermitCount(MAX_POST_PROCESSING_TASKS);
2327

2428
public ApiSecurityRequestSampler() {
2529
this(MAX_SIZE, INTERVAL_SECONDS * 1000);
@@ -32,12 +36,7 @@ public ApiSecurityRequestSampler(int capacity, long expirationTimeInMs) {
3236
this.apiAccessQueue = new ConcurrentLinkedDeque<>();
3337
}
3438

35-
public void preSampleRequest(final AppSecRequestContext ctx, final Map<String, Object> tags) {
36-
final Object route = tags.get(Tags.HTTP_ROUTE);
37-
if (route instanceof String) {
38-
ctx.setRoute((String) route);
39-
}
40-
39+
public void preSampleRequest(final AppSecRequestContext ctx) {
4140
if (!isValid(ctx)) {
4241
return;
4342
}
@@ -138,4 +137,19 @@ private long computeApiHash(String route, String method, int statusCode) {
138137
return result;
139138
}
140139

140+
public static final class NoOp extends ApiSecurityRequestSampler {
141+
public NoOp() {
142+
super(0, 0);
143+
}
144+
145+
@Override
146+
public void preSampleRequest(AppSecRequestContext ctx) {
147+
}
148+
149+
@Override
150+
public boolean sampleRequest(AppSecRequestContext ctx) {
151+
return false;
152+
}
153+
}
154+
141155
}

dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/AppSecSpanPostProcessor.java

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,69 +7,77 @@
77
import com.datadog.appsec.event.data.SingletonDataBundle;
88
import com.datadog.appsec.gateway.AppSecRequestContext;
99
import com.datadog.appsec.gateway.GatewayContext;
10-
import datadog.trace.api.Config;
1110
import datadog.trace.api.gateway.RequestContext;
1211
import datadog.trace.api.gateway.RequestContextSlot;
1312
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1413
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
1516

1617
import java.util.Collections;
1718
import java.util.function.BooleanSupplier;
1819

1920
public class AppSecSpanPostProcessor implements SpanPostProcessor {
2021

22+
private static final Logger log = LoggerFactory.getLogger(AppSecSpanPostProcessor.class);
23+
private final ApiSecurityRequestSampler sampler;
24+
private final EventProducerService producerService;
25+
26+
public AppSecSpanPostProcessor(ApiSecurityRequestSampler sampler, EventProducerService producerService) {
27+
this.sampler = sampler;
28+
this.producerService = producerService;
29+
}
30+
2131
@Override
2232
public void process(AgentSpan span, BooleanSupplier timeoutCheck) {
23-
if (timeoutCheck.getAsBoolean()) {
33+
final RequestContext ctx_ = span.getRequestContext();
34+
if (ctx_ == null) {
2435
return;
2536
}
26-
final RequestContext ctx = span.getRequestContext();
37+
final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
2738
if (ctx == null) {
2839
return;
2940
}
30-
final AppSecRequestContext appsecCtx = ctx.getData(RequestContextSlot.APPSEC);
31-
if (appsecCtx == null) {
41+
42+
if (!ctx.isKeepOpenForApiSecurityPostProcessing()) {
3243
return;
3344
}
3445

35-
maybeExtractSchemas(appsecCtx);
36-
ctx.close();
37-
// Decrease the counter to allow the next request to be post-processed
38-
postProcessingCounter.release();
46+
try {
47+
if (timeoutCheck.getAsBoolean()) {
48+
return;
49+
}
50+
if (!sampler.sampleRequest(ctx)) {
51+
return;
52+
}
53+
maybeExtractSchemas(ctx);
54+
} finally {
55+
ctx.setKeepOpenForApiSecurityPostProcessing(false);
56+
try {
57+
ctx.close();
58+
} catch (Exception e) {
59+
log.debug("Error closing AppSecRequestContext", e);
60+
}
61+
sampler.counter.release();
62+
}
3963
}
4064

4165
private void maybeExtractSchemas(AppSecRequestContext ctx) {
42-
boolean extractSchema = false;
43-
if (Config.get().isApiSecurityEnabled() && requestSampler != null) {
44-
extractSchema = requestSampler.sampleRequest(ctx);
45-
}
46-
47-
if (!extractSchema) {
48-
return;
49-
}
50-
51-
while (true) {
52-
EventProducerService.DataSubscriberInfo subInfo = requestEndSubInfo;
53-
if (subInfo == null) {
54-
subInfo = producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR);
55-
requestEndSubInfo = subInfo;
56-
}
57-
if (subInfo == null || subInfo.isEmpty()) {
66+
final EventProducerService.DataSubscriberInfo sub = producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR);
67+
if (sub == null|| sub.isEmpty()) {
5868
return;
5969
}
6070

61-
DataBundle bundle =
71+
final DataBundle bundle =
6272
new SingletonDataBundle<>(
6373
KnownAddresses.WAF_CONTEXT_PROCESSOR,
6474
Collections.singletonMap("extract-schema", true));
6575
try {
6676
GatewayContext gwCtx = new GatewayContext(false);
67-
producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
68-
return;
77+
producerService.publishDataEvent(sub, ctx, bundle, gwCtx);
6978
} catch (ExpiredSubscriberInfoException e) {
70-
requestEndSubInfo = null;
79+
log.debug("Subscriber info expired", e);
7180
}
72-
}
7381
}
7482

7583
}

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ public void setKeepOpenForApiSecurityPostProcessing(final boolean flag) {
399399
this.keepOpenForApiSecurityPostProcessing = flag;
400400
}
401401

402+
public boolean isKeepOpenForApiSecurityPostProcessing() {
403+
return this.keepOpenForApiSecurityPostProcessing;
404+
}
405+
402406
void addRequestHeader(String name, String value) {
403407
if (finishedRequestHeaders) {
404408
throw new IllegalStateException("Request headers were said to be finished before");

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,12 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
831831
}
832832
}
833833

834-
requestSampler.preSampleRequest(ctx, tags);
834+
// API Security sampling requires http.route tag.
835+
final Object route = tags.get(Tags.HTTP_ROUTE);
836+
if (route instanceof String) {
837+
ctx.setRoute((String) route);
838+
requestSampler.preSampleRequest(ctx);
839+
}
835840

836841
return NoopFlow.INSTANCE;
837842
}

dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/ApiAccessTrackerTest.groovy

Lines changed: 0 additions & 55 deletions
This file was deleted.
Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,73 @@
11
package com.datadog.appsec.api.security
22

33
import com.datadog.appsec.gateway.AppSecRequestContext
4-
import datadog.trace.api.Config
54
import datadog.trace.test.util.DDSpecification
65

76
class ApiSecurityRequestSamplerTest extends DDSpecification {
87

9-
def config = Mock(Config) {
10-
isApiSecurityEnabled() >> true
8+
void 'happy path with single request'() {
9+
given:
10+
def ctx = Mock(AppSecRequestContext)
11+
def sampler = new ApiSecurityRequestSampler()
12+
13+
when:
14+
sampler.preSampleRequest(ctx)
15+
16+
then:
17+
_ * ctx.getRoute() >> 'route1'
18+
_ * ctx.getMethod() >> 'GET'
19+
_ * ctx.getResponseStatus() >> 200
20+
1 * ctx.setKeepOpenForApiSecurityPostProcessing(true)
21+
0 * _
22+
23+
when:
24+
def sampleDecision = sampler.sampleRequest(ctx)
25+
26+
then:
27+
sampleDecision
28+
_ * ctx.getRoute() >> 'route1'
29+
_ * ctx.getMethod() >> 'GET'
30+
_ * ctx.getResponseStatus() >> 200
31+
_ * ctx.isKeepOpenForApiSecurityPostProcessing() >> true
32+
0 * _
1133
}
1234

13-
def sampler = new ApiSecurityRequestSampler(config)
35+
void 'second request is not sampled for the same endpoint'() {
36+
given:
37+
AppSecRequestContext ctx1 = Mock(AppSecRequestContext)
38+
AppSecRequestContext ctx2 = Mock(AppSecRequestContext)
39+
def sampler = new ApiSecurityRequestSampler()
1440

15-
void 'Api Security Sample Request'() {
1641
when:
17-
def span = Mock(AppSecRequestContext) {
18-
getRoute() >> route
19-
getMethod() >> method
20-
getResponseStatus() >> statusCode
21-
}
22-
def sample = sampler.sampleRequest(span)
42+
sampler.preSampleRequest(ctx1)
43+
def sampleDecision = sampler.sampleRequest(ctx1)
2344

2445
then:
25-
sample == sampleResult
26-
27-
where:
28-
method | route | statusCode | sampleResult
29-
'GET' | 'route1' | 200 | true
30-
'GET' | 'route2' | null | false
31-
'GET' | null | 404 | false
32-
'TOP' | 999 | 404 | true
33-
null | '999' | 404 | false
46+
sampleDecision
47+
_ * ctx1.getRoute() >> 'route1'
48+
_ * ctx1.getMethod() >> 'GET'
49+
_ * ctx1.getResponseStatus() >> 200
50+
_ * _
51+
52+
when:
53+
sampler.preSampleRequest(ctx2)
54+
55+
then:
56+
_ * ctx2.getRoute() >> 'route1'
57+
_ * ctx2.getMethod() >> 'GET'
58+
_ * ctx2.getResponseStatus() >> 200
59+
0 * ctx2.setKeepOpenForApiSecurityPostProcessing(_)
60+
0 * _
61+
62+
when:
63+
sampleDecision = sampler.sampleRequest(ctx2)
64+
65+
then:
66+
!sampleDecision
67+
_ * ctx2.getRoute() >> 'route1'
68+
_ * ctx2.getMethod() >> 'GET'
69+
_ * ctx2.getResponseStatus() >> 200
70+
0 * _
3471
}
35-
}
72+
73+
}

0 commit comments

Comments
 (0)