Skip to content

Commit daa7de1

Browse files
committed
HikariCP: Add a span when waiting on the pool
1 parent 7d20566 commit daa7de1

File tree

6 files changed

+378
-0
lines changed

6 files changed

+378
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package datadog.trace.instrumentation.jdbc;
2+
3+
/** Shared blocked getConnection() tracking ThreadLocking for Hikari. */
4+
public class HikariBlockedTracker {
5+
private static final ThreadLocal<Boolean> tracker = ThreadLocal.withInitial(() -> false);
6+
7+
public static void clearBlocked() {
8+
tracker.set(false);
9+
}
10+
11+
public static void setBlocked() {
12+
tracker.set(true);
13+
}
14+
15+
public static boolean wasBlocked() {
16+
return tracker.get();
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package datadog.trace.instrumentation.jdbc;
2+
3+
import java.util.concurrent.SynchronousQueue;
4+
import java.util.concurrent.TimeUnit;
5+
6+
/** Blocked getConnection() tracking for Hikari starting with commit f0b3c520c. */
7+
public class HikariBlockedTrackingSynchronousQueue<T> extends SynchronousQueue<T> {
8+
public HikariBlockedTrackingSynchronousQueue() {
9+
// This assumes the initialization of the SynchronousQueue in ConcurrentBag doesn't change
10+
super(true);
11+
}
12+
13+
@Override
14+
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
15+
HikariBlockedTracker.setBlocked();
16+
return super.poll(timeout, unit);
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package datadog.trace.instrumentation.jdbc;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
5+
import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_POOL_NAME;
6+
import static java.util.Collections.singletonMap;
7+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
8+
9+
import com.google.auto.service.AutoService;
10+
import com.zaxxer.hikari.pool.HikariPool;
11+
import com.zaxxer.hikari.util.ConcurrentBag;
12+
import datadog.trace.agent.tooling.Instrumenter;
13+
import datadog.trace.agent.tooling.InstrumenterModule;
14+
import datadog.trace.bootstrap.InstrumentationContext;
15+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
16+
import java.lang.reflect.Field;
17+
import java.util.Map;
18+
import java.util.concurrent.TimeUnit;
19+
import net.bytebuddy.asm.Advice;
20+
21+
/**
22+
* Instrument Hikari's ConcurrentBag class to detect when blocking occurs trying to get an entry
23+
* from the connection pool.
24+
*/
25+
@AutoService(InstrumenterModule.class)
26+
public final class HikariConcurrentBagInstrumentation extends InstrumenterModule.Tracing
27+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
28+
29+
public HikariConcurrentBagInstrumentation() {
30+
super("jdbc-datasource");
31+
}
32+
33+
@Override
34+
public String instrumentedType() {
35+
return "com.zaxxer.hikari.util.ConcurrentBag";
36+
}
37+
38+
@Override
39+
public String[] helperClassNames() {
40+
return new String[] {
41+
packageName + ".HikariBlockedTrackingSynchronousQueue", packageName + ".HikariBlockedTracker"
42+
};
43+
}
44+
45+
@Override
46+
public Map<String, String> contextStore() {
47+
// For getting the poolName
48+
return singletonMap("com.zaxxer.hikari.util.ConcurrentBag", String.class.getName());
49+
}
50+
51+
@Override
52+
public void methodAdvice(MethodTransformer transformer) {
53+
transformer.applyAdvice(
54+
isConstructor(), HikariConcurrentBagInstrumentation.class.getName() + "$ConstructorAdvice");
55+
transformer.applyAdvice(
56+
named("borrow"), HikariConcurrentBagInstrumentation.class.getName() + "$BorrowAdvice");
57+
}
58+
59+
public static class ConstructorAdvice {
60+
@Advice.OnMethodExit(suppress = Throwable.class)
61+
static void after(@Advice.This ConcurrentBag<?> thiz)
62+
throws IllegalAccessException, NoSuchFieldException {
63+
try {
64+
Field handoffQueueField = thiz.getClass().getDeclaredField("handoffQueue");
65+
handoffQueueField.setAccessible(true);
66+
handoffQueueField.set(thiz, new HikariBlockedTrackingSynchronousQueue<>());
67+
} catch (NoSuchFieldException e) {
68+
// ignore -- see HikariQueuedSequenceSynchronizerInstrumentation for older Hikari versions
69+
}
70+
71+
Field hikariPoolField = thiz.getClass().getDeclaredField("listener");
72+
hikariPoolField.setAccessible(true);
73+
HikariPool hikariPool = (HikariPool) hikariPoolField.get(thiz);
74+
75+
/*
76+
* In earlier versions of Hikari, poolName is directly inside HikariPool, and
77+
* in later versions it is in the PoolBase superclass.
78+
*/
79+
final Class<?> hikariPoolSuper = hikariPool.getClass().getSuperclass();
80+
final Class<?> poolNameContainingClass;
81+
if (!hikariPoolSuper.getName().equals("java.lang.Object")) {
82+
poolNameContainingClass = hikariPoolSuper;
83+
} else {
84+
poolNameContainingClass = hikariPool.getClass();
85+
}
86+
Field poolNameField = poolNameContainingClass.getDeclaredField("poolName");
87+
poolNameField.setAccessible(true);
88+
String poolName = (String) poolNameField.get(hikariPool);
89+
InstrumentationContext.get(ConcurrentBag.class, String.class).put(thiz, poolName);
90+
}
91+
}
92+
93+
public static class BorrowAdvice {
94+
private static final String POOL_WAITING = "pool.waiting";
95+
96+
@Advice.OnMethodEnter(suppress = Throwable.class)
97+
public static Long onEnter() {
98+
HikariBlockedTracker.clearBlocked();
99+
return System.currentTimeMillis();
100+
}
101+
102+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
103+
public static void stopSpan(
104+
@Advice.This ConcurrentBag thiz,
105+
@Advice.Enter final Long startTimeMillis,
106+
@Advice.Thrown final Throwable throwable) {
107+
if (HikariBlockedTracker.wasBlocked()) {
108+
final AgentSpan span =
109+
startSpan("hikari", POOL_WAITING, TimeUnit.MILLISECONDS.toMicros(startTimeMillis));
110+
final String poolName =
111+
InstrumentationContext.get(ConcurrentBag.class, String.class).get(thiz);
112+
if (poolName != null) {
113+
span.setTag(DB_POOL_NAME, poolName);
114+
}
115+
// XXX should we do anything with the throwable?
116+
span.finish();
117+
}
118+
HikariBlockedTracker.clearBlocked();
119+
}
120+
}
121+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package datadog.trace.instrumentation.jdbc;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
5+
import com.google.auto.service.AutoService;
6+
import datadog.trace.agent.tooling.Instrumenter;
7+
import datadog.trace.agent.tooling.InstrumenterModule;
8+
import net.bytebuddy.asm.Advice;
9+
10+
/** Blocked getConnection() tracking for Hikari starting before commit f0b3c520c. */
11+
@AutoService(InstrumenterModule.class)
12+
public final class HikariQueuedSequenceSynchronizerInstrumentation
13+
extends InstrumenterModule.Tracing
14+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
15+
16+
public HikariQueuedSequenceSynchronizerInstrumentation() {
17+
super("jdbc-datasource");
18+
}
19+
20+
@Override
21+
public String instrumentedType() {
22+
return "com.zaxxer.hikari.util.QueuedSequenceSynchronizer";
23+
}
24+
25+
@Override
26+
public void methodAdvice(MethodTransformer transformer) {
27+
transformer.applyAdvice(
28+
named("waitUntilSequenceExceeded"),
29+
HikariQueuedSequenceSynchronizerInstrumentation.class.getName()
30+
+ "$WaitUntilSequenceExceededAdvice");
31+
}
32+
33+
public static class WaitUntilSequenceExceededAdvice {
34+
@Advice.OnMethodEnter(suppress = Throwable.class)
35+
public static void onEnter() {
36+
HikariBlockedTracker.setBlocked();
37+
}
38+
}
39+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import datadog.trace.agent.test.AgentTestRunner
2+
import com.zaxxer.hikari.HikariConfig
3+
import com.zaxxer.hikari.HikariDataSource
4+
import test.TestDataSource
5+
6+
import java.sql.SQLTimeoutException
7+
import java.sql.SQLTransientConnectionException
8+
9+
/**
10+
* Ideas taken from Hikari's com.zaxxer.hikari.pool.TestSaturatedPool830.
11+
*/
12+
class SaturatedPoolBlockingTest extends AgentTestRunner {
13+
def "saturated pool test"(int connectionTimeout, Long exhaustPoolForMillis, int expectedWaitingSpans, boolean expectedTimeout) {
14+
setup:
15+
TEST_WRITER.setFilter((trace) -> trace.get(0).getOperationName() == "test.when")
16+
17+
final HikariConfig config = new HikariConfig()
18+
config.setPoolName("testPool")
19+
config.setMaximumPoolSize(1)
20+
config.setConnectionTimeout(connectionTimeout)
21+
config.setDataSourceClassName(TestDataSource.class.getName())
22+
final HikariDataSource ds = new HikariDataSource(config)
23+
24+
when:
25+
if (exhaustPoolForMillis != null) {
26+
def saturatedConnection = ds.getConnection()
27+
new Thread(() -> {
28+
Thread.sleep(exhaustPoolForMillis)
29+
saturatedConnection.close()
30+
}, "saturated connection closer").start()
31+
}
32+
33+
def timedOut = false
34+
def span = TEST_TRACER.startSpan("test", "test.when")
35+
try (def ignore = TEST_TRACER.activateSpan(span)) {
36+
def connection = ds.getConnection()
37+
connection.close()
38+
} catch (SQLTransientConnectionException e) {
39+
if (e.getMessage().contains("request timed out after")) {
40+
// Hikari, newer
41+
timedOut = true
42+
} else {
43+
throw e
44+
}
45+
} catch (SQLTimeoutException ignored) {
46+
// Hikari, older
47+
timedOut = true
48+
}
49+
span.finish()
50+
51+
then:
52+
def waiting = TEST_WRITER.firstTrace().findAll {
53+
element -> element.getOperationName() == "pool.waiting"
54+
}
55+
56+
print(TEST_WRITER.firstTrace())
57+
58+
verifyAll {
59+
TEST_WRITER.size() == 1
60+
waiting.size() == expectedWaitingSpans
61+
timedOut == expectedTimeout
62+
}
63+
64+
where:
65+
connectionTimeout | exhaustPoolForMillis | expectedWaitingSpans | expectedTimeout
66+
1000 | null | 0 | false
67+
1000 | null | 0 | false
68+
1000 | 500 | 1 | false
69+
1000 | 1500 | 1 | true
70+
}
71+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (C) 2013 Brett Wooldridge
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package test;
18+
19+
import java.io.PrintWriter;
20+
import java.sql.Connection;
21+
import java.sql.SQLException;
22+
import java.sql.SQLFeatureNotSupportedException;
23+
import java.util.logging.Logger;
24+
import javax.sql.DataSource;
25+
26+
/**
27+
* Test DataSource. Derived from Hikari's StubDataSource.
28+
*
29+
* @author Brett Wooldridge
30+
*/
31+
public class TestDataSource implements DataSource {
32+
private String user;
33+
private String password;
34+
private PrintWriter logWriter;
35+
private SQLException throwException;
36+
private int loginTimeout;
37+
38+
public String getUser() {
39+
return user;
40+
}
41+
42+
public void setUser(String user) {
43+
this.user = user;
44+
}
45+
46+
public String getPassword() {
47+
return password;
48+
}
49+
50+
public void setURL(String url) {
51+
// we don't care
52+
}
53+
54+
/** {@inheritDoc} */
55+
@Override
56+
public PrintWriter getLogWriter() throws SQLException {
57+
return logWriter;
58+
}
59+
60+
/** {@inheritDoc} */
61+
@Override
62+
public void setLogWriter(PrintWriter out) throws SQLException {
63+
this.logWriter = out;
64+
}
65+
66+
/** {@inheritDoc} */
67+
@Override
68+
public void setLoginTimeout(int seconds) throws SQLException {
69+
this.loginTimeout = seconds;
70+
}
71+
72+
/** {@inheritDoc} */
73+
@Override
74+
public int getLoginTimeout() throws SQLException {
75+
return loginTimeout;
76+
}
77+
78+
/** {@inheritDoc} */
79+
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
80+
return null;
81+
}
82+
83+
/** {@inheritDoc} */
84+
@SuppressWarnings("unchecked")
85+
@Override
86+
public <T> T unwrap(Class<T> iface) throws SQLException {
87+
if (iface.isInstance(this)) {
88+
return (T) this;
89+
}
90+
91+
throw new SQLException("Wrapped DataSource is not an instance of " + iface);
92+
}
93+
94+
/** {@inheritDoc} */
95+
@Override
96+
public boolean isWrapperFor(Class<?> iface) throws SQLException {
97+
return false;
98+
}
99+
100+
/** {@inheritDoc} */
101+
@Override
102+
public Connection getConnection() throws SQLException {
103+
return new TestConnection(false);
104+
}
105+
106+
/** {@inheritDoc} */
107+
@Override
108+
public Connection getConnection(String username, String password) throws SQLException {
109+
return new TestConnection(false);
110+
}
111+
}

0 commit comments

Comments
 (0)