Skip to content

Commit 49c1f1c

Browse files
jbachorikclaude
andcommitted
feat(profiling): Integrate OtlpProfileUploader with explicit reference counting
Integrate OtlpProfileUploader into ProfilingAgent to enable parallel JFR and OTLP profile uploads when configured. Implements an explicit reference counting pattern for RecordingData to safely support multiple concurrent handlers.

Key changes:

1. ProfilingAgent integration:
   - Add OtlpProfileUploader alongside ProfileUploader
   - Extract handler methods (handleRecordingData, handleRecordingDataWithDump)
   - Use method references instead of capturing lambdas for better performance
   - Call retain() once for each handler (dumper, OTLP, JFR)
   - Update shutdown hooks to properly clean up the OTLP uploader

2. Explicit reference counting in RecordingData:
   - Change initial refcount from 1 to 0 for clarity
   - Each handler must call retain() before processing
   - Each handler calls release() when done
   - doRelease() is called only when the refcount reaches 0
   - Updated javadocs to reflect the explicit counting pattern

3. Comprehensive test coverage:
   - RecordingDataRefCountingTest validates all handler combinations
   - Tests single, dual, and triple handler scenarios
   - Verifies thread-safety with concurrent handlers
   - Tests error conditions (premature release, retain after release)
   - Confirms idempotent release behavior

Benefits:
- Symmetric treatment of all handlers (no special first handler)
- Clear, explicit reference counting (easier to understand and verify)
- No resource leaks or premature cleanup
- Efficient method references (no lambda capture overhead)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent 1d3aee9 commit 49c1f1c

File tree

3 files changed

+307
-18
lines changed

3 files changed

