48
48
import com .google .common .util .concurrent .ListenableFuture ;
49
49
import io .netty .util .Timeout ;
50
50
import io .netty .util .TimerTask ;
51
+ import java .net .InetSocketAddress ;
51
52
import java .nio .ByteBuffer ;
52
53
import java .util .Collections ;
53
54
import java .util .Iterator ;
@@ -74,6 +75,10 @@ class RequestHandler {
74
75
private static final QueryLogger QUERY_LOGGER = QueryLogger .builder ().build ();
75
76
static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS" ;
76
77
78
+ private static final int STATEMENT_MAX_LENGTH = 1000 ;
79
+ private static final int PARTITION_KEY_MAX_LENGTH = 1000 ;
80
+ private static final int BOUND_VALUES_MAX_LENGTH = 1000 ;
81
+
77
82
final String id ;
78
83
79
84
private final SessionManager manager ;
@@ -161,8 +166,92 @@ public RequestHandler(
161
166
this .timerContext = metricsEnabled () ? metrics ().getRequestsTimer ().time () : null ;
162
167
this .startTime = System .nanoTime ();
163
168
169
+ ConsistencyLevel consistency = statement .getConsistencyLevel ();
170
+ if (consistency == null ) consistency = Statement .DEFAULT .getConsistencyLevel ();
171
+
172
+ String statementType = null ;
173
+ String statementText = null ;
174
+ Integer batchSize = null ;
175
+
176
+ String keyspace = null ;
177
+ String partitionKey = null ;
178
+ String boundValues = null ;
179
+ String table = null ;
180
+ String operationType = null ;
181
+
182
+ if (statement instanceof BatchStatement ) {
183
+ statementType = "batch" ;
184
+ batchSize = ((BatchStatement ) statement ).size ();
185
+ StringBuilder statementTextBuilder = new StringBuilder (STATEMENT_MAX_LENGTH );
186
+ for (Statement subStatement : ((BatchStatement ) statement ).getStatements ()) {
187
+ if (subStatement instanceof BoundStatement )
188
+ statementTextBuilder .append (((BoundStatement ) subStatement ).statement .getQueryString ());
189
+ else statementTextBuilder .append (subStatement .toString ());
190
+ }
191
+ statementText = statementTextBuilder .toString ();
192
+ } else if (statement instanceof BoundStatement ) {
193
+ statementType = "prepared" ;
194
+ statementText = ((BoundStatement ) statement ).statement .getQueryString ();
195
+ keyspace = ((BoundStatement ) statement ).getKeyspace ();
196
+ operationType = ((BoundStatement ) statement ).getOperationType ();
197
+
198
+ ColumnDefinitions boundColumns =
199
+ ((BoundStatement ) statement ).statement .getPreparedId ().boundValuesMetadata .variables ;
200
+
201
+ StringBuilder boundValuesBuilder = new StringBuilder (BOUND_VALUES_MAX_LENGTH );
202
+ StringBuilder partitionKeyBuilder = new StringBuilder (PARTITION_KEY_MAX_LENGTH );
203
+ int [] rkIndexes = ((BoundStatement ) statement ).statement .getPreparedId ().routingKeyIndexes ;
204
+
205
+ for (int i = 0 ; i < boundColumns .size (); ++i ) {
206
+ Object value = ((BoundStatement ) statement ).getObject (i );
207
+ String valueString =
208
+ (value == null )
209
+ ? "NULL"
210
+ : value instanceof ByteBuffer
211
+ ? CodecUtils .bytesToHex (((ByteBuffer ) value ).array ())
212
+ : value .toString ();
213
+ String columnName = boundColumns .getName (i );
214
+ if (boundValuesBuilder .length () > 0 ) boundValuesBuilder .append (", " );
215
+ boundValuesBuilder .append (columnName );
216
+ boundValuesBuilder .append ('=' );
217
+ boundValuesBuilder .append (valueString );
218
+
219
+ if (rkIndexes != null ) {
220
+ for (int j : rkIndexes ) {
221
+ if (i == j ) {
222
+ if (partitionKeyBuilder .length () > 0 ) partitionKeyBuilder .append (", " );
223
+ partitionKeyBuilder .append (columnName );
224
+ partitionKeyBuilder .append ('=' );
225
+ partitionKeyBuilder .append (valueString );
226
+ break ;
227
+ }
228
+ }
229
+ }
230
+ }
231
+ boundValues = boundValuesBuilder .toString ();
232
+ partitionKey = partitionKeyBuilder .toString ();
233
+
234
+ if (boundColumns .size () > 0 ) table = boundColumns .getTable (0 );
235
+ } else if (statement instanceof RegularStatement ) {
236
+ statementType = "regular" ;
237
+ statementText = ((RegularStatement ) statement ).toString ();
238
+ }
239
+
164
240
this .tracingInfo = tracingInfo ;
165
241
this .tracingInfo .setNameAndStartTime ("request" );
242
+ this .tracingInfo .setConsistencyLevel (consistency );
243
+ this .tracingInfo .setRetryPolicy (retryPolicy ());
244
+ this .tracingInfo .setLoadBalancingPolicy (manager .loadBalancingPolicy ());
245
+ this .tracingInfo .setSpeculativeExecutionPolicy (manager .speculativeExecutionPolicy ());
246
+ if (statement .getFetchSize () > 0 ) this .tracingInfo .setFetchSize (statement .getFetchSize ());
247
+ if (statementType != null ) this .tracingInfo .setStatementType (statementType );
248
+ if (statementText != null ) this .tracingInfo .setStatement (statementText , STATEMENT_MAX_LENGTH );
249
+ if (batchSize != null ) this .tracingInfo .setBatchSize (batchSize );
250
+ if (keyspace != null ) this .tracingInfo .setKeyspace (keyspace );
251
+ if (boundValues != null ) this .tracingInfo .setBoundValues (boundValues );
252
+ if (partitionKey != null ) this .tracingInfo .setPartitionKey (partitionKey );
253
+ if (table != null ) this .tracingInfo .setTable (table );
254
+ if (operationType != null ) this .tracingInfo .setOperationType (operationType );
166
255
}
167
256
168
257
void sendRequest () {
@@ -280,6 +369,15 @@ private void setFinalResult(
280
369
&& logger .isWarnEnabled ()) {
281
370
logServerWarnings (response .warnings );
282
371
}
372
+
373
+ if (response .type == Message .Response .Type .RESULT ) {
374
+ Responses .Result rm = (Responses .Result ) response ;
375
+ if (rm .kind == Responses .Result .Kind .ROWS ) {
376
+ Responses .Result .Rows r = (Responses .Result .Rows ) rm ;
377
+ tracingInfo .setRowsCount (r .data .size ());
378
+ tracingInfo .setHasMorePages (r .metadata .pagingState != null );
379
+ }
380
+ }
283
381
callback .onSet (connection , response , info , statement , System .nanoTime () - startTime );
284
382
285
383
tracingInfo .setStatus (
@@ -336,6 +434,7 @@ private void setFinalException(
336
434
// Triggered when an execution reaches the end of the query plan.
337
435
// This is only a failure if there are no other running executions.
338
436
private void reportNoMoreHosts (SpeculativeExecution execution ) {
437
+ execution .parentTracingInfo .setAttemptCount (execution .retryCount () + 1 );
339
438
execution .parentTracingInfo .setStatus (TracingInfo .StatusCode .ERROR );
340
439
execution .parentTracingInfo .tracingFinished ();
341
440
runningExecutions .remove (execution );
@@ -460,6 +559,10 @@ private boolean query(final Host host) {
460
559
461
560
currentChildTracingInfo = manager .getTracingInfoFactory ().buildTracingInfo (parentTracingInfo );
462
561
currentChildTracingInfo .setNameAndStartTime ("attempt" );
562
+ InetSocketAddress hostAddress = host .getEndPoint ().resolve ();
563
+ currentChildTracingInfo .setPeerName (hostAddress .getHostName ());
564
+ currentChildTracingInfo .setPeerIP (hostAddress .getAddress ());
565
+ currentChildTracingInfo .setPeerPort (hostAddress .getPort ());
463
566
464
567
if (allowSpeculativeExecutions && nextExecutionScheduled .compareAndSet (false , true ))
465
568
scheduleExecution (speculativeExecutionPlan .nextExecution (host ));
@@ -679,6 +782,7 @@ void cancel() {
679
782
CancelledSpeculativeExecutionException .INSTANCE ,
680
783
System .nanoTime () - startTime );
681
784
}
785
+ parentTracingInfo .setAttemptCount (previous .retryCount + 1 );
682
786
parentTracingInfo .setStatus (TracingInfo .StatusCode .OK );
683
787
parentTracingInfo .tracingFinished ();
684
788
return ;
@@ -693,6 +797,7 @@ void cancel() {
693
797
CancelledSpeculativeExecutionException .INSTANCE ,
694
798
System .nanoTime () - startTime );
695
799
}
800
+ parentTracingInfo .setAttemptCount (previous .retryCount + 1 );
696
801
parentTracingInfo .setStatus (TracingInfo .StatusCode .OK );
697
802
parentTracingInfo .tracingFinished ();
698
803
return ;
@@ -710,6 +815,7 @@ public Message.Request request() {
710
815
@ Override
711
816
public void onSet (
712
817
Connection connection , Message .Response response , long latency , int retryCount ) {
818
+ currentChildTracingInfo .setShardID (connection .shardId ());
713
819
currentChildTracingInfo .setStatus (TracingInfo .StatusCode .OK );
714
820
currentChildTracingInfo .tracingFinished ();
715
821
@@ -1021,6 +1127,7 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
1021
1127
@ Override
1022
1128
public void onException (
1023
1129
Connection connection , Exception exception , long latency , int retryCount ) {
1130
+ currentChildTracingInfo .setShardID (connection .shardId ());
1024
1131
currentChildTracingInfo .recordException (exception );
1025
1132
currentChildTracingInfo .setStatus (TracingInfo .StatusCode .ERROR );
1026
1133
currentChildTracingInfo .tracingFinished ();
@@ -1062,6 +1169,7 @@ public void onException(
1062
1169
1063
1170
@ Override
1064
1171
public boolean onTimeout (Connection connection , long latency , int retryCount ) {
1172
+ currentChildTracingInfo .setShardID (connection .shardId ());
1065
1173
currentChildTracingInfo .setStatus (TracingInfo .StatusCode .ERROR , "timeout" );
1066
1174
currentChildTracingInfo .tracingFinished ();
1067
1175
@@ -1106,12 +1214,14 @@ public int retryCount() {
1106
1214
}
1107
1215
1108
1216
private void setFinalException (Connection connection , Exception exception ) {
1217
+ parentTracingInfo .setAttemptCount (retryCount () + 1 );
1109
1218
parentTracingInfo .setStatus (TracingInfo .StatusCode .ERROR );
1110
1219
parentTracingInfo .tracingFinished ();
1111
1220
RequestHandler .this .setFinalException (this , connection , exception );
1112
1221
}
1113
1222
1114
1223
private void setFinalResult (Connection connection , Message .Response response ) {
1224
+ parentTracingInfo .setAttemptCount (retryCount () + 1 );
1115
1225
parentTracingInfo .setStatus (TracingInfo .StatusCode .OK );
1116
1226
parentTracingInfo .tracingFinished ();
1117
1227
RequestHandler .this .setFinalResult (this , connection , response );
0 commit comments