48
48
import com .google .common .util .concurrent .ListenableFuture ;
49
49
import io .netty .util .Timeout ;
50
50
import io .netty .util .TimerTask ;
51
+ import java .net .InetSocketAddress ;
51
52
import java .nio .ByteBuffer ;
52
53
import java .util .Collections ;
53
54
import java .util .Iterator ;
@@ -74,6 +75,10 @@ class RequestHandler {
74
75
private static final QueryLogger QUERY_LOGGER = QueryLogger .builder ().build ();
75
76
static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS" ;
76
77
78
+ private static final int STATEMENT_MAX_LENGTH = 1000 ;
79
+ private static final int PARTITION_KEY_MAX_LENGTH = 1000 ;
80
+ private static final int REPLICAS_MAX_LENGTH = 1000 ;
81
+
77
82
final String id ;
78
83
79
84
private final SessionManager manager ;
@@ -161,8 +166,71 @@ public RequestHandler(
161
166
this .timerContext = metricsEnabled () ? metrics ().getRequestsTimer ().time () : null ;
162
167
this .startTime = System .nanoTime ();
163
168
169
+ ConsistencyLevel consistency = statement .getConsistencyLevel ();
170
+ if (consistency == null ) consistency = Statement .DEFAULT .getConsistencyLevel ();
171
+
172
+ String statementType = null ;
173
+ String statementText = null ;
174
+ int batchSize = 1 ;
175
+
176
+ String keyspace = null ;
177
+ String partitionKey = null ;
178
+ String table = null ;
179
+ String operationType = null ;
180
+
181
+ if (statement instanceof BatchStatement ) {
182
+ statementType = "batch" ;
183
+ batchSize = ((BatchStatement ) statement ).size ();
184
+ StringBuilder statementTextBuilder = new StringBuilder (STATEMENT_MAX_LENGTH );
185
+ for (Statement subStatement : ((BatchStatement ) statement ).getStatements ()) {
186
+ if (subStatement instanceof BoundStatement )
187
+ statementTextBuilder .append (((BoundStatement ) subStatement ).statement .getQueryString ());
188
+ else statementTextBuilder .append (subStatement .toString ());
189
+ }
190
+ statementText = statementTextBuilder .toString ();
191
+ } else if (statement instanceof BoundStatement ) {
192
+ statementType = "prepared" ;
193
+ statementText = ((BoundStatement ) statement ).statement .getQueryString ();
194
+ keyspace = ((BoundStatement ) statement ).getKeyspace ();
195
+ operationType = ((BoundStatement ) statement ).getOperationType ();
196
+
197
+ ColumnDefinitions boundColumns =
198
+ ((BoundStatement ) statement ).statement .getPreparedId ().boundValuesMetadata .variables ;
199
+
200
+ StringBuilder partitionKeyBuilder = new StringBuilder (PARTITION_KEY_MAX_LENGTH );
201
+ int [] rkIndexes = ((BoundStatement ) statement ).statement .getPreparedId ().routingKeyIndexes ;
202
+ if (rkIndexes != null ) {
203
+ for (int i : rkIndexes ) {
204
+ Object value = ((BoundStatement ) statement ).getObject (i );
205
+ String valueString = (value == null ) ? "NULL" : value .toString ();
206
+ if (partitionKeyBuilder .length () > 0 ) partitionKeyBuilder .append (", " );
207
+ String columnName = boundColumns .getName (i );
208
+ partitionKeyBuilder .append (columnName );
209
+ partitionKeyBuilder .append ('=' );
210
+ partitionKeyBuilder .append (valueString );
211
+ }
212
+ }
213
+ partitionKey = partitionKeyBuilder .toString ();
214
+
215
+ if (boundColumns .size () > 0 ) table = boundColumns .getTable (0 );
216
+ } else if (statement instanceof RegularStatement ) {
217
+ statementType = "regular" ;
218
+ statementText = ((RegularStatement ) statement ).toString ();
219
+ }
220
+
164
221
this .tracingInfo = tracingInfo ;
165
222
this .tracingInfo .setNameAndStartTime ("request" );
223
+ this .tracingInfo .setConsistencyLevel (consistency );
224
+ this .tracingInfo .setRetryPolicy (retryPolicy ());
225
+ this .tracingInfo .setBatchSize (batchSize );
226
+ this .tracingInfo .setLoadBalancingPolicy (manager .loadBalancingPolicy ());
227
+ this .tracingInfo .setSpeculativeExecutionPolicy (manager .speculativeExecutionPolicy ());
228
+ if (statementType != null ) this .tracingInfo .setStatementType (statementType );
229
+ if (statementText != null ) this .tracingInfo .setStatement (statementText , STATEMENT_MAX_LENGTH );
230
+ if (keyspace != null ) this .tracingInfo .setKeyspace (keyspace );
231
+ if (partitionKey != null ) this .tracingInfo .setPartitionKey (partitionKey );
232
+ if (table != null ) this .tracingInfo .setTable (table );
233
+ if (operationType != null ) this .tracingInfo .setOperationType (operationType );
166
234
}
167
235
168
236
void sendRequest () {
@@ -282,6 +350,14 @@ private void setFinalResult(
282
350
}
283
351
callback .onSet (connection , response , info , statement , System .nanoTime () - startTime );
284
352
353
+ if (response .type == Message .Response .Type .RESULT ) {
354
+ Responses .Result rm = (Responses .Result ) response ;
355
+ if (rm .kind == Responses .Result .Kind .ROWS ) {
356
+ Responses .Result .Rows r = (Responses .Result .Rows ) rm ;
357
+ tracingInfo .setRowsCount (r .data .size ());
358
+ }
359
+ }
360
+ tracingInfo .setQueryPaged (info .getPagingState () != null );
285
361
tracingInfo .setStatus (
286
362
response .type == Message .Response .Type .ERROR
287
363
? TracingInfo .StatusCode .ERROR
@@ -298,6 +374,7 @@ private void setFinalResult(
298
374
tracingInfo .recordException (e );
299
375
tracingInfo .setStatus (TracingInfo .StatusCode .ERROR , e .toString ());
300
376
tracingInfo .recordException (e );
377
+ tracingInfo .setStatus (TracingInfo .StatusCode .ERROR , e .toString ());
301
378
tracingInfo .tracingFinished ();
302
379
}
303
380
}
@@ -337,6 +414,7 @@ private void setFinalException(
337
414
// Triggered when an execution reaches the end of the query plan.
338
415
// This is only a failure if there are no other running executions.
339
416
private void reportNoMoreHosts (SpeculativeExecution execution ) {
417
+ execution .parentTracingInfo .setRetryCount (execution .retryCount ());
340
418
execution .parentTracingInfo .setStatus (TracingInfo .StatusCode .ERROR );
341
419
execution .parentTracingInfo .tracingFinished ();
342
420
runningExecutions .remove (execution );
@@ -461,6 +539,10 @@ private boolean query(final Host host) {
461
539
462
540
currentChildTracingInfo = manager .getTracingInfoFactory ().buildTracingInfo (parentTracingInfo );
463
541
currentChildTracingInfo .setNameAndStartTime ("attempt" );
542
+ InetSocketAddress hostAddress = host .getEndPoint ().resolve ();
543
+ currentChildTracingInfo .setPeerName (hostAddress .getHostName ());
544
+ currentChildTracingInfo .setPeerIP (hostAddress .getAddress ());
545
+ currentChildTracingInfo .setPeerPort (hostAddress .getPort ());
464
546
465
547
if (allowSpeculativeExecutions && nextExecutionScheduled .compareAndSet (false , true ))
466
548
scheduleExecution (speculativeExecutionPlan .nextExecution (host ));
@@ -680,6 +762,7 @@ void cancel() {
680
762
CancelledSpeculativeExecutionException .INSTANCE ,
681
763
System .nanoTime () - startTime );
682
764
}
765
+ parentTracingInfo .setRetryCount (retryCount ());
683
766
parentTracingInfo .setStatus (TracingInfo .StatusCode .OK );
684
767
parentTracingInfo .tracingFinished ();
685
768
return ;
@@ -694,6 +777,7 @@ void cancel() {
694
777
CancelledSpeculativeExecutionException .INSTANCE ,
695
778
System .nanoTime () - startTime );
696
779
}
780
+ parentTracingInfo .setRetryCount (retryCount ());
697
781
parentTracingInfo .setStatus (TracingInfo .StatusCode .OK );
698
782
parentTracingInfo .tracingFinished ();
699
783
return ;
@@ -711,6 +795,7 @@ public Message.Request request() {
711
795
@ Override
712
796
public void onSet (
713
797
Connection connection , Message .Response response , long latency , int retryCount ) {
798
+ currentChildTracingInfo .setShardID (connection .shardId ());
714
799
currentChildTracingInfo .setStatus (TracingInfo .StatusCode .OK );
715
800
currentChildTracingInfo .tracingFinished ();
716
801
@@ -1022,6 +1107,7 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
1022
1107
@ Override
1023
1108
public void onException (
1024
1109
Connection connection , Exception exception , long latency , int retryCount ) {
1110
+ currentChildTracingInfo .setShardID (connection .shardId ());
1025
1111
currentChildTracingInfo .recordException (exception );
1026
1112
currentChildTracingInfo .setStatus (TracingInfo .StatusCode .ERROR );
1027
1113
currentChildTracingInfo .tracingFinished ();
@@ -1063,6 +1149,7 @@ public void onException(
1063
1149
1064
1150
@ Override
1065
1151
public boolean onTimeout (Connection connection , long latency , int retryCount ) {
1152
+ currentChildTracingInfo .setShardID (connection .shardId ());
1066
1153
currentChildTracingInfo .setStatus (TracingInfo .StatusCode .ERROR , "timeout" );
1067
1154
currentChildTracingInfo .tracingFinished ();
1068
1155
@@ -1107,12 +1194,14 @@ public int retryCount() {
1107
1194
}
1108
1195
1109
1196
private void setFinalException (Connection connection , Exception exception ) {
1197
+ parentTracingInfo .setRetryCount (retryCount ());
1110
1198
parentTracingInfo .setStatus (TracingInfo .StatusCode .ERROR );
1111
1199
parentTracingInfo .tracingFinished ();
1112
1200
RequestHandler .this .setFinalException (this , connection , exception );
1113
1201
}
1114
1202
1115
1203
private void setFinalResult (Connection connection , Message .Response response ) {
1204
+ parentTracingInfo .setRetryCount (retryCount ());
1116
1205
parentTracingInfo .setStatus (TracingInfo .StatusCode .OK );
1117
1206
parentTracingInfo .tracingFinished ();
1118
1207
RequestHandler .this .setFinalResult (this , connection , response );
0 commit comments