
Commit 74f1592

Antithesis poc
1 parent 8d53878 commit 74f1592

File tree

7 files changed: +259 additions, 0 deletions

dd-trace-core/build.gradle

Lines changed: 3 additions & 0 deletions
@@ -80,6 +80,9 @@ dependencies {
 
   implementation group: 'com.google.re2j', name: 're2j', version: '1.7'
 
+  // Antithesis SDK for assertions and property testing
+  implementation group: 'com.antithesis', name: 'antithesis-sdk-java', version: '0.1.5'
+
   compileOnly group: 'com.github.spotbugs', name: 'spotbugs-annotations', version: '4.2.0'
 
   // We have autoservices defined in test subtree, looks like we need this to be able to properly rebuild this
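All of the Java changes below follow one pattern from the SDK pulled in by this dependency: build a Jackson ObjectNode of details and pass it to Assert.always, Assert.sometimes, Assert.reachable, or Assert.unreachable. The following is a minimal, self-contained sketch of that pattern; the ExampleSender class and its sendBatch method are hypothetical and exist only to illustrate the calls used in this commit, they are not part of the diff.

import com.antithesis.sdk.Assert;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical class used only to illustrate the assertion pattern.
public class ExampleSender {

  public void sendBatch(int messageCount, boolean success) {
    // Record that this code path was exercised during the test run.
    Assert.reachable("Batch sending code path is exercised", null);

    // Property that should hold in at least one execution across the whole test run.
    Assert.sometimes(messageCount > 0, "Non-empty batches are sent", null);

    // Structured details are attached to assertions as a Jackson ObjectNode.
    ObjectNode details = JsonNodeFactory.instance.objectNode();
    details.put("message_count", messageCount);
    details.put("success", success);

    // Property that must hold on every execution; a false condition is reported as a failure.
    Assert.always(success, "Batch sending should always succeed", details);

    if (!success) {
      // Marks a branch that is never expected to execute; reaching it is reported as a failure.
      Assert.unreachable("Failure path should never be reached", details);
    }
  }
}

The details argument is optional; the diffs below pass null where no extra context is needed.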

dd-trace-core/src/main/java/datadog/trace/common/writer/PayloadDispatcherImpl.java

Lines changed: 55 additions & 0 deletions
@@ -1,5 +1,8 @@
 package datadog.trace.common.writer;
 
+import com.antithesis.sdk.Assert;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import datadog.communication.monitor.Monitoring;
 import datadog.communication.monitor.Recording;
 import datadog.communication.serialization.ByteBufferConsumer;

@@ -57,6 +60,16 @@ public Collection<RemoteApi> getApis() {
 
   @Override
   public void onDroppedTrace(int spanCount) {
+    // Antithesis: Assert that traces should not be dropped before sending
+    ObjectNode dropDetails = JsonNodeFactory.instance.objectNode();
+    dropDetails.put("span_count", spanCount);
+    dropDetails.put("total_dropped_traces", droppedTraceCount.sum() + 1);
+    dropDetails.put("total_dropped_spans", droppedSpanCount.sum() + spanCount);
+
+    Assert.unreachable(
+        "Traces should not be dropped before attempting to send - indicates buffer overflow or backpressure",
+        dropDetails);
+
     droppedSpanCount.add(spanCount);
     droppedTraceCount.increment();
   }

@@ -103,18 +116,60 @@ public void accept(int messageCount, ByteBuffer buffer) {
     // the packer calls this when the buffer is full,
     // or when the packer is flushed at a heartbeat
     if (messageCount > 0) {
+      // Antithesis: Verify that we're attempting to send traces
+      Assert.reachable("Trace sending code path is exercised", null);
+      Assert.sometimes(
+          messageCount > 0,
+          "Traces are being sent to the API",
+          null);
+
       batchTimer.reset();
       Payload payload = newPayload(messageCount, buffer);
       final int sizeInBytes = payload.sizeInBytes();
       healthMetrics.onSerialize(sizeInBytes);
       RemoteApi.Response response = api.sendSerializedTraces(payload);
       mapper.reset();
+
+      // Antithesis: Assert that trace sending should always succeed
+      ObjectNode sendDetails = JsonNodeFactory.instance.objectNode();
+      sendDetails.put("trace_count", messageCount);
+      sendDetails.put("payload_size_bytes", sizeInBytes);
+      sendDetails.put("success", response.success());
+      if (response.exception() != null) {
+        sendDetails.put("exception", response.exception().getClass().getName());
+        sendDetails.put("exception_message", response.exception().getMessage());
+      }
+      if (response.status() != null) {
+        sendDetails.put("http_status", response.status());
+      }
+
+      Assert.always(
+          response.success(),
+          "Trace sending to API should always succeed - no traces should be lost",
+          sendDetails);
+
       if (response.success()) {
         if (log.isDebugEnabled()) {
           log.debug("Successfully sent {} traces to the API", messageCount);
         }
         healthMetrics.onSend(messageCount, sizeInBytes, response);
       } else {
+        // Antithesis: This code path should be unreachable if traces are never lost
+        ObjectNode failureDetails = JsonNodeFactory.instance.objectNode();
+        failureDetails.put("trace_count", messageCount);
+        failureDetails.put("payload_size_bytes", sizeInBytes);
+        if (response.exception() != null) {
+          failureDetails.put("exception", response.exception().getClass().getName());
+          failureDetails.put("exception_message", response.exception().getMessage());
+        }
+        if (response.status() != null) {
+          failureDetails.put("http_status", response.status());
+        }
+
+        Assert.unreachable(
+            "Trace sending failure path should never be reached - indicates traces are being lost",
+            failureDetails);
+
         if (log.isDebugEnabled()) {
           log.debug(
               "Failed to send {} traces of size {} bytes to the API", messageCount, sizeInBytes);

dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteWriter.java

Lines changed: 38 additions & 0 deletions
@@ -3,6 +3,9 @@
 import static datadog.trace.api.sampling.PrioritySampling.UNSET;
 import static java.util.concurrent.TimeUnit.MINUTES;
 
+import com.antithesis.sdk.Assert;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import datadog.trace.core.DDSpan;
 import datadog.trace.core.monitor.HealthMetrics;
 import datadog.trace.relocate.api.RatelimitedLogger;

@@ -67,9 +70,32 @@ protected RemoteWriter(
 
   @Override
   public void write(final List<DDSpan> trace) {
+    // Antithesis: Assert that we should never attempt to write when writer is closed
+    ObjectNode writeAttemptDetails = JsonNodeFactory.instance.objectNode();
+    writeAttemptDetails.put("writer_closed", closed);
+    writeAttemptDetails.put("trace_size", trace.size());
+    writeAttemptDetails.put("has_traces", !trace.isEmpty());
+
+    Assert.always(
+        !closed,
+        "Writer should never be closed when attempting to write traces",
+        writeAttemptDetails);
+
     if (closed) {
       // We can't add events after shutdown otherwise it will never complete shutting down.
       log.debug("Dropped due to shutdown: {}", trace);
+
+      // Antithesis: Track when traces are dropped due to writer being closed
+      ObjectNode shutdownDetails = JsonNodeFactory.instance.objectNode();
+      shutdownDetails.put("trace_size", trace.size());
+      shutdownDetails.put("span_count", trace.size());
+      shutdownDetails.put("reason", "writer_closed_during_shutdown");
+
+      Assert.sometimes(
+          closed && !trace.isEmpty(),
+          "Traces are dropped due to writer shutdown - tracking shutdown behavior",
+          shutdownDetails);
+
       handleDroppedTrace(trace);
     } else {
       if (trace.isEmpty()) {

@@ -91,6 +117,18 @@ public void write(final List<DDSpan> trace) {
           handleDroppedTrace(trace);
           break;
         case DROPPED_BUFFER_OVERFLOW:
+          // Antithesis: Buffer overflow should NEVER happen - this indicates a serious problem
+          ObjectNode overflowDetails = JsonNodeFactory.instance.objectNode();
+          overflowDetails.put("trace_size", trace.size());
+          overflowDetails.put("span_count", trace.size());
+          overflowDetails.put("sampling_priority", samplingPriority);
+          overflowDetails.put("buffer_capacity", traceProcessingWorker.getCapacity());
+          overflowDetails.put("reason", "buffer_overflow_backpressure");
+
+          Assert.unreachable(
+              "Buffer overflow should never occur - traces are being dropped due to backpressure",
+              overflowDetails);
+
           if (log.isDebugEnabled()) {
             log.debug("Dropped due to a buffer overflow: {}", trace);
           } else {

dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java

Lines changed: 62 additions & 0 deletions
@@ -2,6 +2,9 @@
 
 import static datadog.communication.http.OkHttpUtils.prepareRequest;
 
+import com.antithesis.sdk.Assert;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.squareup.moshi.JsonAdapter;
 import com.squareup.moshi.Moshi;
 import com.squareup.moshi.Types;

@@ -89,11 +92,30 @@ public void addResponseListener(final RemoteResponseListener listener) {
 
   public Response sendSerializedTraces(final Payload payload) {
     final int sizeInBytes = payload.sizeInBytes();
+
+    // Antithesis: Track that agent API send is being exercised
+    Assert.reachable("DDAgentApi trace sending is exercised", null);
+    Assert.sometimes(
+        payload.traceCount() > 0,
+        "Traces are being sent through DDAgentApi",
+        null);
+
     String tracesEndpoint = featuresDiscovery.getTraceEndpoint();
     if (null == tracesEndpoint) {
       featuresDiscovery.discoverIfOutdated();
       tracesEndpoint = featuresDiscovery.getTraceEndpoint();
       if (null == tracesEndpoint) {
+        // Antithesis: Agent should always be detectable
+        ObjectNode agentDetectionDetails = JsonNodeFactory.instance.objectNode();
+        agentDetectionDetails.put("trace_count", payload.traceCount());
+        agentDetectionDetails.put("payload_size_bytes", sizeInBytes);
+        agentDetectionDetails.put("agent_url", agentUrl.toString());
+        agentDetectionDetails.put("failure_reason", "agent_not_detected");
+
+        Assert.unreachable(
+            "Datadog agent should always be detected - agent communication failure",
+            agentDetectionDetails);
+
         log.error("No datadog agent detected");
         countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, null);
         return Response.failed(404);

@@ -122,7 +144,34 @@ public Response sendSerializedTraces(final Payload payload) {
     try (final Recording recording = sendPayloadTimer.start();
         final okhttp3.Response response = httpClient.newCall(request).execute()) {
       handleAgentChange(response.header(DATADOG_AGENT_STATE));
+
+      // Antithesis: Track HTTP response status and assert success
+      ObjectNode httpResponseDetails = JsonNodeFactory.instance.objectNode();
+      httpResponseDetails.put("trace_count", payload.traceCount());
+      httpResponseDetails.put("payload_size_bytes", sizeInBytes);
+      httpResponseDetails.put("http_status", response.code());
+      httpResponseDetails.put("http_message", response.message());
+      httpResponseDetails.put("success", response.code() == 200);
+      httpResponseDetails.put("agent_url", tracesUrl.toString());
+
+      Assert.always(
+          response.code() == 200,
+          "HTTP response from Datadog agent should always be 200 - API communication failure",
+          httpResponseDetails);
+
       if (response.code() != 200) {
+        // Antithesis: Mark non-200 path as unreachable
+        ObjectNode errorDetails = JsonNodeFactory.instance.objectNode();
+        errorDetails.put("trace_count", payload.traceCount());
+        errorDetails.put("payload_size_bytes", sizeInBytes);
+        errorDetails.put("http_status", response.code());
+        errorDetails.put("http_message", response.message());
+        errorDetails.put("failure_reason", "http_error_response");
+
+        Assert.unreachable(
+            "Non-200 HTTP response from agent indicates API failure - traces may be lost",
+            errorDetails);
+
         agentErrorCounter.incrementErrorCount(response.message(), payload.traceCount());
         countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
         return Response.failed(response.code());

@@ -146,6 +195,19 @@ public Response sendSerializedTraces(final Payload payload) {
       }
     }
     } catch (final IOException e) {
+      // Antithesis: Network failures should not occur
+      ObjectNode networkErrorDetails = JsonNodeFactory.instance.objectNode();
+      networkErrorDetails.put("trace_count", payload.traceCount());
+      networkErrorDetails.put("payload_size_bytes", sizeInBytes);
+      networkErrorDetails.put("exception_type", e.getClass().getName());
+      networkErrorDetails.put("exception_message", e.getMessage());
+      networkErrorDetails.put("agent_url", agentUrl.toString());
+      networkErrorDetails.put("failure_reason", "network_io_exception");
+
+      Assert.unreachable(
+          "Network/IO exceptions should not occur when sending to agent - indicates connectivity issues",
+          networkErrorDetails);
+
       countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, e);
       return Response.failed(e);
     }

telemetry/build.gradle.kts

Lines changed: 4 additions & 0 deletions
@@ -34,6 +34,10 @@ dependencies {
   implementation(libs.slf4j)
 
   implementation(project(":internal-api"))
+
+  // Antithesis SDK for assertions and property testing
+  implementation(group = "com.antithesis", name = "antithesis-sdk-java", version = "0.1.5")
+  implementation(group = "com.fasterxml.jackson.core", name = "jackson-databind", version = "2.15.2")
 
   compileOnly(project(":dd-java-agent:agent-tooling"))
   testImplementation(project(":dd-java-agent:agent-tooling"))

telemetry/src/main/java/datadog/telemetry/TelemetryClient.java

Lines changed: 58 additions & 0 deletions
@@ -1,5 +1,8 @@
 package datadog.telemetry;
 
+import com.antithesis.sdk.Assert;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import datadog.communication.http.HttpRetryPolicy;
 import datadog.communication.http.OkHttpUtils;
 import datadog.trace.api.Config;

@@ -94,26 +97,81 @@ public Result sendHttpRequest(Request.Builder httpRequestBuilder) {
 
     Request httpRequest = httpRequestBuilder.build();
     String requestType = httpRequest.header(DD_TELEMETRY_REQUEST_TYPE);
+
+    // Antithesis: Track telemetry sending attempts
+    Assert.reachable("Telemetry sending is exercised", null);
 
     try (okhttp3.Response response =
         OkHttpUtils.sendWithRetries(okHttpClient, httpRetryPolicy, httpRequest)) {
+
+      // Antithesis: Assert that all telemetry requests should succeed
+      ObjectNode telemetryResponseDetails = JsonNodeFactory.instance.objectNode();
+      telemetryResponseDetails.put("request_type", requestType != null ? requestType : "unknown");
+      telemetryResponseDetails.put("http_status", response.code());
+      telemetryResponseDetails.put("http_message", response.message());
+      telemetryResponseDetails.put("url", url.toString());
+      telemetryResponseDetails.put("success", response.isSuccessful());
+
       if (response.code() == 404) {
+        // Antithesis: Track 404 - endpoint disabled scenario
+        ObjectNode notFoundDetails = JsonNodeFactory.instance.objectNode();
+        notFoundDetails.put("request_type", requestType != null ? requestType : "unknown");
+        notFoundDetails.put("url", url.toString());
+        notFoundDetails.put("reason", "endpoint_disabled_404");
+
+        Assert.sometimes(
+            response.code() == 404,
+            "Telemetry endpoint returns 404 - endpoint may be disabled",
+            notFoundDetails);
+
        log.debug("Telemetry endpoint is disabled, dropping {} message.", requestType);
        return Result.NOT_FOUND;
      }
+
      if (!response.isSuccessful()) {
+        // Antithesis: Telemetry should not fail - data should be retried/buffered
+        ObjectNode failureDetails = JsonNodeFactory.instance.objectNode();
+        failureDetails.put("request_type", requestType != null ? requestType : "unknown");
+        failureDetails.put("http_status", response.code());
+        failureDetails.put("http_message", response.message());
+        failureDetails.put("url", url.toString());
+        failureDetails.put("reason", "http_error_response");
+
+        Assert.unreachable(
+            "Telemetry HTTP request failed - telemetry data should not be dropped, should retry",
+            failureDetails);
+
        log.debug(
            "Telemetry message {} failed with: {} {}.",
            requestType,
            response.code(),
            response.message());
        return Result.FAILURE;
      }
+
+      // Antithesis: Assert success
+      Assert.always(
+          response.isSuccessful(),
+          "Telemetry requests should always succeed - no telemetry data should be lost",
+          telemetryResponseDetails);
+
    } catch (InterruptedIOException e) {
      log.debug("Telemetry message {} sending interrupted: {}.", requestType, e.toString());
      return Result.INTERRUPTED;

    } catch (IOException e) {
+      // Antithesis: Network failures should not cause telemetry loss
+      ObjectNode ioErrorDetails = JsonNodeFactory.instance.objectNode();
+      ioErrorDetails.put("request_type", requestType != null ? requestType : "unknown");
+      ioErrorDetails.put("exception_type", e.getClass().getName());
+      ioErrorDetails.put("exception_message", e.getMessage());
+      ioErrorDetails.put("url", url.toString());
+      ioErrorDetails.put("reason", "network_io_exception");
+
+      Assert.unreachable(
+          "Telemetry network/IO failure - telemetry data should not be dropped, should retry",
+          ioErrorDetails);
+
      log.debug("Telemetry message {} failed with exception: {}.", requestType, e.toString());
      return Result.FAILURE;
    }

0 commit comments