Skip to content

Commit c9fc731

Browse files
committed
HikariCP: Add a span when waiting on the pool
1 parent fb836a1 commit c9fc731

File tree

3 files changed

+409
-0
lines changed

3 files changed

+409
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package datadog.trace.instrumentation.jdbc;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
5+
import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_POOL_NAME;
6+
import static java.util.Collections.singletonMap;
7+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
8+
9+
import com.google.auto.service.AutoService;
10+
import com.zaxxer.hikari.pool.HikariPool;
11+
import com.zaxxer.hikari.util.ConcurrentBag;
12+
import datadog.trace.agent.tooling.Instrumenter;
13+
import datadog.trace.agent.tooling.InstrumenterModule;
14+
import datadog.trace.bootstrap.InstrumentationContext;
15+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
16+
import java.lang.reflect.Field;
17+
import java.util.Map;
18+
import java.util.concurrent.TimeUnit;
19+
import net.bytebuddy.asm.Advice;
20+
import net.bytebuddy.asm.AsmVisitorWrapper;
21+
import net.bytebuddy.description.field.FieldDescription;
22+
import net.bytebuddy.description.field.FieldList;
23+
import net.bytebuddy.description.method.MethodList;
24+
import net.bytebuddy.description.type.TypeDescription;
25+
import net.bytebuddy.implementation.Implementation;
26+
import net.bytebuddy.jar.asm.ClassVisitor;
27+
import net.bytebuddy.jar.asm.ClassWriter;
28+
import net.bytebuddy.jar.asm.MethodVisitor;
29+
import net.bytebuddy.jar.asm.Opcodes;
30+
import net.bytebuddy.jar.asm.Type;
31+
import net.bytebuddy.pool.TypePool;
32+
33+
/**
34+
* Instrument Hikari's ConcurrentBag class to detect when blocking occurs trying to get an entry
35+
* from the connection pool.
36+
*/
37+
@AutoService(InstrumenterModule.class)
38+
public final class HikariConcurrentBagInstrumentation extends InstrumenterModule.Tracing
39+
implements Instrumenter.ForSingleType,
40+
Instrumenter.HasTypeAdvice,
41+
Instrumenter.HasMethodAdvice {
42+
private static final String INSTRUMENTATION_NAME = "hikari";
43+
private static final String POOL_WAITING = "pool.waiting";
44+
45+
public HikariConcurrentBagInstrumentation() {
46+
super("jdbc-datasource");
47+
}
48+
49+
@Override
50+
public String instrumentedType() {
51+
return "com.zaxxer.hikari.util.ConcurrentBag";
52+
}
53+
54+
@Override
55+
public Map<String, String> contextStore() {
56+
// For getting the poolName
57+
return singletonMap("com.zaxxer.hikari.util.ConcurrentBag", String.class.getName());
58+
}
59+
60+
@Override
61+
public void typeAdvice(TypeTransformer transformer) {
62+
transformer.applyAdvice(new ConcurrentBagVisitorWrapper());
63+
}
64+
65+
@Override
66+
public void methodAdvice(MethodTransformer transformer) {
67+
transformer.applyAdvice(
68+
isConstructor(), HikariConcurrentBagInstrumentation.class.getName() + "$ConstructorAdvice");
69+
transformer.applyAdvice(
70+
named("borrow"), HikariConcurrentBagInstrumentation.class.getName() + "$BorrowAdvice");
71+
}
72+
73+
public static class ConstructorAdvice {
74+
@Advice.OnMethodExit(suppress = Throwable.class)
75+
static void after(
76+
@Advice.This ConcurrentBag<?> thiz,
77+
@Advice.FieldValue("listener") ConcurrentBag.IBagStateListener listener)
78+
throws IllegalAccessException, NoSuchFieldException {
79+
HikariPool hikariPool = (HikariPool) listener;
80+
81+
/*
82+
* In earlier versions of Hikari, poolName is directly inside HikariPool, and
83+
* in later versions it is in the PoolBase superclass.
84+
*/
85+
final Class<?> hikariPoolSuper = hikariPool.getClass().getSuperclass();
86+
final Class<?> poolNameContainingClass;
87+
if (!hikariPoolSuper.getName().equals("java.lang.Object")) {
88+
poolNameContainingClass = hikariPoolSuper;
89+
} else {
90+
poolNameContainingClass = hikariPool.getClass();
91+
}
92+
Field poolNameField = poolNameContainingClass.getDeclaredField("poolName");
93+
poolNameField.setAccessible(true);
94+
String poolName = (String) poolNameField.get(hikariPool);
95+
InstrumentationContext.get(ConcurrentBag.class, String.class).put(thiz, poolName);
96+
}
97+
}
98+
99+
public static class BorrowAdvice {
100+
@Advice.OnMethodEnter(suppress = Throwable.class)
101+
public static Long onEnter() {
102+
HikariWaitingTracker.clearWaiting();
103+
return System.currentTimeMillis();
104+
}
105+
106+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
107+
public static void stopSpan(
108+
@Advice.This ConcurrentBag thiz,
109+
@Advice.Enter final Long startTimeMillis,
110+
@Advice.Thrown final Throwable throwable) {
111+
if (HikariWaitingTracker.wasWaiting()) {
112+
final AgentSpan span =
113+
startSpan(
114+
INSTRUMENTATION_NAME,
115+
POOL_WAITING,
116+
TimeUnit.MILLISECONDS.toMicros(startTimeMillis));
117+
final String poolName =
118+
InstrumentationContext.get(ConcurrentBag.class, String.class).get(thiz);
119+
if (poolName != null) {
120+
span.setTag(DB_POOL_NAME, poolName);
121+
}
122+
// XXX should we do anything with the throwable?
123+
span.finish();
124+
}
125+
HikariWaitingTracker.clearWaiting();
126+
}
127+
}
128+
129+
private class ConcurrentBagVisitorWrapper implements AsmVisitorWrapper {
130+
@Override
131+
public int mergeWriter(int flags) {
132+
return flags | ClassWriter.COMPUTE_MAXS;
133+
}
134+
135+
@Override
136+
public int mergeReader(int flags) {
137+
return flags;
138+
}
139+
140+
@Override
141+
public ClassVisitor wrap(
142+
TypeDescription instrumentedType,
143+
ClassVisitor classVisitor,
144+
Implementation.Context implementationContext,
145+
TypePool typePool,
146+
FieldList<FieldDescription.InDefinedShape> fields,
147+
MethodList<?> methods,
148+
int writerFlags,
149+
int readerFlags) {
150+
return new ConcurrentBagClassVisitor(Opcodes.ASM8, classVisitor);
151+
}
152+
}
153+
154+
public static class ConcurrentBagClassVisitor extends ClassVisitor {
155+
public ConcurrentBagClassVisitor(int api, ClassVisitor cv) {
156+
super(api, cv);
157+
}
158+
159+
@Override
160+
public MethodVisitor visitMethod(
161+
int access, String name, String descriptor, String signature, String[] exceptions) {
162+
MethodVisitor superMv = super.visitMethod(access, name, descriptor, signature, exceptions);
163+
if ("borrow".equals(name)
164+
&& "(JLjava/util/concurrent/TimeUnit;)Lcom/zaxxer/hikari/util/ConcurrentBag$IConcurrentBagEntry;"
165+
.equals(descriptor)) {
166+
return new BorrowMethodVisitor(api, superMv);
167+
} else {
168+
return superMv;
169+
}
170+
}
171+
}
172+
173+
public static class BorrowMethodVisitor extends MethodVisitor {
174+
public BorrowMethodVisitor(int api, MethodVisitor superMv) {
175+
super(api, superMv);
176+
}
177+
178+
179+
/**
180+
* Adds a call to HikariWaitingTracker.setWaiting whenever Hikari is blocking waiting on a connection from the pool
181+
* to be available whenever either of these method calls happen (which one depends on Hikari version):
182+
* <br/>
183+
* <code>synchronizer.waitUntilSequenceExceeded(startSeq, timeout)</code>
184+
* -- <a href=" https://github.com/brettwooldridge/HikariCP/blob/5adf46c148dfa095886c7c754f365b0644dc04cb/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java#L159">prior to 2.6.0</a>
185+
* <br/>
186+
* <code>handoffQueue.poll(timeout, NANOSECONDS)</code>
187+
* -- <a href="https://github.com/brettwooldridge/HikariCP/blob/22cc9bde6c0fb54c8ac009122a20d2f579e1a54a/src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java#L162">2.6.0 and later</a>
188+
*/
189+
@Override
190+
public void visitMethodInsn(
191+
int opcode, String owner, String name, String descriptor, boolean isInterface) {
192+
if ((opcode == Opcodes.INVOKEVIRTUAL
193+
&& owner.equals("com/zaxxer/hikari/util/QueuedSequenceSynchronizer")
194+
&& name.equals("waitUntilSequenceExceeded")
195+
&& descriptor.equals("(JJ)Z"))
196+
|| (opcode == Opcodes.INVOKEVIRTUAL
197+
&& owner.equals("java/util/concurrent/SynchronousQueue")
198+
&& name.equals("poll")
199+
&& descriptor.equals("(JLjava/util/concurrent/TimeUnit;)Ljava/lang/Object;"))) {
200+
super.visitMethodInsn(
201+
Opcodes.INVOKESTATIC,
202+
Type.getInternalName(HikariWaitingTracker.class),
203+
"setWaiting",
204+
"()V",
205+
false);
206+
// original stack
207+
}
208+
super.visitMethodInsn(opcode, owner, name, descriptor, isInterface);
209+
}
210+
}
211+
212+
public static class HikariWaitingTracker {
213+
private static final ThreadLocal<Boolean> tracker = ThreadLocal.withInitial(() -> false);
214+
215+
public static void clearWaiting() {
216+
tracker.set(false);
217+
}
218+
219+
public static void setWaiting() {
220+
tracker.set(true);
221+
}
222+
223+
public static boolean wasWaiting() {
224+
return tracker.get();
225+
}
226+
}
227+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import datadog.trace.agent.test.AgentTestRunner
2+
import com.zaxxer.hikari.HikariConfig
3+
import com.zaxxer.hikari.HikariDataSource
4+
import test.TestDataSource
5+
6+
import java.sql.SQLTimeoutException
7+
import java.sql.SQLTransientConnectionException
8+
9+
/**
10+
* Ideas taken from Hikari's com.zaxxer.hikari.pool.TestSaturatedPool830.
11+
*/
12+
class SaturatedPoolBlockingTest extends AgentTestRunner {
13+
def "saturated pool test"(int connectionTimeout, Long exhaustPoolForMillis, int expectedWaitingSpans, boolean expectedTimeout) {
14+
setup:
15+
TEST_WRITER.setFilter((trace) -> trace.get(0).getOperationName() == "test.when")
16+
17+
final HikariConfig config = new HikariConfig()
18+
config.setPoolName("testPool")
19+
config.setMaximumPoolSize(1)
20+
config.setConnectionTimeout(connectionTimeout)
21+
config.setDataSourceClassName(TestDataSource.class.getName())
22+
final HikariDataSource ds = new HikariDataSource(config)
23+
24+
when:
25+
if (exhaustPoolForMillis != null) {
26+
def saturatedConnection = ds.getConnection()
27+
new Thread(() -> {
28+
Thread.sleep(exhaustPoolForMillis)
29+
saturatedConnection.close()
30+
}, "saturated connection closer").start()
31+
}
32+
33+
def timedOut = false
34+
def span = TEST_TRACER.startSpan("test", "test.when")
35+
try (def ignore = TEST_TRACER.activateSpan(span)) {
36+
def connection = ds.getConnection()
37+
connection.close()
38+
} catch (SQLTransientConnectionException e) {
39+
if (e.getMessage().contains("request timed out after")) {
40+
// Hikari, newer
41+
timedOut = true
42+
} else {
43+
throw e
44+
}
45+
} catch (SQLTimeoutException ignored) {
46+
// Hikari, older
47+
timedOut = true
48+
}
49+
span.finish()
50+
51+
then:
52+
def waiting = TEST_WRITER.firstTrace().findAll {
53+
element -> element.getOperationName() == "pool.waiting"
54+
}
55+
56+
print(TEST_WRITER.firstTrace())
57+
58+
verifyAll {
59+
TEST_WRITER.size() == 1
60+
waiting.size() == expectedWaitingSpans
61+
timedOut == expectedTimeout
62+
}
63+
64+
where:
65+
connectionTimeout | exhaustPoolForMillis | expectedWaitingSpans | expectedTimeout
66+
1000 | null | 0 | false
67+
1000 | null | 0 | false
68+
1000 | 500 | 1 | false
69+
1000 | 1500 | 1 | true
70+
}
71+
}

0 commit comments

Comments
 (0)