Skip to content

Commit 92fc013

Browse files
committed
wip
1 parent b868b8a commit 92fc013

File tree

16 files changed

+279
-291
lines changed

16 files changed

+279
-291
lines changed

dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.datadog.appsec;
22

33
import com.datadog.appsec.api.security.ApiSecurityRequestSampler;
4+
import com.datadog.appsec.api.security.AppSecSpanPostProcessor;
45
import com.datadog.appsec.blocking.BlockingServiceImpl;
56
import com.datadog.appsec.config.AppSecConfigService;
67
import com.datadog.appsec.config.AppSecConfigServiceImpl;
@@ -29,6 +30,8 @@
2930
import java.util.Set;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132
import java.util.stream.Collectors;
33+
34+
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor;
3235
import org.slf4j.Logger;
3336
import org.slf4j.LoggerFactory;
3437

@@ -68,6 +71,9 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
6871
REPLACEABLE_EVENT_PRODUCER.replaceEventProducerService(eventDispatcher);
6972

7073
ApiSecurityRequestSampler requestSampler = new ApiSecurityRequestSampler(config);
74+
if (Config.get().isApiSecurityEnabled()) {
75+
SpanPostProcessor.Holder.INSTANCE = new AppSecSpanPostProcessor();
76+
}
7177

7278
ConfigurationPoller configurationPoller = sco.configurationPoller(config);
7379
// may throw and abort startup
Lines changed: 0 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
package com.datadog.appsec.api.security;
22

3-
import java.util.Deque;
4-
import java.util.Map;
5-
import java.util.concurrent.ConcurrentHashMap;
6-
import java.util.concurrent.ConcurrentLinkedDeque;
7-
83
/**
94
* The ApiAccessTracker class provides a mechanism to track API access events, managing them within
105
* a specified capacity limit. Each event is associated with a unique combination of route, method,
@@ -19,92 +14,5 @@
1914
* within the specified capacity limit, with older, less relevant events being discarded.
2015
*/
2116
public class ApiAccessTracker {
22-
private static final int INTERVAL_SECONDS = 30;
23-
private static final int MAX_SIZE = 4096;
24-
private final Map<Long, Long> apiAccessMap; // Map<hash, timestamp>
25-
private final Deque<Long> apiAccessQueue; // hashes ordered by access time
26-
private final long expirationTimeInMs;
27-
private final int capacity;
28-
29-
public ApiAccessTracker() {
30-
this(MAX_SIZE, INTERVAL_SECONDS * 1000);
31-
}
32-
33-
public ApiAccessTracker(int capacity, long expirationTimeInMs) {
34-
this.capacity = capacity;
35-
this.expirationTimeInMs = expirationTimeInMs;
36-
this.apiAccessMap = new ConcurrentHashMap<>();
37-
this.apiAccessQueue = new ConcurrentLinkedDeque<>();
38-
}
39-
40-
/**
41-
* Updates the API access log with the given route, method, and status code. If the record already
42-
* exists and is outdated, it is updated by moving to the end of the list. If the record does not
43-
* exist, a new record is added. If the capacity limit is reached, the oldest record is removed.
44-
* This method should not be called concurrently by multiple threads, due absence of additional
45-
* synchronization for updating data structures is not required.
46-
*
47-
* @param route The route of the API endpoint request
48-
* @param method The method of the API request
49-
* @param statusCode The HTTP response status code of the API request
50-
* @return return true if the record was updated or added, false otherwise
51-
*/
52-
public boolean updateApiAccessIfExpired(String route, String method, int statusCode) {
53-
long currentTime = System.currentTimeMillis();
54-
long hash = computeApiHash(route, method, statusCode);
55-
56-
// New or updated record
57-
boolean isNewOrUpdated = false;
58-
if (!apiAccessMap.containsKey(hash)
59-
|| currentTime - apiAccessMap.get(hash) > expirationTimeInMs) {
60-
61-
cleanupExpiredEntries(currentTime);
62-
63-
apiAccessMap.put(hash, currentTime); // Update timestamp
64-
// move hash to the end of the queue
65-
apiAccessQueue.remove(hash);
66-
apiAccessQueue.addLast(hash);
67-
isNewOrUpdated = true;
68-
69-
// Remove the oldest hash if capacity is reached
70-
while (apiAccessMap.size() > this.capacity) {
71-
Long oldestHash = apiAccessQueue.pollFirst();
72-
if (oldestHash != null) {
73-
apiAccessMap.remove(oldestHash);
74-
}
75-
}
76-
}
77-
78-
return isNewOrUpdated;
79-
}
80-
81-
public boolean isApiAccessExpired(String route, String method, int statusCode) {
82-
long currentTime = System.currentTimeMillis();
83-
long hash = computeApiHash(route, method, statusCode);
84-
return !apiAccessMap.containsKey(hash)
85-
|| currentTime - apiAccessMap.get(hash) > expirationTimeInMs;
86-
}
87-
88-
private void cleanupExpiredEntries(long currentTime) {
89-
while (!apiAccessQueue.isEmpty()) {
90-
Long oldestHash = apiAccessQueue.peekFirst();
91-
if (oldestHash == null) break;
92-
93-
Long lastAccessTime = apiAccessMap.get(oldestHash);
94-
if (lastAccessTime == null || currentTime - lastAccessTime > expirationTimeInMs) {
95-
apiAccessQueue.pollFirst(); // remove from head
96-
apiAccessMap.remove(oldestHash);
97-
} else {
98-
break; // is up-to-date
99-
}
100-
}
101-
}
10217

103-
private long computeApiHash(String route, String method, int statusCode) {
104-
long result = 17;
105-
result = 31 * result + route.hashCode();
106-
result = 31 * result + method.hashCode();
107-
result = 31 * result + statusCode;
108-
return result;
109-
}
11018
}
Lines changed: 114 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,141 @@
11
package com.datadog.appsec.api.security;
22

