Skip to content

Commit 92b6eee

Browse files
Andrew Weaverlukasz-antoniak
authored andcommitted
Add ExecutionInfo to RequestTracker methods
1 parent 7bc085b commit 92b6eee

File tree

14 files changed

+299
-98
lines changed

14 files changed

+299
-98
lines changed

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import com.datastax.dse.driver.api.core.DseProtocolVersion;
2121
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
22+
import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet;
2223
import com.datastax.dse.driver.internal.core.DseProtocolFeature;
2324
import com.datastax.dse.driver.internal.core.cql.DseConversions;
2425
import com.datastax.dse.protocol.internal.request.Revise;
2526
import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata;
2627
import com.datastax.oss.driver.api.core.AllNodesFailedException;
28+
import com.datastax.oss.driver.api.core.AsyncPagingIterable;
2729
import com.datastax.oss.driver.api.core.CqlIdentifier;
2830
import com.datastax.oss.driver.api.core.DriverTimeoutException;
2931
import com.datastax.oss.driver.api.core.NodeUnavailableException;
@@ -627,7 +629,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
627629
Throwable error = future.cause();
628630
if (error instanceof EncoderException
629631
&& error.getCause() instanceof FrameTooLongException) {
630-
trackNodeError(node, error.getCause());
632+
trackNodeError(node, error.getCause(), null);
631633
lock.lock();
632634
try {
633635
abort(error.getCause(), false);
@@ -644,7 +646,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
644646
.getMetricUpdater()
645647
.incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName());
646648
recordError(node, error);
647-
trackNodeError(node, error.getCause());
649+
trackNodeError(node, error.getCause(), null);
648650
sendRequest(statement, null, executionIndex, retryCount, scheduleSpeculativeExecution);
649651
}
650652
} else {
@@ -739,7 +741,8 @@ private void onPageTimeout(int expectedPage) {
739741
* Invoked when a continuous paging response is received, either a successful or failed one.
740742
*
741743
* <p>Delegates further processing to appropriate methods: {@link #processResultResponse(Result,
742-
* Frame)} if the response was successful, or {@link #processErrorResponse(Error)} if it wasn't.
744+
* Frame)} if the response was successful, or {@link #processErrorResponse(Error, Frame)} if it
745+
* wasn't.
743746
*
744747
* @param response the received {@link Frame}.
745748
*/
@@ -760,15 +763,15 @@ public void onResponse(@NonNull Frame response) {
760763
processResultResponse((Result) responseMessage, response);
761764
} else if (responseMessage instanceof Error) {
762765
LOG.trace("[{}] Got error response", logPrefix);
763-
processErrorResponse((Error) responseMessage);
766+
processErrorResponse((Error) responseMessage, response);
764767
} else {
765768
IllegalStateException error =
766769
new IllegalStateException("Unexpected response " + responseMessage);
767-
trackNodeError(node, error);
770+
trackNodeError(node, error, response);
768771
abort(error, false);
769772
}
770773
} catch (Throwable t) {
771-
trackNodeError(node, t);
774+
trackNodeError(node, t, response);
772775
abort(t, false);
773776
}
774777
} finally {
@@ -902,7 +905,7 @@ private void processResultResponse(@NonNull Result result, @Nullable Frame frame
902905
* @param errorMessage the error message received.
903906
*/
904907
@SuppressWarnings("GuardedBy") // this method is only called with the lock held
905-
private void processErrorResponse(@NonNull Error errorMessage) {
908+
private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame frame) {
906909
assert lock.isHeldByCurrentThread();
907910
if (errorMessage instanceof Unprepared) {
908911
processUnprepared((Unprepared) errorMessage);
@@ -911,7 +914,7 @@ private void processErrorResponse(@NonNull Error errorMessage) {
911914
if (error instanceof BootstrappingException) {
912915
LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node);
913916
recordError(node, error);
914-
trackNodeError(node, error);
917+
trackNodeError(node, error, frame);
915918
sendRequest(statement, null, executionIndex, retryCount, false);
916919
} else if (error instanceof QueryValidationException
917920
|| error instanceof FunctionFailureException
@@ -923,7 +926,7 @@ private void processErrorResponse(@NonNull Error errorMessage) {
923926
NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater();
924927
metricUpdater.incrementCounter(
925928
DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName());
926-
trackNodeError(node, error);
929+
trackNodeError(node, error, frame);
927930
abort(error, true);
928931
} else {
929932
try {
@@ -1062,7 +1065,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10621065
+ "This usually happens when you run a 'USE...' query after "
10631066
+ "the statement was prepared.",
10641067
Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId)));
1065-
trackNodeError(node, illegalStateException);
1068+
trackNodeError(node, illegalStateException, null);
10661069
fatalError = illegalStateException;
10671070
} else {
10681071
LOG.trace(
@@ -1081,18 +1084,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10811084
|| prepareError instanceof FunctionFailureException
10821085
|| prepareError instanceof ProtocolError) {
10831086
LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix);
1084-
trackNodeError(node, prepareError);
1087+
trackNodeError(node, prepareError, null);
10851088
fatalError = prepareError;
10861089
}
10871090
}
10881091
} else if (exception instanceof RequestThrottlingException) {
1089-
trackNodeError(node, exception);
1092+
trackNodeError(node, exception, null);
10901093
fatalError = exception;
10911094
}
10921095
if (fatalError == null) {
10931096
LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix);
10941097
recordError(node, exception);
1095-
trackNodeError(node, exception);
1098+
trackNodeError(node, exception, null);
10961099
sendRequest(statement, null, executionIndex, retryCount, false);
10971100
}
10981101
}
@@ -1120,18 +1123,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab
11201123
switch (verdict.getRetryDecision()) {
11211124
case RETRY_SAME:
11221125
recordError(node, error);
1123-
trackNodeError(node, error);
1126+
trackNodeError(node, error, null);
11241127
sendRequest(
11251128
verdict.getRetryRequest(statement), node, executionIndex, retryCount + 1, false);
11261129
break;
11271130
case RETRY_NEXT:
11281131
recordError(node, error);
1129-
trackNodeError(node, error);
1132+
trackNodeError(node, error, null);
11301133
sendRequest(
11311134
verdict.getRetryRequest(statement), null, executionIndex, retryCount + 1, false);
11321135
break;
11331136
case RETHROW:
1134-
trackNodeError(node, error);
1137+
trackNodeError(node, error, null);
11351138
abort(error, true);
11361139
break;
11371140
case IGNORE:
@@ -1444,12 +1447,20 @@ private void reenableAutoReadIfNeeded() {
14441447

14451448
// ERROR HANDLING
14461449

1447-
private void trackNodeError(@NonNull Node node, @NonNull Throwable error) {
1450+
private void trackNodeError(
1451+
@NonNull Node node, @NonNull Throwable error, @Nullable Frame frame) {
14481452
if (nodeErrorReported.compareAndSet(false, true)) {
14491453
long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;
14501454
context
14511455
.getRequestTracker()
1452-
.onNodeError(this.statement, error, latencyNanos, executionProfile, node, logPrefix);
1456+
.onNodeError(
1457+
this.statement,
1458+
error,
1459+
latencyNanos,
1460+
executionProfile,
1461+
node,
1462+
createExecutionInfo(frame),
1463+
logPrefix);
14531464
}
14541465
}
14551466

@@ -1563,21 +1574,32 @@ private void completeResultSetFuture(
15631574
if (resultSetClass.isInstance(pageOrError)) {
15641575
if (future.complete(resultSetClass.cast(pageOrError))) {
15651576
throttler.signalSuccess(ContinuousRequestHandlerBase.this);
1577+
1578+
ExecutionInfo executionInfo = null;
1579+
if (pageOrError instanceof AsyncPagingIterable) {
1580+
executionInfo = ((AsyncPagingIterable) pageOrError).getExecutionInfo();
1581+
} else if (pageOrError instanceof AsyncGraphResultSet) {
1582+
executionInfo = ((AsyncGraphResultSet) pageOrError).getRequestExecutionInfo();
1583+
}
1584+
15661585
if (nodeSuccessReported.compareAndSet(false, true)) {
15671586
context
15681587
.getRequestTracker()
1569-
.onNodeSuccess(statement, nodeLatencyNanos, executionProfile, node, logPrefix);
1588+
.onNodeSuccess(
1589+
statement, nodeLatencyNanos, executionProfile, node, executionInfo, logPrefix);
15701590
}
15711591
context
15721592
.getRequestTracker()
1573-
.onSuccess(statement, totalLatencyNanos, executionProfile, node, logPrefix);
1593+
.onSuccess(
1594+
statement, totalLatencyNanos, executionProfile, node, executionInfo, logPrefix);
15741595
}
15751596
} else {
15761597
Throwable error = (Throwable) pageOrError;
15771598
if (future.completeExceptionally(error)) {
15781599
context
15791600
.getRequestTracker()
1580-
.onError(statement, error, totalLatencyNanos, executionProfile, node, logPrefix);
1601+
.onError(
1602+
statement, error, totalLatencyNanos, executionProfile, node, null, logPrefix);
15811603
if (error instanceof DriverTimeoutException) {
15821604
throttler.signalTimeout(ContinuousRequestHandlerBase.this);
15831605
session
@@ -1608,6 +1630,22 @@ private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Fram
16081630
executionProfile);
16091631
}
16101632

1633+
@NonNull
1634+
private ExecutionInfo createExecutionInfo(@Nullable Frame response) {
1635+
return new DefaultExecutionInfo(
1636+
statement,
1637+
node,
1638+
startedSpeculativeExecutionsCount.get(),
1639+
executionIndex,
1640+
errors,
1641+
null,
1642+
response,
1643+
true,
1644+
session,
1645+
context,
1646+
executionProfile);
1647+
}
1648+
16111649
private void logTimeoutSchedulingError(IllegalStateException timeoutError) {
16121650
// If we're racing with session shutdown, the timer might be stopped already. We don't want
16131651
// to schedule more executions anyway, so swallow the error.

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -361,9 +361,19 @@ private void setFinalResult(
361361
totalLatencyNanos = completionTimeNanos - startTimeNanos;
362362
long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos;
363363
requestTracker.onNodeSuccess(
364-
callback.statement, nodeLatencyNanos, executionProfile, callback.node, logPrefix);
364+
callback.statement,
365+
nodeLatencyNanos,
366+
executionProfile,
367+
callback.node,
368+
executionInfo,
369+
logPrefix);
365370
requestTracker.onSuccess(
366-
callback.statement, totalLatencyNanos, executionProfile, callback.node, logPrefix);
371+
callback.statement,
372+
totalLatencyNanos,
373+
executionProfile,
374+
callback.node,
375+
executionInfo,
376+
logPrefix);
367377
}
368378
if (sessionMetricUpdater.isEnabled(
369379
DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) {
@@ -447,27 +457,28 @@ private void setFinalError(
447457
GraphStatement<?> statement, Throwable error, Node node, int execution) {
448458
DriverExecutionProfile executionProfile =
449459
Conversions.resolveExecutionProfile(statement, context);
460+
ExecutionInfo executionInfo =
461+
new DefaultExecutionInfo(
462+
statement,
463+
node,
464+
startedSpeculativeExecutionsCount.get(),
465+
execution,
466+
errors,
467+
null,
468+
null,
469+
true,
470+
session,
471+
context,
472+
executionProfile);
450473
if (error instanceof DriverException) {
451-
((DriverException) error)
452-
.setExecutionInfo(
453-
new DefaultExecutionInfo(
454-
statement,
455-
node,
456-
startedSpeculativeExecutionsCount.get(),
457-
execution,
458-
errors,
459-
null,
460-
null,
461-
true,
462-
session,
463-
context,
464-
executionProfile));
474+
((DriverException) error).setExecutionInfo(executionInfo);
465475
}
466476
if (result.completeExceptionally(error)) {
467477
cancelScheduledTasks();
468478
if (!(requestTracker instanceof NoopRequestTracker)) {
469479
long latencyNanos = System.nanoTime() - startTimeNanos;
470-
requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix);
480+
requestTracker.onError(
481+
statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix);
471482
}
472483
if (error instanceof DriverTimeoutException) {
473484
throttler.signalTimeout(this);
@@ -860,7 +871,8 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan
860871
nodeResponseTimeNanos = System.nanoTime();
861872
}
862873
long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos;
863-
requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix);
874+
requestTracker.onNodeError(
875+
statement, error, latencyNanos, executionProfile, node, null, logPrefix);
864876
}
865877

866878
@Override

0 commit comments

Comments
 (0)