Skip to content

Commit ef95543

Browse files
committed
Merge branch 'main' into PR #3928 to update
2 parents 798940a + 54f4b5a commit ef95543

File tree

13 files changed

+177
-29
lines changed

13 files changed

+177
-29
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,11 @@ CloseableIterator<PartialResultSet> startStream(
818818
@Nullable ByteString resumeToken,
819819
AsyncResultSet.StreamMessageListener streamListener) {
820820
GrpcStreamIterator stream =
821-
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
821+
new GrpcStreamIterator(
822+
statement,
823+
request.getLastStatement(),
824+
prefetchChunks,
825+
cancelQueryWhenClientIsClosed);
822826
if (streamListener != null) {
823827
stream.registerListener(streamListener);
824828
}
@@ -935,7 +939,8 @@ String getTransactionTag() {
935939
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {}
936940

937941
@Override
938-
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
942+
public SpannerException onError(
943+
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
939944
this.session.onError(e);
940945
return e;
941946
}
@@ -1009,6 +1014,8 @@ ResultSet readInternalWithOptions(
10091014
}
10101015
final int prefetchChunks =
10111016
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
1017+
final boolean lastStatement =
1018+
readOptions.hasLastStatement() ? readOptions.isLastStatement() : false;
10121019
ResumableStreamIterator stream =
10131020
new ResumableStreamIterator(
10141021
MAX_BUFFERED_CHUNKS,
@@ -1025,7 +1032,8 @@ CloseableIterator<PartialResultSet> startStream(
10251032
@Nullable ByteString resumeToken,
10261033
AsyncResultSet.StreamMessageListener streamListener) {
10271034
GrpcStreamIterator stream =
1028-
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
1035+
new GrpcStreamIterator(
1036+
lastStatement, prefetchChunks, cancelQueryWhenClientIsClosed);
10291037
if (streamListener != null) {
10301038
stream.registerListener(streamListener);
10311039
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
5656
throws SpannerException;
5757

5858
/** Called when the read finishes with an error. Returns the error that should be thrown. */
59-
SpannerException onError(SpannerException e, boolean withBeginTransaction);
59+
SpannerException onError(
60+
SpannerException e, boolean withBeginTransaction, boolean lastStatement);
6061

6162
/** Called when the read finishes normally. */
6263
void onDone(boolean withBeginTransaction);
@@ -153,6 +154,8 @@ interface CloseableIterator<T> extends Iterator<T> {
153154

154155
boolean isWithBeginTransaction();
155156

157+
boolean isLastStatement();
158+
156159
/**
157160
* @param streamMessageListener A class object which implements StreamMessageListener
158161
* @return true if streaming is supported by the iterator, otherwise false

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public boolean next() throws SpannerException {
109109
} catch (Throwable t) {
110110
throw yieldError(
111111
SpannerExceptionFactory.asSpannerException(t),
112-
iterator.isWithBeginTransaction() && currRow == null);
112+
iterator.isWithBeginTransaction() && currRow == null,
113+
iterator.isLastStatement());
113114
}
114115
}
115116

@@ -149,8 +150,9 @@ public Type getType() {
149150
return currRow.getType();
150151
}
151152

152-
private SpannerException yieldError(SpannerException e, boolean beginTransaction) {
153-
SpannerException toThrow = listener.onError(e, beginTransaction);
153+
private SpannerException yieldError(
154+
SpannerException e, boolean beginTransaction, boolean lastStatement) {
155+
SpannerException toThrow = listener.onError(e, beginTransaction, lastStatement);
154156
close();
155157
throw toThrow;
156158
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,26 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
4949

5050
private SpannerRpc.StreamingCall call;
5151
private volatile boolean withBeginTransaction;
52+
private final boolean lastStatement;
5253
private TimeUnit streamWaitTimeoutUnit;
5354
private long streamWaitTimeoutValue;
5455
private SpannerException error;
5556
private boolean done;
5657

5758
@VisibleForTesting
58-
GrpcStreamIterator(int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
59-
this(null, prefetchChunks, cancelQueryWhenClientIsClosed);
59+
GrpcStreamIterator(
60+
boolean lastStatement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
61+
this(null, lastStatement, prefetchChunks, cancelQueryWhenClientIsClosed);
6062
}
6163

6264
@VisibleForTesting
6365
GrpcStreamIterator(
64-
Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
66+
Statement statement,
67+
boolean lastStatement,
68+
int prefetchChunks,
69+
boolean cancelQueryWhenClientIsClosed) {
6570
this.statement = statement;
71+
this.lastStatement = lastStatement;
6672
this.prefetchChunks = prefetchChunks;
6773
this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed);
6874
// One extra to allow for END_OF_STREAM message.
@@ -118,6 +124,11 @@ public boolean isWithBeginTransaction() {
118124
return withBeginTransaction;
119125
}
120126

127+
@Override
128+
public boolean isLastStatement() {
129+
return lastStatement;
130+
}
131+
121132
@Override
122133
protected final PartialResultSet computeNext() {
123134
PartialResultSet next;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ boolean isWithBeginTransaction() {
183183
return stream.isWithBeginTransaction();
184184
}
185185

186+
boolean isLastStatement() {
187+
return stream.isLastStatement();
188+
}
189+
186190
/**
187191
* @param a is a mutable list and b will be concatenated into a.
188192
*/

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ public boolean isWithBeginTransaction() {
242242
return stream != null && stream.isWithBeginTransaction();
243243
}
244244

245+
@Override
246+
public boolean isLastStatement() {
247+
return stream != null && stream.isLastStatement();
248+
}
249+
245250
@Override
246251
@InternalApi
247252
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,11 @@ public void run() {
559559
span.addAnnotation("Commit Failed", resultException);
560560
opSpan.setStatus(resultException);
561561
opSpan.end();
562-
res.setException(onError(resultException, false));
562+
res.setException(
563+
onError(
564+
resultException,
565+
/* withBeginTransaction= */ false,
566+
/* lastStatement= */ true));
563567
} catch (Throwable unexpectedError) {
564568
// This is a safety precaution to make sure that a result is always returned.
565569
res.setException(unexpectedError);
@@ -759,8 +763,9 @@ MultiplexedSessionPrecommitToken getLatestPrecommitToken() {
759763
}
760764

761765
@Override
762-
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
763-
e = super.onError(e, withBeginTransaction);
766+
public SpannerException onError(
767+
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
768+
e = super.onError(e, withBeginTransaction, lastStatement);
764769

765770
// If the statement that caused an error was the statement that included a BeginTransaction
766771
// option, we simulate an aborted transaction to force a retry of the entire transaction. This
@@ -770,13 +775,17 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction
770775
// statement are included in the transaction, even if the statement again causes an error
771776
// during the retry.
772777
if (withBeginTransaction) {
773-
// Simulate an aborted transaction to force a retry with a new transaction.
774-
this.transactionIdFuture.setException(
775-
SpannerExceptionFactory.newSpannerException(
776-
ErrorCode.ABORTED,
777-
"Aborted due to failed initial statement",
778-
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
779-
"Aborted due to failed initial statement", e, 0, 1)));
778+
if (lastStatement) {
779+
this.transactionIdFuture.setException(e);
780+
} else {
781+
// Simulate an aborted transaction to force a retry with a new transaction.
782+
this.transactionIdFuture.setException(
783+
SpannerExceptionFactory.newSpannerException(
784+
ErrorCode.ABORTED,
785+
"Aborted due to failed initial statement",
786+
SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
787+
"Aborted due to failed initial statement", e, 0, 1)));
788+
}
780789
}
781790
SpannerException exceptionToThrow;
782791
if (withBeginTransaction
@@ -950,7 +959,9 @@ private ResultSet internalExecuteUpdate(
950959
return resultSet;
951960
} catch (Throwable t) {
952961
throw onError(
953-
SpannerExceptionFactory.asSpannerException(t), builder.getTransaction().hasBegin());
962+
SpannerExceptionFactory.asSpannerException(t),
963+
builder.getTransaction().hasBegin(),
964+
builder.getLastStatement());
954965
}
955966
}
956967

@@ -1008,7 +1019,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... u
10081019
input -> {
10091020
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
10101021
SpannerException exceptionToThrow =
1011-
onError(e, builder.getTransaction().hasBegin());
1022+
onError(e, builder.getTransaction().hasBegin(), builder.getLastStatement());
10121023
span.setStatus(exceptionToThrow);
10131024
throw exceptionToThrow;
10141025
},
@@ -1099,7 +1110,9 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
10991110
return results;
11001111
} catch (Throwable e) {
11011112
throw onError(
1102-
SpannerExceptionFactory.asSpannerException(e), builder.getTransaction().hasBegin());
1113+
SpannerExceptionFactory.asSpannerException(e),
1114+
builder.getTransaction().hasBegin(),
1115+
builder.getLastStatements());
11031116
}
11041117
} catch (Throwable throwable) {
11051118
span.setStatus(throwable);
@@ -1178,7 +1191,7 @@ public ApiFuture<long[]> batchUpdateAsync(
11781191
input -> {
11791192
SpannerException e = SpannerExceptionFactory.asSpannerException(input);
11801193
SpannerException exceptionToThrow =
1181-
onError(e, builder.getTransaction().hasBegin());
1194+
onError(e, builder.getTransaction().hasBegin(), builder.getLastStatements());
11821195
span.setStatus(exceptionToThrow);
11831196
throw exceptionToThrow;
11841197
},

google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
7575
throws SpannerException {}
7676

7777
@Override
78-
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
78+
public SpannerException onError(
79+
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
7980
return e;
8081
}
8182

@@ -88,7 +89,9 @@ public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
8889

8990
@Before
9091
public void setUp() {
91-
stream = new GrpcStreamIterator(10, /* cancelQueryWhenClientIsClosed= */ false);
92+
stream =
93+
new GrpcStreamIterator(
94+
/* lastStatement= */ false, 10, /* cancelQueryWhenClientIsClosed= */ false);
9295
stream.setCall(
9396
new SpannerRpc.StreamingCall() {
9497
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
5151
throws SpannerException {}
5252

5353
@Override
54-
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
54+
public SpannerException onError(
55+
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
5556
return e;
5657
}
5758

@@ -118,7 +119,9 @@ private static class TestCaseRunner {
118119
}
119120

120121
private void run() throws Exception {
121-
stream = new GrpcStreamIterator(10, /* cancelQueryWhenClientIsClosed= */ false);
122+
stream =
123+
new GrpcStreamIterator(
124+
/* lastStatement= */ false, 10, /* cancelQueryWhenClientIsClosed= */ false);
122125
stream.setCall(
123126
new SpannerRpc.StreamingCall() {
124127
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ public boolean isWithBeginTransaction() {
4747
return false;
4848
}
4949

50+
@Override
51+
public boolean isLastStatement() {
52+
return false;
53+
}
54+
5055
@Override
5156
public boolean hasNext() {
5257
return first || iterator.hasNext();
@@ -77,7 +82,8 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
7782
throws SpannerException {}
7883

7984
@Override
80-
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
85+
public SpannerException onError(
86+
SpannerException e, boolean withBeginTransaction, boolean isLastStatement) {
8187
return e;
8288
}
8389

0 commit comments

Comments
 (0)