@@ -198,12 +198,15 @@ private static class PartialResultSetsIterator implements Iterator<PartialResult
198198 private boolean hasNext ;
199199 private boolean first = true ;
200200 private int currentRow = 0 ;
201- private boolean isMultiplexedSession = false ;
201+ private final boolean setPrecommitToken ;
202+ private final ByteString transactionId ;
202203
203- private PartialResultSetsIterator (ResultSet resultSet , boolean isMultiplexedSession ) {
204+ private PartialResultSetsIterator (
205+ ResultSet resultSet , boolean setPrecommitToken , ByteString transactionId ) {
204206 this .resultSet = resultSet ;
205207 this .hasNext = true ;
206- this .isMultiplexedSession = isMultiplexedSession ;
208+ this .setPrecommitToken = setPrecommitToken ;
209+ this .transactionId = transactionId ;
207210 }
208211
209212 @ Override
@@ -230,8 +233,8 @@ public PartialResultSet next() {
230233 }
231234 builder .setResumeToken (ByteString .copyFromUtf8 (String .format ("%09d" , currentRow )));
232235 hasNext = currentRow < resultSet .getRowsCount ();
233- if (this .isMultiplexedSession ) {
234- builder .setPrecommitToken (getPartialResultSetPrecommitToken ());
236+ if (this .setPrecommitToken ) {
237+ builder .setPrecommitToken (getPartialResultSetPrecommitToken (this . transactionId ));
235238 }
236239 return builder .build ();
237240 }
@@ -606,6 +609,10 @@ private static void checkStreamException(
606609 private ConcurrentMap <String , AtomicLong > transactionCounters = new ConcurrentHashMap <>();
607610 private ConcurrentMap <String , List <ByteString >> partitionTokens = new ConcurrentHashMap <>();
608611 private ConcurrentMap <ByteString , Instant > transactionLastUsed = new ConcurrentHashMap <>();
612+
613+ // Stores latest sequence number required for precommit token.
614+ private static ConcurrentMap <ByteString , AtomicInteger > transactionSequenceNo =
615+ new ConcurrentHashMap <>();
609616 private int maxNumSessionsInOneBatch = 100 ;
610617 private int maxTotalSessions = Integer .MAX_VALUE ;
611618 private Iterable <BatchWriteResponse > batchWriteResult = new ArrayList <>();
@@ -1056,8 +1063,8 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver<ResultSet> resp
10561063 ? Transaction .getDefaultInstance ()
10571064 : Transaction .newBuilder ().setId (transactionId ).build ())
10581065 .build ());
1059- if (session .getMultiplexed ()) {
1060- resultSetBuilder .setPrecommitToken (getResultSetPrecommitToken ());
1066+ if (session .getMultiplexed () && isReadWriteTransaction ( transactionId ) ) {
1067+ resultSetBuilder .setPrecommitToken (getResultSetPrecommitToken (transactionId ));
10611068 }
10621069 responseObserver .onNext (resultSetBuilder .build ());
10631070 }
@@ -1095,8 +1102,8 @@ private void returnResultSet(
10951102 }
10961103 ResultSet .Builder resultSetBuilder = resultSet .toBuilder ();
10971104 resultSetBuilder .setMetadata (metadata );
1098- if (session .getMultiplexed ()) {
1099- resultSetBuilder .setPrecommitToken (getResultSetPrecommitToken ());
1105+ if (session .getMultiplexed () && isReadWriteTransaction ( transactionId ) ) {
1106+ resultSetBuilder .setPrecommitToken (getResultSetPrecommitToken (transactionId ));
11001107 }
11011108 resultSet = resultSetBuilder .build ();
11021109 responseObserver .onNext (resultSet );
@@ -1193,8 +1200,8 @@ public void executeBatchDml(
11931200 .build ());
11941201 }
11951202 builder .setStatus (status );
1196- if (session .getMultiplexed ()) {
1197- builder .setPrecommitToken (getExecuteBatchDmlResponsePrecommitToken ());
1203+ if (session .getMultiplexed () && isReadWriteTransaction ( transactionId ) ) {
1204+ builder .setPrecommitToken (getExecuteBatchDmlResponsePrecommitToken (transactionId ));
11981205 }
11991206 responseObserver .onNext (builder .build ());
12001207 responseObserver .onCompleted ();
@@ -1726,7 +1733,10 @@ private void returnPartialResultSet(
17261733 }
17271734 resultSet = resultSet .toBuilder ().setMetadata (metadata ).build ();
17281735 PartialResultSetsIterator iterator =
1729- new PartialResultSetsIterator (resultSet , isMultiplexedSession );
1736+ new PartialResultSetsIterator (
1737+ resultSet ,
1738+ isMultiplexedSession && isReadWriteTransaction (transactionId ),
1739+ transactionId );
17301740 long index = 0L ;
17311741 while (iterator .hasNext ()) {
17321742 SimulatedExecutionTime .checkStreamException (
@@ -2060,6 +2070,7 @@ private void commitTransaction(ByteString transactionId) {
20602070 transactions .remove (transactionId );
20612071 isPartitionedDmlTransaction .remove (transactionId );
20622072 transactionLastUsed .remove (transactionId );
2073+ transactionSequenceNo .remove (transactionId );
20632074 }
20642075
20652076 @ Override
@@ -2091,13 +2102,15 @@ void rollbackTransaction(ByteString transactionId) {
20912102 transactions .remove (transactionId );
20922103 isPartitionedDmlTransaction .remove (transactionId );
20932104 transactionLastUsed .remove (transactionId );
2105+ transactionSequenceNo .remove (transactionId );
20942106 }
20952107
20962108 void markAbortedTransaction (ByteString transactionId ) {
20972109 abortedTransactions .put (transactionId , Boolean .TRUE );
20982110 transactions .remove (transactionId );
20992111 isPartitionedDmlTransaction .remove (transactionId );
21002112 transactionLastUsed .remove (transactionId );
2113+ transactionSequenceNo .remove (transactionId );
21012114 }
21022115
21032116 @ Override
@@ -2302,6 +2315,7 @@ public void reset() {
23022315 transactionCounters = new ConcurrentHashMap <>();
23032316 partitionTokens = new ConcurrentHashMap <>();
23042317 transactionLastUsed = new ConcurrentHashMap <>();
2318+ transactionSequenceNo = new ConcurrentHashMap <>();
23052319
23062320 numSessionsCreated .set (0 );
23072321 stickyGlobalExceptions = false ;
@@ -2474,22 +2488,31 @@ Session getSession(String name) {
24742488 return null ;
24752489 }
24762490
2477- static MultiplexedSessionPrecommitToken getResultSetPrecommitToken () {
2478- return getPrecommitToken ("ResultSetPrecommitToken" , 1 );
2491+ static MultiplexedSessionPrecommitToken getResultSetPrecommitToken (ByteString transactionId ) {
2492+ return getPrecommitToken ("ResultSetPrecommitToken" , transactionId );
24792493 }
24802494
2481- static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken () {
2482- return getPrecommitToken ("PartialResultSetPrecommitToken" , 3 );
2495+ static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken (
2496+ ByteString transactionId ) {
2497+ return getPrecommitToken ("PartialResultSetPrecommitToken" , transactionId );
24832498 }
24842499
2485- static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken () {
2486- return getPrecommitToken ("ExecuteBatchDmlResponsePrecommitToken" , 2 );
2500+ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken (
2501+ ByteString transactionId ) {
2502+ return getPrecommitToken ("ExecuteBatchDmlResponsePrecommitToken" , transactionId );
24872503 }
24882504
2489- static MultiplexedSessionPrecommitToken getPrecommitToken (String value , int seqNo ) {
2505+ static MultiplexedSessionPrecommitToken getPrecommitToken (
2506+ String value , ByteString transactionId ) {
2507+ if (!transactionSequenceNo .containsKey (transactionId )) {
2508+ transactionSequenceNo .put (transactionId , new AtomicInteger (0 ));
2509+ }
2510+
2511+ // Generates an incrementing sequence number
2512+ int seqNum = transactionSequenceNo .get (transactionId ).addAndGet (1 );
24902513 return MultiplexedSessionPrecommitToken .newBuilder ()
24912514 .setPrecommitToken (ByteString .copyFromUtf8 (value ))
2492- .setSeqNum (seqNo )
2515+ .setSeqNum (seqNum )
24932516 .build ();
24942517 }
24952518}
0 commit comments