33
import com.datadog.appsec.gateway.AppSecRequestContext;
4-
import datadog.trace.api.Config;
4+
import datadog.trace.bootstrap.instrumentation.api.Tags;
5+
import datadog.trace.util.NonBlockingSemaphore;
6+
7+
import java.util.Deque;
8+
import java.util.Map;
9+
import java.util.concurrent.ConcurrentHashMap;
10+
import java.util.concurrent.ConcurrentLinkedDeque;
511

612
public class ApiSecurityRequestSampler {
713

8-
private final ApiAccessTracker apiAccessTracker;
9-
private final Config config;
14+
private static final int MAX_POST_PROCESSING_TASKS = 8;
15+
private static final int INTERVAL_SECONDS = 30;
16+
private static final int MAX_SIZE = 4096;
17+
private final Map<Long, Long> apiAccessMap; // Map<hash, timestamp>
18+
private final Deque<Long> apiAccessQueue; // hashes ordered by access time
19+
private final long expirationTimeInMs;
20+
private final int capacity;
21+
22+
private final NonBlockingSemaphore counter = NonBlockingSemaphore.withPermitCount(MAX_POST_PROCESSING_TASKS);
1023

11-
public ApiSecurityRequestSampler(final Config config) {
12-
this.apiAccessTracker = new ApiAccessTracker();
13-
this.config = config;
24+
public ApiSecurityRequestSampler() {
25+
this(MAX_SIZE, INTERVAL_SECONDS * 1000);
1426
}
1527

16-
public boolean sampleRequest(AppSecRequestContext ctx) {
28+
public ApiSecurityRequestSampler(int capacity, long expirationTimeInMs) {
29+
this.capacity = capacity;
30+
this.expirationTimeInMs = expirationTimeInMs;
31+
this.apiAccessMap = new ConcurrentHashMap<>();
32+
this.apiAccessQueue = new ConcurrentLinkedDeque<>();
33+
}
34+
35+
public void preSampleRequest(final AppSecRequestContext ctx, final Map<String, Object> tags) {
36+
final Object route = tags.get(Tags.HTTP_ROUTE);
37+
if (route instanceof String) {
38+
ctx.setRoute((String) route);
39+
}
40+
1741
if (!isValid(ctx)) {
18-
return false;
42+
return;
1943
}
2044

21-
return apiAccessTracker.updateApiAccessIfExpired(
22-
ctx.getRoute(), ctx.getMethod(), ctx.getResponseStatus());
45+
if (!isApiAccessExpired(ctx.getRoute(), ctx.getMethod(), ctx.getResponseStatus())) {
46+
return;
47+
}
48+
49+
if (counter.acquire()) {
50+
ctx.setKeepOpenForApiSecurityPostProcessing(true);
51+
}
2352
}
2453

25-
public boolean preSampleRequest(AppSecRequestContext ctx) {
54+
public boolean sampleRequest(AppSecRequestContext ctx) {
2655
if (!isValid(ctx)) {
2756
return false;
2857
}
2958

30-
return apiAccessTracker.isApiAccessExpired(
59+
return updateApiAccessIfExpired(
3160
ctx.getRoute(), ctx.getMethod(), ctx.getResponseStatus());
3261
}
3362

3463
private boolean isValid(AppSecRequestContext ctx) {
35-
return config.isApiSecurityEnabled()
36-
&& ctx != null
64+
return ctx != null
3765
&& ctx.getRoute() != null
3866
&& ctx.getMethod() != null
3967
&& ctx.getResponseStatus() != 0;
4068
}
69+
70+
/**
71+
* Updates the API access log with the given route, method, and status code. If the record already
72+
* exists and is outdated, it is updated by moving to the end of the list. If the record does not
73+
* exist, a new record is added. If the capacity limit is reached, the oldest record is removed.
74+
* This method should not be called concurrently by multiple threads, due absence of additional
75+
* synchronization for updating data structures is not required.
76+
*
77+
* @param route The route of the API endpoint request
78+
* @param method The method of the API request
79+
* @param statusCode The HTTP response status code of the API request
80+
* @return return true if the record was updated or added, false otherwise
81+
*/
82+
public boolean updateApiAccessIfExpired(String route, String method, int statusCode) {
83+
long currentTime = System.currentTimeMillis();
84+
long hash = computeApiHash(route, method, statusCode);
85+
86+
// New or updated record
87+
boolean isNewOrUpdated = false;
88+
if (!apiAccessMap.containsKey(hash)
89+
|| currentTime - apiAccessMap.get(hash) > expirationTimeInMs) {
90+
91+
cleanupExpiredEntries(currentTime);
92+
93+
apiAccessMap.put(hash, currentTime); // Update timestamp
94+
// move hash to the end of the queue
95+
apiAccessQueue.remove(hash);
96+
apiAccessQueue.addLast(hash);
97+
isNewOrUpdated = true;
98+
99+
// Remove the oldest hash if capacity is reached
100+
while (apiAccessMap.size() > this.capacity) {
101+
Long oldestHash = apiAccessQueue.pollFirst();
102+
if (oldestHash != null) {
103+
apiAccessMap.remove(oldestHash);
104+
}
105+
}
106+
}
107+
108+
return isNewOrUpdated;
109+
}
110+
111+
public boolean isApiAccessExpired(String route, String method, int statusCode) {
112+
long currentTime = System.currentTimeMillis();
113+
long hash = computeApiHash(route, method, statusCode);
114+
return !apiAccessMap.containsKey(hash)
115+
|| currentTime - apiAccessMap.get(hash) > expirationTimeInMs;
116+
}
117+
118+
private void cleanupExpiredEntries(long currentTime) {
119+
while (!apiAccessQueue.isEmpty()) {
120+
Long oldestHash = apiAccessQueue.peekFirst();
121+
if (oldestHash == null) break;
122+
123+
Long lastAccessTime = apiAccessMap.get(oldestHash);
124+
if (lastAccessTime == null || currentTime - lastAccessTime > expirationTimeInMs) {
125+
apiAccessQueue.pollFirst(); // remove from head
126+
apiAccessMap.remove(oldestHash);
127+
} else {
128+
break; // is up-to-date
129+
}
130+
}
131+
}
132+
133+
private long computeApiHash(String route, String method, int statusCode) {
134+
long result = 17;
135+
result = 31 * result + route.hashCode();
136+
result = 31 * result + method.hashCode();
137+
result = 31 * result + statusCode;
138+
return result;
139+
}
140+
41141
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.datadog.appsec.api.security;
2+
3+
import com.datadog.appsec.event.EventProducerService;
4+
import com.datadog.appsec.event.ExpiredSubscriberInfoException;
5+
import com.datadog.appsec.event.data.DataBundle;
6+
import com.datadog.appsec.event.data.KnownAddresses;
7+
import com.datadog.appsec.event.data.SingletonDataBundle;
8+
import com.datadog.appsec.gateway.AppSecRequestContext;
9+
import com.datadog.appsec.gateway.GatewayContext;
10+
import datadog.trace.api.Config;
11+
import datadog.trace.api.gateway.RequestContext;
12+
import datadog.trace.api.gateway.RequestContextSlot;
13+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
14+
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor;
15+
16+
import java.util.Collections;
17+
import java.util.function.BooleanSupplier;
18+
19+
public class AppSecSpanPostProcessor implements SpanPostProcessor {
20+
21+
@Override
22+
public void process(AgentSpan span, BooleanSupplier timeoutCheck) {
23+
if (timeoutCheck.getAsBoolean()) {
24+
return;
25+
}
26+
final RequestContext ctx = span.getRequestContext();
27+
if (ctx == null) {
28+
return;
29+
}
30+
final AppSecRequestContext appsecCtx = ctx.getData(RequestContextSlot.APPSEC);
31+
if (appsecCtx == null) {
32+
return;
33+
}
34+
35+
maybeExtractSchemas(appsecCtx);
36+
ctx.close();
37+
// Decrease the counter to allow the next request to be post-processed
38+
postProcessingCounter.release();
39+
}
40+
41+
private void maybeExtractSchemas(AppSecRequestContext ctx) {
42+
boolean extractSchema = false;
43+
if (Config.get().isApiSecurityEnabled() && requestSampler != null) {
44+
extractSchema = requestSampler.sampleRequest(ctx);
45+
}
46+
47+
if (!extractSchema) {
48+
return;
49+
}
50+
51+
while (true) {
52+
EventProducerService.DataSubscriberInfo subInfo = requestEndSubInfo;
53+
if (subInfo == null) {
54+
subInfo = producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR);
55+
requestEndSubInfo = subInfo;
56+
}
57+
if (subInfo == null || subInfo.isEmpty()) {
58+
return;
59+
}
60+
61+
DataBundle bundle =
62+
new SingletonDataBundle<>(
63+
KnownAddresses.WAF_CONTEXT_PROCESSOR,
64+
Collections.singletonMap("extract-schema", true));
65+
try {
66+
GatewayContext gwCtx = new GatewayContext(false);
67+
producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
68+
return;
69+
} catch (ExpiredSubscriberInfoException e) {
70+
requestEndSubInfo = null;
71+
}
72+
}
73+
}
74+
75+
}

0 commit comments

Comments
 (0)