Skip to content

Commit 785ce96

Browse files
Avoid race conditions on feature discovery during Writer creation (#9173)
1 parent 1ae2692 commit 785ce96

File tree

4 files changed

+65
-13
lines changed

4 files changed

+65
-13
lines changed

communication/src/main/java/datadog/communication/ddagent/SharedCommunicationObjects.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
159159
ret.discover(); // safe to run on same thread
160160
} else {
161161
// avoid performing blocking I/O operation on application thread
162-
AgentTaskScheduler.INSTANCE.execute(ret::discover);
162+
AgentTaskScheduler.INSTANCE.execute(ret::discoverIfOutdated);
163163
}
164164
}
165165
featuresDiscovery = ret;

dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public static Writer createWriter(
8686
// The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is
8787
// enabled, check if we can use the IntakeWriter instead.
8888
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isCiVisibilityEnabled())) {
89+
featuresDiscovery.discoverIfOutdated();
8990
if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled()) {
9091
configuredType = DD_INTAKE_WRITER_TYPE;
9192
} else {
@@ -94,6 +95,7 @@ public static Writer createWriter(
9495
}
9596
}
9697
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isLlmObsEnabled())) {
98+
featuresDiscovery.discoverIfOutdated();
9799
if (featuresDiscovery.supportsEvpProxy() || config.isLlmObsAgentlessEnabled()) {
98100
configuredType = DD_INTAKE_WRITER_TYPE;
99101
} else {
@@ -186,6 +188,7 @@ private static RemoteApi createDDIntakeRemoteApi(
186188
SharedCommunicationObjects commObjects,
187189
DDAgentFeaturesDiscovery featuresDiscovery,
188190
TrackType trackType) {
191+
featuresDiscovery.discoverIfOutdated();
189192
boolean evpProxySupported = featuresDiscovery.supportsEvpProxy();
190193
boolean useProxyApi =
191194
(evpProxySupported && TrackType.LLMOBS == trackType && !config.isLlmObsAgentlessEnabled())

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -669,9 +669,16 @@ private CoreTracer(
669669
this.writer = writer;
670670
}
671671

672+
DDAgentFeaturesDiscovery featuresDiscovery =
673+
sharedCommunicationObjects.featuresDiscovery(config);
674+
675+
if (config.isCiVisibilityEnabled()) {
676+
// ensure updated discovery and sync if the another discovery currently being done
677+
featuresDiscovery.discoverIfOutdated();
678+
}
679+
672680
if (config.isCiVisibilityEnabled()
673-
&& (config.isCiVisibilityAgentlessEnabled()
674-
|| sharedCommunicationObjects.featuresDiscovery(config).supportsEvpProxy())) {
681+
&& (config.isCiVisibilityAgentlessEnabled() || featuresDiscovery.supportsEvpProxy())) {
675682
pendingTraceBuffer = PendingTraceBuffer.discarding();
676683
traceCollectorFactory =
677684
new StreamingTraceCollector.Factory(this, this.timeSource, healthMetrics);
@@ -732,8 +739,7 @@ private CoreTracer(
732739
if (config.isCiVisibilityAgentlessEnabled()) {
733740
addTraceInterceptor(DDIntakeTraceInterceptor.INSTANCE);
734741
} else {
735-
DDAgentFeaturesDiscovery featuresDiscovery =
736-
sharedCommunicationObjects.featuresDiscovery(config);
742+
featuresDiscovery.discoverIfOutdated();
737743
if (!featuresDiscovery.supportsEvpProxy()) {
738744
// CI Test Cycle protocol is not available
739745
addTraceInterceptor(CiVisibilityApmProtocolInterceptor.INSTANCE);

dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.common.writer
22

3+
import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE
4+
35
import datadog.communication.ddagent.DDAgentFeaturesDiscovery
46
import datadog.communication.ddagent.SharedCommunicationObjects
57
import datadog.trace.api.Config
@@ -10,10 +12,16 @@ import datadog.trace.common.writer.ddintake.DDEvpProxyApi
1012
import datadog.trace.common.writer.ddintake.DDIntakeApi
1113
import datadog.trace.core.monitor.HealthMetrics
1214
import datadog.trace.test.util.DDSpecification
13-
15+
import groovy.json.JsonBuilder
1416
import java.util.stream.Collectors
15-
16-
import static datadog.trace.api.config.TracerConfig.PRIORITIZATION_TYPE
17+
import okhttp3.Call
18+
import okhttp3.HttpUrl
19+
import okhttp3.MediaType
20+
import okhttp3.OkHttpClient
21+
import okhttp3.Protocol
22+
import okhttp3.Request
23+
import okhttp3.Response
24+
import okhttp3.ResponseBody
1725

1826
class WriterFactoryTest extends DDSpecification {
1927

@@ -27,19 +35,30 @@ class WriterFactoryTest extends DDSpecification {
2735
config.isCiVisibilityEnabled() >> true
2836
config.isCiVisibilityCodeCoverageEnabled() >> false
2937

30-
def agentFeaturesDiscovery = Mock(DDAgentFeaturesDiscovery)
31-
agentFeaturesDiscovery.getEvpProxyEndpoint() >> DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT
32-
agentFeaturesDiscovery.supportsContentEncodingHeadersWithEvpProxy() >> evpProxySupportsCompression
38+
// Mock agent info response
39+
def response = buildHttpResponse(hasEvpProxy, evpProxySupportsCompression, HttpUrl.parse(config.agentUrl + "/info"))
40+
41+
// Mock HTTP client that simulates delayed response for async feature discovery
42+
def mockCall = Mock(Call)
43+
def mockHttpClient = Mock(OkHttpClient)
44+
mockCall.execute() >> {
45+
// Add a delay
46+
sleep(400)
47+
return response
48+
}
49+
mockHttpClient.newCall(_ as Request) >> mockCall
3350

51+
// Create SharedCommunicationObjects with mocked HTTP client
3452
def sharedComm = new SharedCommunicationObjects()
35-
sharedComm.setFeaturesDiscovery(agentFeaturesDiscovery)
53+
sharedComm.okHttpClient = mockHttpClient
54+
sharedComm.agentUrl = HttpUrl.parse(config.agentUrl)
3655
sharedComm.createRemaining(config)
3756

3857
def sampler = Mock(Sampler)
3958

4059
when:
41-
agentFeaturesDiscovery.supportsEvpProxy() >> hasEvpProxy
4260
config.ciVisibilityAgentlessEnabled >> isCiVisibilityAgentlessEnabled
61+
4362
def writer = WriterFactory.createWriter(config, sharedComm, sampler, null, HealthMetrics.NO_OP, configuredType)
4463

4564
def apis
@@ -77,4 +96,28 @@ class WriterFactoryTest extends DDSpecification {
7796
"not-found" | false | false | true | DDIntakeWriter | [DDIntakeApi] | true
7897
"not-found" | false | false | false | DDAgentWriter | [DDAgentApi] | false
7998
}
99+
100+
Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) {
101+
def endpoints = []
102+
if (hasEvpProxy && evpProxySupportsCompression) {
103+
endpoints = [DDAgentFeaturesDiscovery.V4_EVP_PROXY_ENDPOINT]
104+
} else if (hasEvpProxy) {
105+
endpoints = [DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT]
106+
} else {
107+
endpoints = [DDAgentFeaturesDiscovery.V4_ENDPOINT]
108+
}
109+
110+
def response = [
111+
"version" : "7.40.0",
112+
"endpoints" : endpoints,
113+
]
114+
115+
def builder = new Response.Builder()
116+
.code(200)
117+
.message("OK")
118+
.protocol(Protocol.HTTP_1_1)
119+
.request(new Request.Builder().url(agentUrl.resolve("/info")).build())
120+
.body(ResponseBody.create(MediaType.parse("application/json"), new JsonBuilder(response).toString()))
121+
return builder.build()
122+
}
80123
}

0 commit comments

Comments
 (0)