40
40
import com .datastax .driver .core .policies .RetryPolicy ;
41
41
import com .datastax .driver .core .policies .RetryPolicy .RetryDecision .Type ;
42
42
import com .datastax .driver .core .policies .SpeculativeExecutionPolicy .SpeculativeExecutionPlan ;
43
+ import com .datastax .driver .core .tracing .TracingInfo ;
43
44
import com .google .common .collect .ImmutableList ;
44
45
import com .google .common .collect .Iterators ;
45
46
import com .google .common .collect .Sets ;
@@ -95,6 +96,8 @@ class RequestHandler {
95
96
private final AtomicBoolean isDone = new AtomicBoolean ();
96
97
private final AtomicInteger executionIndex = new AtomicInteger ();
97
98
99
+ private final TracingInfo tracingInfo ;
100
+
98
101
private Iterator <Host > getReplicas (
99
102
String loggedKeyspace , Statement statement , Iterator <Host > fallback ) {
100
103
ProtocolVersion protocolVersion = manager .cluster .manager .protocolVersion ();
@@ -120,7 +123,8 @@ private Iterator<Host> getReplicas(
120
123
return replicas .iterator ();
121
124
}
122
125
123
- public RequestHandler (SessionManager manager , Callback callback , Statement statement ) {
126
+ public RequestHandler (
127
+ SessionManager manager , Callback callback , Statement statement , TracingInfo tracingInfo ) {
124
128
this .id = Long .toString (System .identityHashCode (this ));
125
129
if (logger .isTraceEnabled ()) logger .trace ("[{}] {}" , id , statement );
126
130
this .manager = manager ;
@@ -156,6 +160,9 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state
156
160
157
161
this .timerContext = metricsEnabled () ? metrics ().getRequestsTimer ().time () : null ;
158
162
this .startTime = System .nanoTime ();
163
+
164
+ this .tracingInfo = tracingInfo ;
165
+ this .tracingInfo .setNameAndStartTime ("request" );
159
166
}
160
167
161
168
void sendRequest () {
@@ -274,13 +281,16 @@ private void setFinalResult(
274
281
logServerWarnings (response .warnings );
275
282
}
276
283
callback .onSet (connection , response , info , statement , System .nanoTime () - startTime );
284
+ tracingInfo .tracingFinished ();
277
285
} catch (Exception e ) {
278
286
callback .onException (
279
287
connection ,
280
288
new DriverInternalError (
281
289
"Unexpected exception while setting final result from " + response , e ),
282
290
System .nanoTime () - startTime , /*unused*/
283
291
0 );
292
+
293
+ tracingInfo .tracingFinished ();
284
294
}
285
295
}
286
296
@@ -305,6 +315,8 @@ private void setFinalException(
305
315
306
316
cancelPendingExecutions (execution );
307
317
318
+ tracingInfo .tracingFinished ();
319
+
308
320
try {
309
321
if (timerContext != null ) timerContext .stop ();
310
322
} finally {
@@ -315,6 +327,7 @@ private void setFinalException(
315
327
// Triggered when an execution reaches the end of the query plan.
316
328
// This is only a failure if there are no other running executions.
317
329
private void reportNoMoreHosts (SpeculativeExecution execution ) {
330
+ execution .parentTracingInfo .tracingFinished ();
318
331
runningExecutions .remove (execution );
319
332
if (runningExecutions .isEmpty ())
320
333
setFinalException (
@@ -383,11 +396,17 @@ class SpeculativeExecution implements Connection.ResponseCallback {
383
396
384
397
private volatile Connection .ResponseHandler connectionHandler ;
385
398
399
+ private final TracingInfo parentTracingInfo ;
400
+ private TracingInfo currentChildTracingInfo ;
401
+
386
402
SpeculativeExecution (Message .Request request , int position ) {
387
403
this .id = RequestHandler .this .id + "-" + position ;
388
404
this .request = request ;
389
405
this .position = position ;
390
406
this .queryStateRef = new AtomicReference <QueryState >(QueryState .INITIAL );
407
+ this .parentTracingInfo =
408
+ manager .getTracingInfoFactory ().buildTracingInfo (RequestHandler .this .tracingInfo );
409
+ this .parentTracingInfo .setNameAndStartTime ("speculative_execution" );
391
410
if (logger .isTraceEnabled ()) logger .trace ("[{}] Starting" , id );
392
411
}
393
412
@@ -429,6 +448,9 @@ private boolean query(final Host host) {
429
448
430
449
if (logger .isTraceEnabled ()) logger .trace ("[{}] Querying node {}" , id , host );
431
450
451
+ currentChildTracingInfo = manager .getTracingInfoFactory ().buildTracingInfo (parentTracingInfo );
452
+ currentChildTracingInfo .setNameAndStartTime ("attempt" );
453
+
432
454
if (allowSpeculativeExecutions && nextExecutionScheduled .compareAndSet (false , true ))
433
455
scheduleExecution (speculativeExecutionPlan .nextExecution (host ));
434
456
@@ -647,6 +669,7 @@ void cancel() {
647
669
CancelledSpeculativeExecutionException .INSTANCE ,
648
670
System .nanoTime () - startTime );
649
671
}
672
+ parentTracingInfo .tracingFinished ();
650
673
return ;
651
674
} else if (!previous .inProgress
652
675
&& queryStateRef .compareAndSet (previous , QueryState .CANCELLED_WHILE_COMPLETE )) {
@@ -659,6 +682,7 @@ void cancel() {
659
682
CancelledSpeculativeExecutionException .INSTANCE ,
660
683
System .nanoTime () - startTime );
661
684
}
685
+ parentTracingInfo .tracingFinished ();
662
686
return ;
663
687
}
664
688
}
@@ -674,6 +698,8 @@ public Message.Request request() {
674
698
@ Override
675
699
public void onSet (
676
700
Connection connection , Message .Response response , long latency , int retryCount ) {
701
+ currentChildTracingInfo .tracingFinished ();
702
+
677
703
QueryState queryState = queryStateRef .get ();
678
704
if (!queryState .isInProgressAt (retryCount )
679
705
|| !queryStateRef .compareAndSet (queryState , queryState .complete ())) {
@@ -832,7 +858,10 @@ public void onSet(
832
858
toPrepare .getQueryKeyspace (),
833
859
connection .endPoint );
834
860
835
- write (connection , prepareAndRetry (toPrepare .getQueryString ()));
861
+ TracingInfo prepareTracingInfo =
862
+ manager .getTracingInfoFactory ().buildTracingInfo (parentTracingInfo );
863
+ prepareTracingInfo .setNameAndStartTime ("prepare" );
864
+ write (connection , prepareAndRetry (toPrepare .getQueryString (), prepareTracingInfo ));
836
865
// we're done for now, the prepareAndRetry callback will handle the rest
837
866
return ;
838
867
case READ_FAILURE :
@@ -878,7 +907,8 @@ public void onSet(
878
907
}
879
908
}
880
909
881
- private Connection .ResponseCallback prepareAndRetry (final String toPrepare ) {
910
+ private Connection .ResponseCallback prepareAndRetry (
911
+ final String toPrepare , final TracingInfo prepareTracingInfo ) {
882
912
// do not bother inspecting retry policy at this step, no other decision
883
913
// makes sense than retry on the same host if the query was prepared,
884
914
// or on another host, if an error/timeout occurred.
@@ -902,6 +932,8 @@ public int retryCount() {
902
932
@ Override
903
933
public void onSet (
904
934
Connection connection , Message .Response response , long latency , int retryCount ) {
935
+ prepareTracingInfo .tracingFinished ();
936
+
905
937
QueryState queryState = queryStateRef .get ();
906
938
if (!queryState .isInProgressAt (retryCount )
907
939
|| !queryStateRef .compareAndSet (queryState , queryState .complete ())) {
@@ -944,11 +976,14 @@ public void onSet(
944
976
@ Override
945
977
public void onException (
946
978
Connection connection , Exception exception , long latency , int retryCount ) {
979
+ prepareTracingInfo .tracingFinished ();
947
980
SpeculativeExecution .this .onException (connection , exception , latency , retryCount );
948
981
}
949
982
950
983
@ Override
951
984
public boolean onTimeout (Connection connection , long latency , int retryCount ) {
985
+ prepareTracingInfo .tracingFinished ();
986
+
952
987
QueryState queryState = queryStateRef .get ();
953
988
if (!queryState .isInProgressAt (retryCount )
954
989
|| !queryStateRef .compareAndSet (queryState , queryState .complete ())) {
@@ -973,6 +1008,8 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
973
1008
@ Override
974
1009
public void onException (
975
1010
Connection connection , Exception exception , long latency , int retryCount ) {
1011
+ currentChildTracingInfo .tracingFinished ();
1012
+
976
1013
QueryState queryState = queryStateRef .get ();
977
1014
if (!queryState .isInProgressAt (retryCount )
978
1015
|| !queryStateRef .compareAndSet (queryState , queryState .complete ())) {
@@ -1010,6 +1047,8 @@ public void onException(
1010
1047
1011
1048
@ Override
1012
1049
public boolean onTimeout (Connection connection , long latency , int retryCount ) {
1050
+ currentChildTracingInfo .tracingFinished ();
1051
+
1013
1052
QueryState queryState = queryStateRef .get ();
1014
1053
if (!queryState .isInProgressAt (retryCount )
1015
1054
|| !queryStateRef .compareAndSet (queryState , queryState .complete ())) {
@@ -1051,10 +1090,12 @@ public int retryCount() {
1051
1090
}
1052
1091
1053
1092
private void setFinalException (Connection connection , Exception exception ) {
1093
+ parentTracingInfo .tracingFinished ();
1054
1094
RequestHandler .this .setFinalException (this , connection , exception );
1055
1095
}
1056
1096
1057
1097
private void setFinalResult (Connection connection , Message .Response response ) {
1098
+ parentTracingInfo .tracingFinished ();
1058
1099
RequestHandler .this .setFinalResult (this , connection , response );
1059
1100
}
1060
1101
}
0 commit comments