2424import com .google .cloud .spanner .SpannerImpl .ClosedException ;
2525import com .google .cloud .spanner .Statement .StatementFactory ;
2626import com .google .common .annotations .VisibleForTesting ;
27- import com .google .common .base .Function ;
2827import com .google .common .util .concurrent .ListenableFuture ;
2928import com .google .spanner .v1 .BatchWriteResponse ;
3029import io .opentelemetry .api .common .Attributes ;
30+ import java .util .ArrayList ;
31+ import java .util .Arrays ;
32+ import java .util .Objects ;
3133import java .util .concurrent .ExecutionException ;
3234import java .util .concurrent .Future ;
3335import java .util .concurrent .TimeUnit ;
3436import java .util .concurrent .TimeoutException ;
37+ import java .util .concurrent .atomic .AtomicInteger ;
38+ import java .util .function .BiFunction ;
3539import javax .annotation .Nullable ;
3640
3741class DatabaseClientImpl implements DatabaseClient {
@@ -45,6 +49,8 @@ class DatabaseClientImpl implements DatabaseClient {
4549 @ VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient ;
4650 @ VisibleForTesting final boolean useMultiplexedSessionPartitionedOps ;
4751 @ VisibleForTesting final boolean useMultiplexedSessionForRW ;
52+ private final int dbId ;
53+ private final AtomicInteger nthRequest ;
4854
4955 final boolean useMultiplexedSessionBlindWrite ;
5056
@@ -91,6 +97,18 @@ class DatabaseClientImpl implements DatabaseClient {
9197 this .tracer = tracer ;
9298 this .useMultiplexedSessionForRW = useMultiplexedSessionForRW ;
9399 this .commonAttributes = commonAttributes ;
100+
101+ this .dbId = this .dbIdFromClientId (this .clientId );
102+ this .nthRequest = new AtomicInteger (0 );
103+ }
104+
105+ private int dbIdFromClientId (String clientId ) {
106+ int i = clientId .indexOf ("-" );
107+ String strWithValue = clientId .substring (i + 1 );
108+ if (Objects .equals (strWithValue , "" )) {
109+ strWithValue = "0" ;
110+ }
111+ return Integer .parseInt (strWithValue );
94112 }
95113
96114 @ VisibleForTesting
@@ -188,7 +206,11 @@ public CommitResponse writeWithOptions(
188206 if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
189207 return getMultiplexedSessionDatabaseClient ().writeWithOptions (mutations , options );
190208 }
191- return runWithSessionRetry (session -> session .writeWithOptions (mutations , options ));
209+
210+ return runWithSessionRetry (
211+ (session , reqId ) -> {
212+ return session .writeWithOptions (mutations , withReqId (reqId , options ));
213+ });
192214 } catch (RuntimeException e ) {
193215 span .setStatus (e );
194216 throw e ;
@@ -213,7 +235,8 @@ public CommitResponse writeAtLeastOnceWithOptions(
213235 .writeAtLeastOnceWithOptions (mutations , options );
214236 }
215237 return runWithSessionRetry (
216- session -> session .writeAtLeastOnceWithOptions (mutations , options ));
238+ (session , reqId ) ->
239+ session .writeAtLeastOnceWithOptions (mutations , withReqId (reqId , options )));
217240 } catch (RuntimeException e ) {
218241 span .setStatus (e );
219242 throw e ;
@@ -222,6 +245,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
222245 }
223246 }
224247
248+ private int nextNthRequest () {
249+ return this .nthRequest .incrementAndGet ();
250+ }
251+
225252 @ Override
226253 public ServerStream <BatchWriteResponse > batchWriteAtLeastOnce (
227254 final Iterable <MutationGroup > mutationGroups , final TransactionOption ... options )
@@ -231,7 +258,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
231258 if (canUseMultiplexedSessionsForRW () && getMultiplexedSessionDatabaseClient () != null ) {
232259 return getMultiplexedSessionDatabaseClient ().batchWriteAtLeastOnce (mutationGroups , options );
233260 }
234- return runWithSessionRetry (session -> session .batchWriteAtLeastOnce (mutationGroups , options ));
261+ return runWithSessionRetry (
262+ (session , reqId ) ->
263+ session .batchWriteAtLeastOnce (mutationGroups , withReqId (reqId , options )));
235264 } catch (RuntimeException e ) {
236265 span .setStatus (e );
237266 throw e ;
@@ -383,27 +412,57 @@ private Future<Dialect> getDialectAsync() {
383412 return pool .getDialectAsync ();
384413 }
385414
415+ private UpdateOption [] withReqId (
416+ final XGoogSpannerRequestId reqId , final UpdateOption ... options ) {
417+ if (reqId == null ) {
418+ return options ;
419+ }
420+ ArrayList <UpdateOption > allOptions = new ArrayList (Arrays .asList (options ));
421+ allOptions .add (new Options .RequestIdOption (reqId ));
422+ return allOptions .toArray (new UpdateOption [0 ]);
423+ }
424+
425+ private TransactionOption [] withReqId (
426+ final XGoogSpannerRequestId reqId , final TransactionOption ... options ) {
427+ if (reqId == null ) {
428+ return options ;
429+ }
430+ ArrayList <TransactionOption > allOptions = new ArrayList (Arrays .asList (options ));
431+ allOptions .add (new Options .RequestIdOption (reqId ));
432+ return allOptions .toArray (new TransactionOption [0 ]);
433+ }
434+
386435 private long executePartitionedUpdateWithPooledSession (
387436 final Statement stmt , final UpdateOption ... options ) {
388437 ISpan span = tracer .spanBuilder (PARTITION_DML_TRANSACTION , commonAttributes );
389438 try (IScope s = tracer .withSpan (span )) {
390- return runWithSessionRetry (session -> session .executePartitionedUpdate (stmt , options ));
439+ return runWithSessionRetry (
440+ (session , reqId ) -> {
441+ return session .executePartitionedUpdate (stmt , withReqId (reqId , options ));
442+ });
391443 } catch (RuntimeException e ) {
392444 span .setStatus (e );
393445 span .end ();
394446 throw e ;
395447 }
396448 }
397449
398- private <T > T runWithSessionRetry (Function <Session , T > callable ) {
450+ private <T > T runWithSessionRetry (BiFunction <Session , XGoogSpannerRequestId , T > callable ) {
399451 PooledSessionFuture session = getSession ();
452+ XGoogSpannerRequestId reqId =
453+ XGoogSpannerRequestId .of (
454+ this .dbId , Long .valueOf (session .getChannel ()), this .nextNthRequest (), 0 );
400455 while (true ) {
401456 try {
402- return callable .apply (session );
457+ reqId .incrementAttempt ();
458+ return callable .apply (session , reqId );
403459 } catch (SessionNotFoundException e ) {
404460 session =
405461 (PooledSessionFuture )
406462 pool .getPooledSessionReplacementHandler ().replaceSession (e , session );
463+ reqId =
464+ XGoogSpannerRequestId .of (
465+ this .dbId , Long .valueOf (session .getChannel ()), this .nextNthRequest (), 0 );
407466 }
408467 }
409468 }
0 commit comments