Skip to content

Commit 8f560a7

Browse files
committed
Tracing: instrumenting RequestHandler: added various driver-side tags.
All data that constitute added tags are collected offline, i.e. not fetched from the cluster, but known instantly by the driver.
1 parent 018842f commit 8f560a7

File tree

7 files changed

+305
-1
lines changed

7 files changed

+305
-1
lines changed

clirr-ignores.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
Modified by ScyllaDB
1717
-->
1818
<differences>
19+
<difference>
20+
<differenceType>7012</differenceType> <!-- method added to interface -->
21+
<className>com/datastax/driver/core/PreparedStatement</className>
22+
<method>java.lang.String getOperationType()</method>
23+
<justification>New method to get the type of operation performed by this PreparedStatement</justification>
24+
</difference>
1925
<difference>
2026
<differenceType>7012</differenceType> <!-- method added to interface -->
2127
<className>com/datastax/driver/core/Session</className>

driver-core/src/main/java/com/datastax/driver/core/BoundStatement.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class BoundStatement extends Statement
7474

7575
private ByteBuffer routingKey;
7676

77+
private final String operationType;
78+
7779
/**
7880
* Creates a new {@code BoundStatement} from the provided prepared statement.
7981
*
@@ -92,6 +94,7 @@ public BoundStatement(PreparedStatement statement) {
9294
this.setSerialConsistencyLevel(statement.getSerialConsistencyLevel());
9395
if (statement.isTracing()) this.enableTracing();
9496
if (statement.getRetryPolicy() != null) this.setRetryPolicy(statement.getRetryPolicy());
97+
this.operationType = statement.getOperationType();
9598
if (statement.getOutgoingPayload() != null)
9699
this.setOutgoingPayload(statement.getOutgoingPayload());
97100
else
@@ -104,6 +107,10 @@ public BoundStatement(PreparedStatement statement) {
104107
}
105108
}
106109

110+
public String getOperationType() {
111+
return operationType;
112+
}
113+
107114
@Override
108115
public boolean isLWT() {
109116
return statement.isLWT();

driver-core/src/main/java/com/datastax/driver/core/DefaultPreparedStatement.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class DefaultPreparedStatement implements PreparedStatement {
4141
final Cluster cluster;
4242
final boolean isLWT;
4343
final Token.Factory partitioner;
44+
final String operationType;
4445

4546
volatile ByteBuffer routingKey;
4647

@@ -66,6 +67,7 @@ private DefaultPreparedStatement(
6667
this.cluster = cluster;
6768
this.isLWT = isLWT;
6869
this.partitioner = partitioner;
70+
this.operationType = null;
6971
}
7072

7173
static DefaultPreparedStatement fromMessage(
@@ -315,4 +317,10 @@ public Boolean isIdempotent() {
315317
public boolean isLWT() {
316318
return isLWT;
317319
}
320+
321+
/** {@inheritDoc} */
322+
@Override
323+
public String getOperationType() {
324+
return operationType;
325+
}
318326
}

driver-core/src/main/java/com/datastax/driver/core/PreparedStatement.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,4 +361,7 @@ public interface PreparedStatement {
361361

362362
/** Whether a prepared statement is LWT statement */
363363
public boolean isLWT();
364+
365+
/** Type of prepared operation (e.g. SELECT) */
366+
public String getOperationType();
364367
}