+307
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Copyright 2025 Datadog
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datadog.profiling.controller;
17+
18+
import static org.junit.jupiter.api.Assertions.assertEquals;
19+
import static org.junit.jupiter.api.Assertions.assertThrows;
20+
import static org.junit.jupiter.api.Assertions.assertTrue;
21+
22+
import datadog.trace.api.profiling.ProfilingSnapshot;
23+
import datadog.trace.api.profiling.RecordingData;
24+
import datadog.trace.api.profiling.RecordingInputStream;
25+
import java.io.ByteArrayInputStream;
26+
import java.io.IOException;
27+
import java.nio.file.Path;
28+
import java.time.Instant;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import javax.annotation.Nonnull;
33+
import javax.annotation.Nullable;
34+
import org.junit.jupiter.api.Test;
35+
36+
/** Tests for RecordingData reference counting with multiple handlers. */
37+
public class RecordingDataRefCountingTest {
38+
39+
/** Test RecordingData implementation that tracks release calls. */
40+
private static class TestRecordingData extends RecordingData {
41+
private final AtomicInteger releaseCount = new AtomicInteger(0);
42+
private final CountDownLatch releaseLatch = new CountDownLatch(1);
43+
44+
public TestRecordingData() {
45+
super(Instant.now(), Instant.now(), ProfilingSnapshot.Kind.PERIODIC);
46+
}
47+
48+
@Nonnull
49+
@Override
50+
public RecordingInputStream getStream() throws IOException {
51+
return new RecordingInputStream(new ByteArrayInputStream(new byte[0]));
52+
}
53+
54+
@Override
55+
protected void doRelease() {
56+
releaseCount.incrementAndGet();
57+
releaseLatch.countDown();
58+
}
59+
60+
@Nullable
61+
@Override
62+
public Path getFile() {
63+
return null;
64+
}
65+
66+
@Override
67+
public String getName() {
68+
return "test-recording";
69+
}
70+
71+
public int getReleaseCount() {
72+
return releaseCount.get();
73+
}
74+
75+
public boolean awaitRelease(long timeout, TimeUnit unit) throws InterruptedException {
76+
return releaseLatch.await(timeout, unit);
77+
}
78+
}
79+
80+
@Test
81+
public void testSingleHandler() throws InterruptedException {
82+
TestRecordingData data = new TestRecordingData();
83+
84+
// Single handler: retain once, release once
85+
data.retain();
86+
assertEquals(0, data.getReleaseCount(), "Should not be released yet");
87+
88+
data.release();
89+
90+
assertTrue(data.awaitRelease(1, TimeUnit.SECONDS), "Release should be called");
91+
assertEquals(1, data.getReleaseCount(), "doRelease() should be called exactly once");
92+
}
93+
94+
@Test
95+
public void testTwoHandlers() throws InterruptedException {
96+
TestRecordingData data = new TestRecordingData();
97+
98+
// Two handlers (e.g., JFR + OTLP): retain twice
99+
data.retain(); // Handler 1
100+
data.retain(); // Handler 2
101+
assertEquals(0, data.getReleaseCount(), "Should not be released yet");
102+
103+
// First handler releases
104+
data.release();
105+
assertEquals(0, data.getReleaseCount(), "Should not be released after first release");
106+
107+
// Second handler releases
108+
data.release();
109+
110+
assertTrue(data.awaitRelease(1, TimeUnit.SECONDS), "Release should be called");
111+
assertEquals(1, data.getReleaseCount(), "doRelease() should be called exactly once");
112+
}
113+
114+
@Test
115+
public void testThreeHandlers() throws InterruptedException {
116+
TestRecordingData data = new TestRecordingData();
117+
118+
// Three handlers (e.g., dumper + JFR + OTLP): retain three times
119+
data.retain(); // Handler 1
120+
data.retain(); // Handler 2
121+
data.retain(); // Handler 3
122+
assertEquals(0, data.getReleaseCount(), "Should not be released yet");
123+
124+
// First two handlers release
125+
data.release();
126+
data.release();
127+
assertEquals(0, data.getReleaseCount(), "Should not be released after two releases");
128+
129+
// Third handler releases
130+
data.release();
131+
132+
assertTrue(data.awaitRelease(1, TimeUnit.SECONDS), "Release should be called");
133+
assertEquals(1, data.getReleaseCount(), "doRelease() should be called exactly once");
134+
}
135+
136+
@Test
137+
public void testReleaseBeforeRetain() {
138+
TestRecordingData data = new TestRecordingData();
139+
140+
// Cannot release before any retain
141+
assertThrows(
142+
IllegalStateException.class,
143+
data::release,
144+
"Should throw when releasing with refcount=0");
145+
}
146+
147+
@Test
148+
public void testRetainAfterFullRelease() throws InterruptedException {
149+
TestRecordingData data = new TestRecordingData();
150+
151+
data.retain();
152+
data.release();
153+
assertTrue(data.awaitRelease(1, TimeUnit.SECONDS), "Release should be called");
154+
155+
// Cannot retain after full release
156+
assertThrows(
157+
IllegalStateException.class,
158+
data::retain,
159+
"Should throw when retaining after release");
160+
}
161+
162+
@Test
163+
public void testMultipleReleaseIdempotent() throws InterruptedException {
164+
TestRecordingData data = new TestRecordingData();
165+
166+
data.retain();
167+
data.release();
168+
assertTrue(data.awaitRelease(1, TimeUnit.SECONDS), "Release should be called");
169+
170+
// Additional release calls should be no-op
171+
data.release();
172+
data.release();
173+
174+
assertEquals(1, data.getReleaseCount(), "doRelease() should still be called exactly once");
175+
}
176+
177+
@Test
178+
public void testConcurrentHandlers() throws InterruptedException {
179+
TestRecordingData data = new TestRecordingData();
180+
int numHandlers = 10;
181+
182+
// Retain for all handlers
183+
for (int i = 0; i < numHandlers; i++) {
184+
data.retain();
185+
}
186+
187+
// Simulate concurrent release from multiple threads
188+
CountDownLatch startLatch = new CountDownLatch(1);
189+
CountDownLatch doneLatch = new CountDownLatch(numHandlers);
190+
191+
for (int i = 0; i < numHandlers; i++) {
192+
new Thread(
193+
() -> {
194+
try {
195+
startLatch.await();
196+
data.release();
197+
doneLatch.countDown();
198+
} catch (InterruptedException e) {
199+
Thread.currentThread().interrupt();
200+
}
201+
})
202+
.start();
203+
}
204+
205+
// Start all threads
206+
startLatch.countDown();
207+
208+
// Wait for all threads to complete
209+
assertTrue(doneLatch.await(5, TimeUnit.SECONDS), "All threads should complete");
210+
assertTrue(data.awaitRelease(1, TimeUnit.SECONDS), "Release should be called");
211+
assertEquals(1, data.getReleaseCount(), "doRelease() should be called exactly once");
212+
}
213+
214+
@Test
215+
public void testRetainChaining() {
216+
TestRecordingData data = new TestRecordingData();
217+
218+
// retain() should return this for chaining
219+
RecordingData result = data.retain();
220+
assertEquals(data, result, "retain() should return the same instance");
221+
}
222+
}

