-
Notifications
You must be signed in to change notification settings - Fork 884
CASSJAVA-97: Let users inject an ID for each request and write to the custom payload #2037
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 4.x
Are you sure you want to change the base?
Changes from 6 commits
b774dff
5b12695
6660c04
15d0396
c075c0f
ce2ae9a
5cfcbc2
5f95b14
2f40b1e
457d582
05159d8
3505d7a
4fa88a0
e2a2482
6368097
f30baf7
69d0666
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.datastax.oss.driver.api.core.tracker; | ||
|
||
import com.datastax.oss.driver.api.core.session.Request; | ||
import edu.umd.cs.findbugs.annotations.NonNull; | ||
|
||
public interface DistributedTraceIdGenerator { | ||
SiyaoIsHiding marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/** | ||
* Generates a unique identifier for the session request. This identifier will be added to logs. | ||
SiyaoIsHiding marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* | ||
* @param statement the statement to be executed | ||
* @param sessionName the name of the session | ||
* @param hashCode the hashcode of the CqlRequestHandler | ||
* @return a unique identifier for the session request | ||
*/ | ||
String getSessionRequestId(@NonNull Request statement, @NonNull String sessionName, int hashCode); | ||
|
||
/** | ||
* Generates a unique identifier for the node request. This identifier will be added to logs, and | ||
* propagated to request trackers. | ||
* | ||
* @param statement the statement to be executed | ||
* @param sessionRequestId the session request identifier | ||
* @param executionCount the number of previous node requests for this session request, due to | ||
* retries or speculative executions | ||
* @return a unique identifier for the node request | ||
*/ | ||
String getNodeRequestId( | ||
@NonNull Request statement, @NonNull String sessionRequestId, int executionCount); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,7 @@ | |
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException; | ||
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler; | ||
import com.datastax.oss.driver.api.core.session.throttling.Throttled; | ||
import com.datastax.oss.driver.api.core.tracker.DistributedTraceIdGenerator; | ||
import com.datastax.oss.driver.api.core.tracker.RequestTracker; | ||
import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler; | ||
import com.datastax.oss.driver.internal.core.adminrequest.UnexpectedResponseException; | ||
|
@@ -78,8 +79,10 @@ | |
import io.netty.util.concurrent.Future; | ||
import io.netty.util.concurrent.GenericFutureListener; | ||
import java.nio.ByteBuffer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.time.Duration; | ||
import java.util.AbstractMap; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Queue; | ||
|
@@ -125,12 +128,14 @@ public class CqlRequestHandler implements Throttled { | |
private final List<NodeResponseCallback> inFlightCallbacks; | ||
private final RequestThrottler throttler; | ||
private final RequestTracker requestTracker; | ||
private final DistributedTraceIdGenerator distributedTraceIdGenerator; | ||
private final SessionMetricUpdater sessionMetricUpdater; | ||
private final DriverExecutionProfile executionProfile; | ||
|
||
// The errors on the nodes that were already tried (lazily initialized on the first error). | ||
// We don't use a map because nodes can appear multiple times. | ||
private volatile List<Map.Entry<Node, Throwable>> errors; | ||
private final String customPayloadKey; | ||
|
||
protected CqlRequestHandler( | ||
Statement<?> statement, | ||
|
@@ -139,7 +144,10 @@ protected CqlRequestHandler( | |
String sessionLogPrefix) { | ||
|
||
this.startTimeNanos = System.nanoTime(); | ||
this.logPrefix = sessionLogPrefix + "|" + this.hashCode(); | ||
this.distributedTraceIdGenerator = context.getDistributedTraceIdGenerator(); | ||
this.logPrefix = | ||
this.distributedTraceIdGenerator.getSessionRequestId( | ||
statement, sessionLogPrefix, this.hashCode()); | ||
LOG.trace("[{}] Creating new handler for request {}", logPrefix, statement); | ||
|
||
this.initialStatement = statement; | ||
|
@@ -170,6 +178,11 @@ protected CqlRequestHandler( | |
|
||
this.timer = context.getNettyOptions().getTimer(); | ||
this.executionProfile = Conversions.resolveExecutionProfile(initialStatement, context); | ||
|
||
this.customPayloadKey = | ||
this.executionProfile.getString( | ||
DefaultDriverOption.DISTRIBUTED_TRACE_ID_CUSTOM_PAYLOAD_KEY); | ||
|
||
Duration timeout = Conversions.resolveRequestTimeout(statement, executionProfile); | ||
this.scheduledTimeout = scheduleTimeout(timeout); | ||
|
||
|
@@ -248,6 +261,18 @@ private void sendRequest( | |
if (result.isDone()) { | ||
return; | ||
} | ||
String nodeRequestId = | ||
this.distributedTraceIdGenerator.getNodeRequestId( | ||
statement, logPrefix, currentExecutionIndex); | ||
if (!this.customPayloadKey.isEmpty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are not missing |
||
// We cannot do statement.getCustomPayload().put() because the default empty map is abstract | ||
// But this will create new Statement instance for every request. We might want to optimize | ||
// this | ||
Map<String, ByteBuffer> existingMap = new HashMap<>(statement.getCustomPayload()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can copy just the payload, not the whole statement: Map<String, ByteBuffer> customPayload = statement.getCustomPayload();
if (!this.customPayloadKey.isEmpty()) {
customPayload =
NullAllowingImmutableMap.<String, ByteBuffer>builder()
.putAll(customPayload)
.put(
this.customPayloadKey,
ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8)))
.build();
} Then modify line 307 like so: channel
- .write(message, statement.isTracing(), statement.getCustomPayload(), nodeResponseCallback)
+ .write(message, statement.isTracing(), customPayload, nodeResponseCallback)
.addListener(nodeResponseCallback); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This solves the concurrency problem, but it also means the subsequent |
||
existingMap.put( | ||
this.customPayloadKey, ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8))); | ||
statement = statement.setCustomPayload(existingMap); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Overriding custom payload here is not thread-safe. If client application executes the same statement instance multiple times concurrently (not a good use-case, but still possible), we do not guarantee how this map will be changed. Maybe indeed, there is no other way than make a shallow copy of the statement. Will think about it.
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the wrong place to do this. In most cases we haven't even selected the node yet; note that this happens immediately below where we poll the query plan if no node is explicitly set in the request. Assuming we update the request ID generation logic to correctly account for the target node the setting of custom payload fields should happen after we determine which node we're actually sending to. |
||
Node node = retriedNode; | ||
DriverChannel channel = null; | ||
if (node == null || (channel = session.getChannel(node, logPrefix)) == null) { | ||
|
@@ -276,7 +301,7 @@ private void sendRequest( | |
currentExecutionIndex, | ||
retryCount, | ||
scheduleNextExecution, | ||
logPrefix); | ||
nodeRequestId); | ||
Message message = Conversions.toMessage(statement, executionProfile, context); | ||
channel | ||
.write(message, statement.isTracing(), statement.getCustomPayload(), nodeResponseCallback) | ||
|
@@ -489,7 +514,7 @@ private NodeResponseCallback( | |
this.execution = execution; | ||
this.retryCount = retryCount; | ||
this.scheduleNextExecution = scheduleNextExecution; | ||
this.logPrefix = logPrefix + "|" + execution; | ||
this.logPrefix = logPrefix; | ||
} | ||
|
||
// this gets invoked once the write completes. | ||
|
Uh oh!
There was an error while loading. Please reload this page.