Skip to content

Commit fedd001

Browse files
add queue type and length to queue events
1 parent ae1aa30 commit fedd001

File tree

14 files changed

+372
-32
lines changed

14 files changed

+372
-32
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/QueueTimerHelper.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,31 @@ private static final class RateLimiterHolder {
3030
}
3131

3232
public static <T> void startQueuingTimer(
33-
ContextStore<T, State> taskContextStore, Class<?> schedulerClass, T task) {
33+
ContextStore<T, State> taskContextStore,
34+
Class<?> schedulerClass,
35+
Class<?> queueClass,
36+
int queueLength,
37+
T task) {
3438
State state = taskContextStore.get(task);
35-
startQueuingTimer(state, schedulerClass, task);
39+
startQueuingTimer(state, schedulerClass, queueClass, queueLength, task);
3640
}
3741

38-
public static void startQueuingTimer(State state, Class<?> schedulerClass, Object task) {
42+
public static void startQueuingTimer(
43+
State state, Class<?> schedulerClass, Class<?> queueClass, int queueLength, Object task) {
3944
if (Platform.isNativeImage()) {
4045
// explicitly not supported for Graal native image
4146
return;
4247
}
48+
// TODO consider queue length based sampling here to reduce overhead
4349
// avoid calling this before JFR is initialised because it will lead to reading the wrong
4450
// TSC frequency before JFR has set it up properly
4551
if (task != null && state != null && InstrumentationBasedProfiling.isJFRReady()) {
4652
QueueTiming timing =
4753
(QueueTiming) AgentTracer.get().getProfilingContext().start(Timer.TimerType.QUEUEING);
4854
timing.setTask(task);
4955
timing.setScheduler(schedulerClass);
56+
timing.setQueue(queueClass);
57+
timing.setQueueLength(queueLength);
5058
state.setTiming(timing);
5159
}
5260
}

dd-java-agent/agent-profiling/profiling-controller-openjdk/src/main/java/com/datadog/profiling/controller/openjdk/events/QueueTimeEvent.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ public class QueueTimeEvent extends Event implements QueueTiming {
3333
@Label("Scheduler")
3434
private Class<?> scheduler;
3535

36+
@Label("Queue")
37+
private Class<?> queueType;
38+
39+
@Label("Queue Length on Entry")
40+
private int queueLength;
41+
3642
public QueueTimeEvent() {
3743
this.origin = Thread.currentThread();
3844
AgentSpan activeSpan = AgentTracer.activeSpan();
@@ -55,6 +61,16 @@ public void setScheduler(Class<?> scheduler) {
5561
this.scheduler = scheduler;
5662
}
5763

64+
@Override
65+
public void setQueue(Class<?> queueType) {
66+
this.queueType = queueType;
67+
}
68+
69+
@Override
70+
public void setQueueLength(int queueLength) {
71+
this.queueLength = queueLength;
72+
}
73+
5874
@Override
5975
public void report() {
6076
commit();

dd-java-agent/agent-profiling/profiling-ddprof/src/main/java/com/datadog/profiling/ddprof/QueueTimeTracker.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public class QueueTimeTracker implements QueueTiming {
1313
// FIXME this can be eliminated by altering the instrumentation
1414
// since it is known when the item is polled from the queue
1515
private Class<?> scheduler;
16+
private Class<?> queue;
17+
private int queueLength;
1618

1719
public QueueTimeTracker(DatadogProfiler profiler, long startTicks) {
1820
this.profiler = profiler;
@@ -31,6 +33,16 @@ public void setScheduler(Class<?> scheduler) {
3133
this.scheduler = scheduler;
3234
}
3335

36+
@Override
37+
public void setQueue(Class<?> queue) {
38+
this.queue = queue;
39+
}
40+
41+
@Override
42+
public void setQueueLength(int queueLength) {
43+
this.queueLength = queueLength;
44+
}
45+
3446
@Override
3547
public void report() {
3648
assert weakTask != null && scheduler != null;

dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/QueuedCommandInstrumentation.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.channels.Channel;
2121
import java.util.Collections;
2222
import java.util.Map;
23+
import java.util.concurrent.ConcurrentLinkedQueue;
2324
import net.bytebuddy.asm.Advice;
2425

2526
@AutoService(InstrumenterModule.class)
@@ -66,7 +67,14 @@ public static final class Construct {
6667
public static void after(@Advice.This Object command) {
6768
ContextStore<Object, State> contextStore = InstrumentationContext.get(QUEUED_COMMAND, STATE);
6869
capture(contextStore, command);
69-
QueueTimerHelper.startQueuingTimer(contextStore, Channel.class, command);
70+
// FIXME hard to handle both the lifecyle and get access to the queue instance in the same
71+
// frame within the WriteQueue class.
72+
// This means we can't get the queue length. A (bad) alternative would be to instrument
73+
// ConcurrentLinkedQueue broadly,
74+
// or we could write more brittle instrumentation targeting code patterns in different gRPC
75+
// versions.
76+
QueueTimerHelper.startQueuingTimer(
77+
contextStore, Channel.class, ConcurrentLinkedQueue.class, 0, command);
7078
}
7179
}
7280

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/executor/ThreadPoolExecutorInstrumentation.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,7 @@
2626
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
2727
import datadog.trace.bootstrap.instrumentation.java.concurrent.TPEHelper;
2828
import datadog.trace.bootstrap.instrumentation.java.concurrent.Wrapper;
29-
import java.util.Arrays;
30-
import java.util.Collection;
31-
import java.util.Collections;
32-
import java.util.HashMap;
33-
import java.util.Map;
29+
import java.util.*;
3430
import java.util.concurrent.RunnableFuture;
3531
import java.util.concurrent.ThreadPoolExecutor;
3632
import net.bytebuddy.asm.Advice;
@@ -162,12 +158,20 @@ public static void capture(
162158
// excluded as
163159
// Runnables but it is not until now that they will be put on the executor's queue
164160
if (!exclude(RUNNABLE, task)) {
161+
Queue<?> queue = tpe.getQueue();
165162
QueueTimerHelper.startQueuingTimer(
166-
InstrumentationContext.get(Runnable.class, State.class), tpe.getClass(), task);
163+
InstrumentationContext.get(Runnable.class, State.class),
164+
tpe.getClass(),
165+
queue.getClass(),
166+
queue.size(),
167+
task);
167168
} else if (!exclude(RUNNABLE_FUTURE, task) && task instanceof RunnableFuture) {
169+
Queue<?> queue = tpe.getQueue();
168170
QueueTimerHelper.startQueuingTimer(
169171
InstrumentationContext.get(RunnableFuture.class, State.class),
170172
tpe.getClass(),
173+
queue.getClass(),
174+
queue.size(),
171175
(RunnableFuture<?>) task);
172176
}
173177
}

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/forkjoin/JavaForkJoinPoolInstrumentation.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@
1616
import datadog.trace.agent.tooling.InstrumenterModule;
1717
import datadog.trace.bootstrap.ContextStore;
1818
import datadog.trace.bootstrap.InstrumentationContext;
19-
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
2019
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
2120
import java.util.Map;
22-
import java.util.concurrent.ForkJoinPool;
2321
import java.util.concurrent.ForkJoinTask;
2422
import net.bytebuddy.asm.Advice;
2523

@@ -53,13 +51,11 @@ public void methodAdvice(MethodTransformer transformer) {
5351
public static final class ExternalPush {
5452
@SuppressWarnings("rawtypes")
5553
@Advice.OnMethodEnter
56-
public static <T> void externalPush(
57-
@Advice.This ForkJoinPool pool, @Advice.Argument(0) ForkJoinTask<T> task) {
54+
public static <T> void externalPush(@Advice.Argument(0) ForkJoinTask<T> task) {
5855
if (!exclude(FORK_JOIN_TASK, task)) {
5956
ContextStore<ForkJoinTask, State> contextStore =
6057
InstrumentationContext.get(ForkJoinTask.class, State.class);
6158
capture(contextStore, task);
62-
QueueTimerHelper.startQueuingTimer(contextStore, pool.getClass(), task);
6359
}
6460
}
6561

@@ -74,13 +70,11 @@ public static <T> void cleanup(
7470

7571
public static final class PoolSubmit {
7672
@Advice.OnMethodEnter
77-
public static <T> void poolSubmit(
78-
@Advice.This ForkJoinPool pool, @Advice.Argument(1) ForkJoinTask<T> task) {
73+
public static <T> void poolSubmit(@Advice.Argument(1) ForkJoinTask<T> task) {
7974
if (!exclude(FORK_JOIN_TASK, task)) {
8075
ContextStore<ForkJoinTask, State> contextStore =
8176
InstrumentationContext.get(ForkJoinTask.class, State.class);
8277
capture(contextStore, task);
83-
QueueTimerHelper.startQueuingTimer(contextStore, pool.getClass(), task);
8478
}
8579
}
8680

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package datadog.trace.instrumentation.java.concurrent.forkjoin;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.FORK_JOIN_TASK;
5+
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
6+
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
7+
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.FORK_JOIN_POOL_INSTRUMENTATION_NAME;
8+
import static java.util.Collections.singletonMap;
9+
import static net.bytebuddy.matcher.ElementMatchers.*;
10+
11+
import com.google.auto.service.AutoService;
12+
import datadog.trace.agent.tooling.Instrumenter;
13+
import datadog.trace.agent.tooling.InstrumenterModule;
14+
import datadog.trace.api.config.ProfilingConfig;
15+
import datadog.trace.bootstrap.ContextStore;
16+
import datadog.trace.bootstrap.InstrumentationContext;
17+
import datadog.trace.bootstrap.config.provider.ConfigProvider;
18+
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
19+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
20+
import java.util.Map;
21+
import java.util.concurrent.ForkJoinPool;
22+
import java.util.concurrent.ForkJoinTask;
23+
import net.bytebuddy.asm.Advice;
24+
25+
@AutoService(InstrumenterModule.class)
26+
public class JavaForkJoinWorkQueueInstrumentation extends InstrumenterModule.Profiling
27+
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
28+
29+
public JavaForkJoinWorkQueueInstrumentation() {
30+
super(
31+
EXECUTOR_INSTRUMENTATION_NAME,
32+
FORK_JOIN_POOL_INSTRUMENTATION_NAME,
33+
FORK_JOIN_POOL_INSTRUMENTATION_NAME + "-workqueue");
34+
}
35+
36+
@Override
37+
public String instrumentedType() {
38+
return "java.util.concurrent.ForkJoinPool$WorkQueue";
39+
}
40+
41+
@Override
42+
public boolean isEnabled() {
43+
return super.isEnabled()
44+
&& ConfigProvider.getInstance()
45+
.getBoolean(
46+
ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED,
47+
ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED_DEFAULT);
48+
}
49+
50+
@Override
51+
public Map<String, String> contextStore() {
52+
return singletonMap("java.util.concurrent.ForkJoinTask", State.class.getName());
53+
}
54+
55+
@Override
56+
public void methodAdvice(MethodTransformer transformer) {
57+
String name = getClass().getName();
58+
transformer.applyAdvice(
59+
isMethod()
60+
.and(named("push"))
61+
.and(takesArgument(0, named("java.util.concurrent.ForkJoinTask")))
62+
.and(
63+
isDeclaredBy(
64+
declaresField(fieldType(int.class).and(named("top")))
65+
.and(declaresField(fieldType(int.class).and(named("base")))))),
66+
name + "$PushTask");
67+
}
68+
69+
public static final class PushTask {
70+
@SuppressWarnings("rawtypes")
71+
@Advice.OnMethodEnter
72+
public static <T> void push(
73+
@Advice.This Object workQueue,
74+
@Advice.FieldValue("top") int top,
75+
@Advice.FieldValue("base") int base,
76+
@Advice.Argument(0) ForkJoinTask<T> task) {
77+
if (!exclude(FORK_JOIN_TASK, task)) {
78+
ContextStore<ForkJoinTask, State> contextStore =
79+
InstrumentationContext.get(ForkJoinTask.class, State.class);
80+
QueueTimerHelper.startQueuingTimer(
81+
contextStore, ForkJoinPool.class, workQueue.getClass(), top - base, task);
82+
}
83+
}
84+
}
85+
}

dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/timer/JavaTimerInstrumentation.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
66
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.RUNNABLE;
77
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;
8-
import static datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper.startQueuingTimer;
98
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
109
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.RUNNABLE_INSTRUMENTATION_NAME;
1110
import static java.util.Collections.singletonMap;
@@ -21,7 +20,6 @@
2120
import datadog.trace.bootstrap.InstrumentationContext;
2221
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
2322
import java.util.Map;
24-
import java.util.Timer;
2523
import java.util.TimerTask;
2624
import net.bytebuddy.asm.Advice;
2725

@@ -67,7 +65,6 @@ public static void before(@Advice.Argument(0) TimerTask task, @Advice.Argument(2
6765
ContextStore<Runnable, State> contextStore =
6866
InstrumentationContext.get(Runnable.class, State.class);
6967
capture(contextStore, task);
70-
startQueuingTimer(contextStore, Timer.class, task);
7168
}
7269
}
7370

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package datadog.trace.instrumentation.java.concurrent.timer;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
5+
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.RUNNABLE_INSTRUMENTATION_NAME;
6+
import static java.util.Collections.singletonMap;
7+
import static net.bytebuddy.matcher.ElementMatchers.*;
8+
9+
import com.google.auto.service.AutoService;
10+
import datadog.trace.agent.tooling.Instrumenter;
11+
import datadog.trace.agent.tooling.InstrumenterModule;
12+
import datadog.trace.api.config.ProfilingConfig;
13+
import datadog.trace.bootstrap.ContextStore;
14+
import datadog.trace.bootstrap.InstrumentationContext;
15+
import datadog.trace.bootstrap.config.provider.ConfigProvider;
16+
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
17+
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
18+
import java.util.Map;
19+
import java.util.Timer;
20+
import java.util.TimerTask;
21+
import net.bytebuddy.asm.Advice;
22+
23+
@AutoService(InstrumenterModule.class)
24+
public class TaskQueueInstrumentation extends InstrumenterModule.Profiling
25+
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
26+
public TaskQueueInstrumentation() {
27+
super("java_timer", EXECUTOR_INSTRUMENTATION_NAME, RUNNABLE_INSTRUMENTATION_NAME, "task-queue");
28+
}
29+
30+
@Override
31+
public boolean isEnabled() {
32+
return super.isEnabled()
33+
&& ConfigProvider.getInstance()
34+
.getBoolean(
35+
ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED,
36+
ProfilingConfig.PROFILING_QUEUEING_TIME_ENABLED_DEFAULT);
37+
}
38+
39+
@Override
40+
public String instrumentedType() {
41+
return "java.util.TaskQueue";
42+
}
43+
44+
@Override
45+
public Map<String, String> contextStore() {
46+
return singletonMap("java.util.TimerTask", State.class.getName());
47+
}
48+
49+
@Override
50+
public void methodAdvice(MethodTransformer transformer) {
51+
transformer.applyAdvice(
52+
isMethod()
53+
.and(isPrivate())
54+
.and(
55+
named("add")
56+
.and(takesArguments(1))
57+
.and(takesArgument(0, named("java.util.TimerTask")))
58+
.and(isDeclaredBy(declaresField(fieldType(int.class).and(named("size")))))),
59+
getClass().getName() + "$AddTask");
60+
}
61+
62+
public static final class AddTask {
63+
@Advice.OnMethodEnter(suppress = Throwable.class)
64+
public static void before(
65+
@Advice.This Object taskQueue,
66+
@Advice.FieldValue("size") int size,
67+
@Advice.Argument(0) TimerTask task) {
68+
ContextStore<TimerTask, State> contextStore =
69+
InstrumentationContext.get(TimerTask.class, State.class);
70+
QueueTimerHelper.startQueuingTimer(
71+
contextStore, Timer.class, taskQueue.getClass(), size, task);
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)