dd-java-agent/agent-profiling/src/main/java/com/datadog/profiling/agent/ProfilingAgent.java

Lines changed: 75 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import static datadog.environment.JavaVirtualMachine.isJavaVersion;
44
import static datadog.environment.JavaVirtualMachine.isJavaVersionAtLeast;
5+
import static datadog.trace.api.config.ProfilingConfig.PROFILING_OTLP_ENABLED;
6+
import static datadog.trace.api.config.ProfilingConfig.PROFILING_OTLP_ENABLED_DEFAULT;
57
import static datadog.trace.api.config.ProfilingConfig.PROFILING_START_FORCE_FIRST;
68
import static datadog.trace.api.config.ProfilingConfig.PROFILING_START_FORCE_FIRST_DEFAULT;
79
import static datadog.trace.api.telemetry.LogCollector.SEND_TELEMETRY;
@@ -14,6 +16,7 @@
1416
import com.datadog.profiling.controller.ProfilingSystem;
1517
import com.datadog.profiling.controller.UnsupportedEnvironmentException;
1618
import com.datadog.profiling.controller.jfr.JFRAccess;
19+
import com.datadog.profiling.uploader.OtlpProfileUploader;
1720
import com.datadog.profiling.uploader.ProfileUploader;
1821
import com.datadog.profiling.utils.Timestamper;
1922
import datadog.trace.api.Config;
@@ -48,6 +51,53 @@ public class ProfilingAgent {
4851

4952
private static volatile ProfilingSystem profiler;
5053
private static volatile ProfileUploader uploader;
54+
private static volatile OtlpProfileUploader otlpUploader;
55+
private static volatile DataDumper dumper;
56+
57+
/**
58+
* Handle recording data upload to both JFR and OTLP uploaders.
59+
*
60+
* @param type Recording type
61+
* @param data Recording data (will be retained for each uploader)
62+
* @param sync Whether to upload synchronously
63+
*/
64+
private static void handleRecordingData(RecordingType type, RecordingData data, boolean sync) {
65+
// Retain once for each uploader
66+
if (otlpUploader != null) {
67+
data.retain(); // For OTLP uploader
68+
}
69+
data.retain(); // For JFR uploader
70+
71+
// Upload to both (if OTLP enabled)
72+
if (otlpUploader != null) {
73+
otlpUploader.upload(type, data, sync, null);
74+
}
75+
uploader.upload(type, data, sync);
76+
}
77+
78+
/**
79+
* Handle recording data upload with debug dump, JFR, and OTLP uploaders.
80+
*
81+
* @param type Recording type
82+
* @param data Recording data (will be retained for each handler)
83+
* @param sync Whether to upload synchronously
84+
*/
85+
private static void handleRecordingDataWithDump(
86+
RecordingType type, RecordingData data, boolean sync) {
87+
// Retain once for each handler
88+
data.retain(); // For dumper
89+
if (otlpUploader != null) {
90+
data.retain(); // For OTLP uploader
91+
}
92+
data.retain(); // For JFR uploader
93+
94+
// Process in all handlers
95+
dumper.onNewData(type, data, sync);
96+
if (otlpUploader != null) {
97+
otlpUploader.upload(type, data, sync, null);
98+
}
99+
uploader.upload(type, data, sync);
100+
}
51101

52102
private static class DataDumper implements RecordingDataListener {
53103
private final Path path;
@@ -133,10 +183,14 @@ public static synchronized boolean run(final boolean earlyStart, Instrumentation
133183
final Controller controller = CompositeController.build(configProvider, context);
134184

135185
String dumpPath = configProvider.getString(ProfilingConfig.PROFILING_DEBUG_DUMP_PATH);
136-
DataDumper dumper = dumpPath != null ? new DataDumper(Paths.get(dumpPath)) : null;
186+
dumper = dumpPath != null ? new DataDumper(Paths.get(dumpPath)) : null;
137187

138188
uploader = new ProfileUploader(config, configProvider);
139189

190+
if (configProvider.getBoolean(PROFILING_OTLP_ENABLED, PROFILING_OTLP_ENABLED_DEFAULT)) {
191+
otlpUploader = new OtlpProfileUploader(config, configProvider);
192+
}
193+
140194
final Duration startupDelay = Duration.ofSeconds(config.getProfilingStartDelay());
141195
final Duration uploadPeriod = Duration.ofSeconds(config.getProfilingUploadPeriod());
142196

@@ -150,11 +204,8 @@ public static synchronized boolean run(final boolean earlyStart, Instrumentation
150204
controller,
151205
context.snapshot(),
152206
dumper == null
153-
? uploader::upload
154-
: (type, data, sync) -> {
155-
dumper.onNewData(type, data, sync);
156-
uploader.upload(type, data, sync);
157-
},
207+
? ProfilingAgent::handleRecordingData
208+
: ProfilingAgent::handleRecordingDataWithDump,
158209
startupDelay,
159210
startupDelayRandomRange,
160211
uploadPeriod,
@@ -169,7 +220,7 @@ public static synchronized boolean run(final boolean earlyStart, Instrumentation
169220
This means that if/when we implement functionality to manually shutdown profiler we would
170221
need to not forget to add code that removes this shutdown hook from JVM.
171222
*/
172-
Runtime.getRuntime().addShutdownHook(new ShutdownHook(profiler, uploader));
223+
Runtime.getRuntime().addShutdownHook(new ShutdownHook(profiler, uploader, otlpUploader));
173224
} catch (final IllegalStateException ex) {
174225
// The JVM is already shutting down.
175226
}
@@ -188,17 +239,20 @@ private static boolean isStartForceFirstSafe() {
188239
}
189240

190241
public static void shutdown() {
191-
shutdown(profiler, uploader, false);
242+
shutdown(profiler, uploader, otlpUploader, false);
192243
}
193244

194245
public static void shutdown(boolean snapshot) {
195-
shutdown(profiler, uploader, snapshot);
246+
shutdown(profiler, uploader, otlpUploader, snapshot);
196247
}
197248

198249
private static final AtomicBoolean shutDownFlag = new AtomicBoolean();
199250

200251
private static void shutdown(
201-
ProfilingSystem profiler, ProfileUploader uploader, boolean snapshot) {
252+
ProfilingSystem profiler,
253+
ProfileUploader uploader,
254+
OtlpProfileUploader otlpUploader,
255+
boolean snapshot) {
202256
if (shutDownFlag.compareAndSet(false, true)) {
203257
if (profiler != null) {
204258
profiler.shutdown(snapshot);
@@ -207,23 +261,32 @@ private static void shutdown(
207261
if (uploader != null) {
208262
uploader.shutdown();
209263
}
264+
265+
if (otlpUploader != null) {
266+
otlpUploader.shutdown();
267+
}
210268
}
211269
}
212270

213271
private static class ShutdownHook extends Thread {
214272

215273
private final WeakReference<ProfilingSystem> profilerRef;
216274
private final WeakReference<ProfileUploader> uploaderRef;
275+
private final WeakReference<OtlpProfileUploader> otlpUploaderRef;
217276

218-
private ShutdownHook(final ProfilingSystem profiler, final ProfileUploader uploader) {
277+
private ShutdownHook(
278+
final ProfilingSystem profiler,
279+
final ProfileUploader uploader,
280+
final OtlpProfileUploader otlpUploader) {
219281
super(AGENT_THREAD_GROUP, "dd-profiler-shutdown-hook");
220282
profilerRef = new WeakReference<>(profiler);
221283
uploaderRef = new WeakReference<>(uploader);
284+
otlpUploaderRef = new WeakReference<>(otlpUploader);
222285
}
223286

224287
@Override
225288
public void run() {
226-
shutdown(profilerRef.get(), uploaderRef.get(), false);
289+
shutdown(profilerRef.get(), uploaderRef.get(), otlpUploaderRef.get(), false);
227290
}
228291
}
229292
}

0 commit comments

Comments
 (0)