Skip to content

Commit 8561cf6

Browse files
committed
Instrumenting RequestHandler with tracing: added empty spans.
Every request is covered by one "request" span, and every speculative execution has its own "speculative_execution.n" span, where n is number of that (possibly speculative) execution. Retries are performed in the same execution and therefore do not increase that number, whereas speculative executions do. Each retry is covered by "query" span, effectively yielding a tree of form: "request" -1--many-> "speculative_execution.n" -1--many-> "query"
1 parent d77492b commit 8561cf6

File tree

2 files changed

+59
-7
lines changed

2 files changed

+59
-7
lines changed

driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.datastax.driver.core.policies.RetryPolicy;
4141
import com.datastax.driver.core.policies.RetryPolicy.RetryDecision.Type;
4242
import com.datastax.driver.core.policies.SpeculativeExecutionPolicy.SpeculativeExecutionPlan;
43+
import com.datastax.driver.core.tracing.TracingInfo;
4344
import com.google.common.collect.ImmutableList;
4445
import com.google.common.collect.Iterators;
4546
import com.google.common.collect.Sets;
@@ -95,6 +96,8 @@ class RequestHandler {
9596
private final AtomicBoolean isDone = new AtomicBoolean();
9697
private final AtomicInteger executionIndex = new AtomicInteger();
9798

99+
private final TracingInfo tracingInfo;
100+
98101
private Iterator<Host> getReplicas(
99102
String loggedKeyspace, Statement statement, Iterator<Host> fallback) {
100103
ProtocolVersion protocolVersion = manager.cluster.manager.protocolVersion();
@@ -120,7 +123,8 @@ private Iterator<Host> getReplicas(
120123
return replicas.iterator();
121124
}
122125

123-
public RequestHandler(SessionManager manager, Callback callback, Statement statement) {
126+
public RequestHandler(
127+
SessionManager manager, Callback callback, Statement statement, TracingInfo tracingInfo) {
124128
this.id = Long.toString(System.identityHashCode(this));
125129
if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement);
126130
this.manager = manager;
@@ -156,6 +160,9 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state
156160

157161
this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null;
158162
this.startTime = System.nanoTime();
163+
164+
this.tracingInfo = tracingInfo;
165+
this.tracingInfo.setNameAndStartTime("request");
159166
}
160167

161168
void sendRequest() {
@@ -274,13 +281,16 @@ private void setFinalResult(
274281
logServerWarnings(response.warnings);
275282
}
276283
callback.onSet(connection, response, info, statement, System.nanoTime() - startTime);
284+
tracingInfo.tracingFinished();
277285
} catch (Exception e) {
278286
callback.onException(
279287
connection,
280288
new DriverInternalError(
281289
"Unexpected exception while setting final result from " + response, e),
282290
System.nanoTime() - startTime, /*unused*/
283291
0);
292+
293+
tracingInfo.tracingFinished();
284294
}
285295
}
286296

@@ -305,6 +315,8 @@ private void setFinalException(
305315

306316
cancelPendingExecutions(execution);
307317

318+
tracingInfo.tracingFinished();
319+
308320
try {
309321
if (timerContext != null) timerContext.stop();
310322
} finally {
@@ -315,6 +327,7 @@ private void setFinalException(
315327
// Triggered when an execution reaches the end of the query plan.
316328
// This is only a failure if there are no other running executions.
317329
private void reportNoMoreHosts(SpeculativeExecution execution) {
330+
execution.parentTracingInfo.tracingFinished();
318331
runningExecutions.remove(execution);
319332
if (runningExecutions.isEmpty())
320333
setFinalException(
@@ -383,11 +396,17 @@ class SpeculativeExecution implements Connection.ResponseCallback {
383396

384397
private volatile Connection.ResponseHandler connectionHandler;
385398

399+
private final TracingInfo parentTracingInfo;
400+
private TracingInfo currentChildTracingInfo;
401+
386402
SpeculativeExecution(Message.Request request, int position) {
387403
this.id = RequestHandler.this.id + "-" + position;
388404
this.request = request;
389405
this.position = position;
390406
this.queryStateRef = new AtomicReference<QueryState>(QueryState.INITIAL);
407+
this.parentTracingInfo =
408+
manager.getTracingInfoFactory().buildTracingInfo(RequestHandler.this.tracingInfo);
409+
this.parentTracingInfo.setNameAndStartTime("speculative_execution." + position);
391410
if (logger.isTraceEnabled()) logger.trace("[{}] Starting", id);
392411
}
393412

@@ -429,6 +448,9 @@ private boolean query(final Host host) {
429448

430449
if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", id, host);
431450

451+
currentChildTracingInfo = manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo);
452+
currentChildTracingInfo.setNameAndStartTime("query");
453+
432454
if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true))
433455
scheduleExecution(speculativeExecutionPlan.nextExecution(host));
434456

@@ -647,6 +669,7 @@ void cancel() {
647669
CancelledSpeculativeExecutionException.INSTANCE,
648670
System.nanoTime() - startTime);
649671
}
672+
parentTracingInfo.tracingFinished();
650673
return;
651674
} else if (!previous.inProgress
652675
&& queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_COMPLETE)) {
@@ -659,6 +682,7 @@ void cancel() {
659682
CancelledSpeculativeExecutionException.INSTANCE,
660683
System.nanoTime() - startTime);
661684
}
685+
parentTracingInfo.tracingFinished();
662686
return;
663687
}
664688
}
@@ -674,6 +698,8 @@ public Message.Request request() {
674698
@Override
675699
public void onSet(
676700
Connection connection, Message.Response response, long latency, int retryCount) {
701+
currentChildTracingInfo.tracingFinished();
702+
677703
QueryState queryState = queryStateRef.get();
678704
if (!queryState.isInProgressAt(retryCount)
679705
|| !queryStateRef.compareAndSet(queryState, queryState.complete())) {
@@ -832,7 +858,10 @@ public void onSet(
832858
toPrepare.getQueryKeyspace(),
833859
connection.endPoint);
834860

835-
write(connection, prepareAndRetry(toPrepare.getQueryString()));
861+
TracingInfo prepareTracingInfo =
862+
manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo);
863+
prepareTracingInfo.setNameAndStartTime("prepare");
864+
write(connection, prepareAndRetry(toPrepare.getQueryString(), prepareTracingInfo));
836865
// we're done for now, the prepareAndRetry callback will handle the rest
837866
return;
838867
case READ_FAILURE:
@@ -878,7 +907,8 @@ public void onSet(
878907
}
879908
}
880909

881-
private Connection.ResponseCallback prepareAndRetry(final String toPrepare) {
910+
private Connection.ResponseCallback prepareAndRetry(
911+
final String toPrepare, final TracingInfo prepareTracingInfo) {
882912
// do not bother inspecting retry policy at this step, no other decision
883913
// makes sense than retry on the same host if the query was prepared,
884914
// or on another host, if an error/timeout occurred.
@@ -902,6 +932,8 @@ public int retryCount() {
902932
@Override
903933
public void onSet(
904934
Connection connection, Message.Response response, long latency, int retryCount) {
935+
prepareTracingInfo.tracingFinished();
936+
905937
QueryState queryState = queryStateRef.get();
906938
if (!queryState.isInProgressAt(retryCount)
907939
|| !queryStateRef.compareAndSet(queryState, queryState.complete())) {
@@ -944,11 +976,14 @@ public void onSet(
944976
@Override
945977
public void onException(
946978
Connection connection, Exception exception, long latency, int retryCount) {
979+
prepareTracingInfo.tracingFinished();
947980
SpeculativeExecution.this.onException(connection, exception, latency, retryCount);
948981
}
949982

950983
@Override
951984
public boolean onTimeout(Connection connection, long latency, int retryCount) {
985+
prepareTracingInfo.tracingFinished();
986+
952987
QueryState queryState = queryStateRef.get();
953988
if (!queryState.isInProgressAt(retryCount)
954989
|| !queryStateRef.compareAndSet(queryState, queryState.complete())) {
@@ -973,6 +1008,8 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
9731008
@Override
9741009
public void onException(
9751010
Connection connection, Exception exception, long latency, int retryCount) {
1011+
currentChildTracingInfo.tracingFinished();
1012+
9761013
QueryState queryState = queryStateRef.get();
9771014
if (!queryState.isInProgressAt(retryCount)
9781015
|| !queryStateRef.compareAndSet(queryState, queryState.complete())) {
@@ -1010,6 +1047,8 @@ public void onException(
10101047

10111048
@Override
10121049
public boolean onTimeout(Connection connection, long latency, int retryCount) {
1050+
currentChildTracingInfo.tracingFinished();
1051+
10131052
QueryState queryState = queryStateRef.get();
10141053
if (!queryState.isInProgressAt(retryCount)
10151054
|| !queryStateRef.compareAndSet(queryState, queryState.complete())) {
@@ -1051,10 +1090,12 @@ public int retryCount() {
10511090
}
10521091

10531092
private void setFinalException(Connection connection, Exception exception) {
1093+
parentTracingInfo.tracingFinished();
10541094
RequestHandler.this.setFinalException(this, connection, exception);
10551095
}
10561096

10571097
private void setFinalResult(Connection connection, Message.Response response) {
1098+
parentTracingInfo.tracingFinished();
10581099
RequestHandler.this.setFinalResult(this, connection, response);
10591100
}
10601101
}

driver-core/src/main/java/com/datastax/driver/core/SessionManager.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.datastax.driver.core.policies.LoadBalancingPolicy;
3030
import com.datastax.driver.core.policies.ReconnectionPolicy;
3131
import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
32+
import com.datastax.driver.core.tracing.TracingInfo;
3233
import com.datastax.driver.core.tracing.TracingInfoFactory;
3334
import com.datastax.driver.core.utils.MoreFutures;
3435
import com.google.common.base.Functions;
@@ -161,6 +162,7 @@ public ResultSetFuture executeAsync(final Statement statement) {
161162
// Because of the way the future is built, we need another 'proxy' future that we can return
162163
// now.
163164
final ChainedResultSetFuture chainedFuture = new ChainedResultSetFuture();
165+
final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo();
164166
this.initAsync()
165167
.addListener(
166168
new Runnable() {
@@ -171,7 +173,7 @@ public void run() {
171173
SessionManager.this,
172174
cluster.manager.protocolVersion(),
173175
makeRequestMessage(statement, null));
174-
execute(actualFuture, statement);
176+
execute(actualFuture, statement, tracingInfo);
175177
chainedFuture.setSource(actualFuture);
176178
}
177179
},
@@ -712,25 +714,34 @@ else if (fetchSize != Integer.MAX_VALUE)
712714
* <p>This method will find a suitable node to connect to using the {@link LoadBalancingPolicy}
713715
* and handle host failover.
714716
*/
715-
void execute(final RequestHandler.Callback callback, final Statement statement) {
717+
void execute(
718+
final RequestHandler.Callback callback,
719+
final Statement statement,
720+
final TracingInfo tracingInfo) {
716721
if (this.isClosed()) {
717722
callback.onException(
718723
null, new IllegalStateException("Could not send request, session is closed"), 0, 0);
719724
return;
720725
}
721-
if (isInit) new RequestHandler(this, callback, statement).sendRequest();
726+
if (isInit) new RequestHandler(this, callback, statement, tracingInfo).sendRequest();
722727
else
723728
this.initAsync()
724729
.addListener(
725730
new Runnable() {
726731
@Override
727732
public void run() {
728-
new RequestHandler(SessionManager.this, callback, statement).sendRequest();
733+
new RequestHandler(SessionManager.this, callback, statement, tracingInfo)
734+
.sendRequest();
729735
}
730736
},
731737
executor());
732738
}
733739

740+
void execute(final RequestHandler.Callback callback, final Statement statement) {
741+
final TracingInfo tracingInfo = getTracingInfoFactory().buildTracingInfo();
742+
execute(callback, statement, tracingInfo);
743+
}
744+
734745
private ListenableFuture<PreparedStatement> prepare(
735746
final PreparedStatement statement, EndPoint toExclude) {
736747
final String query = statement.getQueryString();

0 commit comments

Comments
 (0)