driver-core/src/main/java/com/datastax/driver/core/RequestHandler.java

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.google.common.util.concurrent.ListenableFuture;
4949
import io.netty.util.Timeout;
5050
import io.netty.util.TimerTask;
51+
import java.net.InetSocketAddress;
5152
import java.nio.ByteBuffer;
5253
import java.util.Collections;
5354
import java.util.Iterator;
@@ -74,6 +75,10 @@ class RequestHandler {
7475
private static final QueryLogger QUERY_LOGGER = QueryLogger.builder().build();
7576
static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS";
7677

78+
private static final int STATEMENT_MAX_LENGTH = 1000;
79+
private static final int PARTITION_KEY_MAX_LENGTH = 1000;
80+
private static final int REPLICAS_MAX_LENGTH = 1000;
81+
7782
final String id;
7883

7984
private final SessionManager manager;
@@ -161,8 +166,70 @@ public RequestHandler(
161166
this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null;
162167
this.startTime = System.nanoTime();
163168

169+
ConsistencyLevel consistency = statement.getConsistencyLevel();
170+
if (consistency == null) consistency = Statement.DEFAULT.getConsistencyLevel();
171+
172+
String statementType = null;
173+
String statementText = null;
174+
int batchSize = 1;
175+
176+
String keyspace = null;
177+
String partitionKey = null;
178+
String table = null;
179+
String operationType = null;
180+
181+
if (statement instanceof BatchStatement) {
182+
statementType = "batch";
183+
batchSize = ((BatchStatement) statement).size();
184+
StringBuilder statementTextBuilder = new StringBuilder(STATEMENT_MAX_LENGTH);
185+
for (Statement subStatement : ((BatchStatement) statement).getStatements()) {
186+
if (subStatement instanceof BoundStatement)
187+
statementTextBuilder.append(((BoundStatement) subStatement).statement.getQueryString());
188+
else statementTextBuilder.append(subStatement.toString());
189+
}
190+
statementText = statementTextBuilder.toString();
191+
} else if (statement instanceof BoundStatement) {
192+
statementType = "prepared";
193+
statementText = ((BoundStatement) statement).statement.getQueryString();
194+
keyspace = ((BoundStatement) statement).getKeyspace();
195+
operationType = ((BoundStatement) statement).getOperationType();
196+
197+
ColumnDefinitions boundColumns =
198+
((BoundStatement) statement).statement.getPreparedId().boundValuesMetadata.variables;
199+
200+
StringBuilder partitionKeyBuilder = new StringBuilder(PARTITION_KEY_MAX_LENGTH);
201+
int[] rkIndexes = ((BoundStatement) statement).statement.getPreparedId().routingKeyIndexes;
202+
if (rkIndexes != null) {
203+
for (int i : rkIndexes) {
204+
Object value = ((BoundStatement) statement).getObject(i);
205+
String valueString = (value == null) ? "NULL" : value.toString();
206+
if (partitionKeyBuilder.length() > 0) partitionKeyBuilder.append(", ");
207+
String columnName = boundColumns.getName(i);
208+
partitionKeyBuilder.append(columnName);
209+
partitionKeyBuilder.append('=');
210+
partitionKeyBuilder.append(valueString);
211+
}
212+
}
213+
partitionKey = partitionKeyBuilder.toString();
214+
215+
if (boundColumns.size() > 0) table = boundColumns.getTable(0);
216+
} else if (statement instanceof RegularStatement) {
217+
statementType = "regular";
218+
statementText = ((RegularStatement) statement).toString();
219+
}
220+
164221
this.tracingInfo = tracingInfo;
165222
this.tracingInfo.setNameAndStartTime("request");
223+
this.tracingInfo.setConsistencyLevel(consistency);
224+
this.tracingInfo.setRetryPolicy(retryPolicy());
225+
this.tracingInfo.setBatchSize(batchSize);
226+
this.tracingInfo.setLoadBalancingPolicy(manager.loadBalancingPolicy());
227+
if (statementType != null) this.tracingInfo.setStatementType(statementType);
228+
if (statementText != null) this.tracingInfo.setStatement(statementText, STATEMENT_MAX_LENGTH);
229+
if (keyspace != null) this.tracingInfo.setKeyspace(keyspace);
230+
if (partitionKey != null) this.tracingInfo.setPartitionKey(partitionKey);
231+
if (table != null) this.tracingInfo.setTable(table);
232+
if (operationType != null) this.tracingInfo.setOperationType(operationType);
166233
}
167234

168235
void sendRequest() {
@@ -281,6 +348,15 @@ private void setFinalResult(
281348
logServerWarnings(response.warnings);
282349
}
283350
callback.onSet(connection, response, info, statement, System.nanoTime() - startTime);
351+
352+
if (response.type == Message.Response.Type.RESULT) {
353+
Responses.Result rm = (Responses.Result) response;
354+
if (rm.kind == Responses.Result.Kind.ROWS) {
355+
Responses.Result.Rows r = (Responses.Result.Rows) rm;
356+
tracingInfo.setRowsCount(r.data.size());
357+
}
358+
}
359+
tracingInfo.setQueryPaged(info.getPagingState() != null);
284360
tracingInfo.setStatus(TracingInfo.StatusCode.OK);
285361
tracingInfo.tracingFinished();
286362
} catch (Exception e) {
@@ -291,8 +367,8 @@ private void setFinalResult(
291367
System.nanoTime() - startTime, /*unused*/
292368
0);
293369

294-
tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, e.toString());
295370
tracingInfo.recordException(e);
371+
tracingInfo.setStatus(TracingInfo.StatusCode.ERROR, e.toString());
296372
tracingInfo.tracingFinished();
297373
}
298374
}
@@ -332,6 +408,7 @@ private void setFinalException(
332408
// Triggered when an execution reaches the end of the query plan.
333409
// This is only a failure if there are no other running executions.
334410
private void reportNoMoreHosts(SpeculativeExecution execution) {
411+
execution.parentTracingInfo.setRetryCount(execution.retryCount());
335412
execution.parentTracingInfo.tracingFinished();
336413
runningExecutions.remove(execution);
337414
if (runningExecutions.isEmpty())
@@ -455,6 +532,10 @@ private boolean query(final Host host) {
455532

456533
currentChildTracingInfo = manager.getTracingInfoFactory().buildTracingInfo(parentTracingInfo);
457534
currentChildTracingInfo.setNameAndStartTime("query");
535+
InetSocketAddress hostAddress = host.getEndPoint().resolve();
536+
currentChildTracingInfo.setPeerName(hostAddress.getHostName());
537+
currentChildTracingInfo.setPeerIP(hostAddress.getAddress());
538+
currentChildTracingInfo.setPeerPort(hostAddress.getPort());
458539

459540
if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true))
460541
scheduleExecution(speculativeExecutionPlan.nextExecution(host));
@@ -674,6 +755,7 @@ void cancel() {
674755
CancelledSpeculativeExecutionException.INSTANCE,
675756
System.nanoTime() - startTime);
676757
}
758+
parentTracingInfo.setRetryCount(retryCount());
677759
parentTracingInfo.tracingFinished();
678760
return;
679761
} else if (!previous.inProgress
@@ -687,6 +769,7 @@ void cancel() {
687769
CancelledSpeculativeExecutionException.INSTANCE,
688770
System.nanoTime() - startTime);
689771
}
772+
parentTracingInfo.setRetryCount(retryCount());
690773
parentTracingInfo.tracingFinished();
691774
return;
692775
}
@@ -703,6 +786,7 @@ public Message.Request request() {
703786
@Override
704787
public void onSet(
705788
Connection connection, Message.Response response, long latency, int retryCount) {
789+
currentChildTracingInfo.setShardID(connection.shardId());
706790
currentChildTracingInfo.tracingFinished();
707791

708792
QueryState queryState = queryStateRef.get();
@@ -1013,6 +1097,7 @@ public boolean onTimeout(Connection connection, long latency, int retryCount) {
10131097
@Override
10141098
public void onException(
10151099
Connection connection, Exception exception, long latency, int retryCount) {
1100+
currentChildTracingInfo.setShardID(connection.shardId());
10161101
currentChildTracingInfo.tracingFinished();
10171102

10181103
QueryState queryState = queryStateRef.get();
@@ -1052,6 +1137,7 @@ public void onException(
10521137

10531138
@Override
10541139
public boolean onTimeout(Connection connection, long latency, int retryCount) {
1140+
currentChildTracingInfo.setShardID(connection.shardId());
10551141
currentChildTracingInfo.tracingFinished();
10561142

10571143
QueryState queryState = queryStateRef.get();
@@ -1095,11 +1181,13 @@ public int retryCount() {
10951181
}
10961182

10971183
private void setFinalException(Connection connection, Exception exception) {
1184+
parentTracingInfo.setRetryCount(retryCount());
10981185
parentTracingInfo.tracingFinished();
10991186
RequestHandler.this.setFinalException(this, connection, exception);
11001187
}
11011188

11021189
private void setFinalResult(Connection connection, Message.Response response) {
1190+
parentTracingInfo.setRetryCount(retryCount());
11031191
parentTracingInfo.tracingFinished();
11041192
RequestHandler.this.setFinalResult(this, connection, response);
11051193
}

driver-core/src/main/java/com/datastax/driver/core/tracing/NoopTracingInfoFactory.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,71 @@
1616

1717
package com.datastax.driver.core.tracing;
1818

19+
import com.datastax.driver.core.ConsistencyLevel;
20+
import com.datastax.driver.core.policies.LoadBalancingPolicy;
21+
import com.datastax.driver.core.policies.RetryPolicy;
22+
import java.net.InetAddress;
23+
1924
public class NoopTracingInfoFactory implements TracingInfoFactory {
2025

2126
private static class NoopTracingInfo implements TracingInfo {
2227
@Override
2328
public void setNameAndStartTime(String name) {}
2429

30+
@Override
31+
public void setConsistencyLevel(ConsistencyLevel consistency) {}
32+
33+
@Override
34+
public void setStatementType(String statementType) {}
35+
36+
@Override
37+
public void setRetryPolicy(RetryPolicy retryPolicy) {}
38+
39+
@Override
40+
public void setLoadBalancingPolicy(LoadBalancingPolicy loadBalancingPolicy) {};
41+
42+
@Override
43+
public void setBatchSize(int batchSize) {}
44+
45+
@Override
46+
public void setRetryCount(int retryCount) {}
47+
48+
@Override
49+
public void setShardID(int shardID) {}
50+
51+
@Override
52+
public void setPeerName(String peerName) {}
53+
54+
@Override
55+
public void setPeerIP(InetAddress peerIP) {}
56+
57+
@Override
58+
public void setPeerPort(int peerPort) {}
59+
60+
@Override
61+
public void setQueryPaged(Boolean queryPaged) {}
62+
63+
@Override
64+
public void setRowsCount(int rowsCount) {}
65+
66+
@Override
67+
public void setStatement(String statement, int limit) {}
68+
69+
@Override
70+
public void setKeyspace(String keyspace) {}
71+
72+
@Override
73+
public void setPartitionKey(String partitionKey) {}
74+
75+
@Override
76+
public void setTable(String table) {}
77+
78+
@Override
79+
public void setOperationType(String operationType) {}
80+
81+
@Override
82+
public void setReplicas(String replicas) {}
83+
2584
@Override
2685
public void recordException(Exception exception) {}
2786

0 commit comments

Comments
 (0)