Skip to content

Commit 89309d8

Browse files
fix: Avoid blocking thread in AsyncResultSet
1 parent 7096899 commit 89309d8

File tree

11 files changed

+61
-44
lines changed

11 files changed

+61
-44
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,5 +790,9 @@
790790
<className>com/google/cloud/spanner/connection/Connection</className>
791791
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
792792
</difference>
793-
793+
<difference>
794+
<differenceType>7012</differenceType>
795+
<className>com/google/cloud/spanner/ResultSet</className>
796+
<method>boolean initiateStreaming(com.google.cloud.spanner.AsyncResultSet$StreamMessageListener)</method>
797+
</difference>
794798
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -768,8 +768,9 @@ ResultSet executeQueryInternalWithOptions(
768768
rpc.getExecuteQueryRetrySettings(),
769769
rpc.getExecuteQueryRetryableCodes()) {
770770
@Override
771-
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
772-
AsyncResultSet.StreamMessageListener streamListener) {
771+
CloseableIterator<PartialResultSet> startStream(
772+
@Nullable ByteString resumeToken,
773+
AsyncResultSet.StreamMessageListener streamListener) {
773774
GrpcStreamIterator stream =
774775
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
775776
stream.registerListener(streamListener);
@@ -961,8 +962,9 @@ ResultSet readInternalWithOptions(
961962
rpc.getReadRetrySettings(),
962963
rpc.getReadRetryableCodes()) {
963964
@Override
964-
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
965-
AsyncResultSet.StreamMessageListener streamListener) {
965+
CloseableIterator<PartialResultSet> startStream(
966+
@Nullable ByteString resumeToken,
967+
AsyncResultSet.StreamMessageListener streamListener) {
966968
GrpcStreamIterator stream =
967969
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
968970
stream.registerListener(streamListener);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.google.api.core.ApiFuture;
2020
import com.google.common.base.Function;
2121
import com.google.spanner.v1.PartialResultSet;
22-
2322
import java.util.List;
2423
import java.util.concurrent.ExecutionException;
2524
import java.util.concurrent.Executor;
@@ -227,16 +226,20 @@ interface ReadyCallback {
227226
<T> List<T> toList(Function<StructReader, T> transformer) throws SpannerException;
228227

229228
/**
230-
* An interface to register the listener for streaming gRPC request. It will be called when a chunk is received
231-
* from gRPC streaming call.
229+
* An interface to register the listener for streaming gRPC request. It will be called when a
230+
* chunk is received from gRPC streaming call.
232231
*/
233232
interface StreamMessageListener {
234-
void onStreamMessage(PartialResultSet partialResultSet, int prefetchChunks, int currentBufferSize, StreamMessageRequestor streamMessageRequestor);
233+
void onStreamMessage(
234+
PartialResultSet partialResultSet,
235+
int prefetchChunks,
236+
int currentBufferSize,
237+
StreamMessageRequestor streamMessageRequestor);
235238
}
236239

237240
/**
238-
* An interface to request more messages from the gRPC streaming call. It will be implemented by the class which has access
239-
* to SpannerRpc.StreamingCall object
241+
* An interface to request more messages from the gRPC streaming call. It will be implemented by
242+
* the class which has access to SpannerRpc.StreamingCall object
240243
*/
241244
interface StreamMessageRequestor {
242245
void requestMessages(int numOfMessages);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import com.google.spanner.v1.PartialResultSet;
3232
import com.google.spanner.v1.ResultSetMetadata;
3333
import com.google.spanner.v1.ResultSetStats;
34-
3534
import java.util.Collection;
3635
import java.util.LinkedList;
3736
import java.util.List;
@@ -40,10 +39,10 @@
4039
import java.util.logging.Logger;
4140

4241
/** Default implementation for {@link AsyncResultSet}. */
43-
class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
42+
class AsyncResultSetImpl extends ForwardingStructReader
43+
implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
4444
private static final Logger log = Logger.getLogger(AsyncResultSetImpl.class.getName());
4545

46-
4746
/** State of an {@link AsyncResultSetImpl}. */
4847
private enum State {
4948
INITIALIZED,
@@ -458,7 +457,7 @@ private class InitiateStreamingRunnable implements Runnable {
458457
@Override
459458
public void run() {
460459
try {
461-
if(!initiateStreaming(AsyncResultSetImpl.this)) {
460+
if (!initiateStreaming(AsyncResultSetImpl.this)) {
462461
initiateProduceRows();
463462
}
464463
} catch (SpannerException e) {
@@ -504,7 +503,6 @@ public void cancel() {
504503
"cannot cancel a result set without a callback");
505504
state = State.CANCELLED;
506505
pausedLatch.countDown();
507-
this.result.setException(CANCELLED_EXCEPTION);
508506
}
509507
}
510508

@@ -625,14 +623,21 @@ public Struct getCurrentRowAsStruct() {
625623
}
626624

627625
@Override
628-
public void onStreamMessage(PartialResultSet partialResultSet, int prefetchChunks, int currentBufferSize, StreamMessageRequestor streamMessageRequestor) {
626+
public void onStreamMessage(
627+
PartialResultSet partialResultSet,
628+
int prefetchChunks,
629+
int currentBufferSize,
630+
StreamMessageRequestor streamMessageRequestor) {
629631
synchronized (monitor) {
630632
if (state == State.IN_PROGRESS) {
631-
// if PartialResultSet contains resume token or buffer size is more than configured size or we have reached
633+
// if PartialResultSet contains resume token or buffer size is more than configured size or
634+
// we have reached
632635
// end of stream, we can start the thread
633-
boolean startJobThread = !partialResultSet.getResumeToken().isEmpty()
634-
|| currentBufferSize > prefetchChunks || partialResultSet == GrpcStreamIterator.END_OF_STREAM;
635-
if (startJobThread){
636+
boolean startJobThread =
637+
!partialResultSet.getResumeToken().isEmpty()
638+
|| currentBufferSize > prefetchChunks
639+
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
640+
if (startJobThread) {
636641
initiateProduceRows();
637642
} else {
638643
streamMessageRequestor.requestMessages(1);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,6 @@ public ResultSetMetadata getMetadata() {
105105

106106
@Override
107107
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
108-
return delegate.get().initiateStreaming(streamMessageListener);
108+
return delegate.get().initiateStreaming(streamMessageListener);
109109
}
110110
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,14 @@
2323
import com.google.common.collect.AbstractIterator;
2424
import com.google.common.util.concurrent.Uninterruptibles;
2525
import com.google.spanner.v1.PartialResultSet;
26-
import org.threeten.bp.Duration;
27-
28-
import javax.annotation.Nullable;
2926
import java.util.Optional;
3027
import java.util.concurrent.BlockingQueue;
3128
import java.util.concurrent.LinkedBlockingQueue;
3229
import java.util.concurrent.TimeUnit;
3330
import java.util.logging.Level;
3431
import java.util.logging.Logger;
32+
import javax.annotation.Nullable;
33+
import org.threeten.bp.Duration;
3534

3635
/** Adapts a streaming read/query call into an iterator over partial result sets. */
3736
@VisibleForTesting
@@ -199,7 +198,7 @@ public boolean cancelQueryWhenClientIsClosed() {
199198
}
200199

201200
private void onStreamMessage(PartialResultSet partialResultSet) {
202-
Optional.ofNullable(streamMessageListener).ifPresent(
203-
sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this));
201+
Optional.ofNullable(streamMessageListener)
202+
.ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this));
204203
}
205204
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ default ResultSetMetadata getMetadata() {
8383
throw new UnsupportedOperationException("Method should be overridden");
8484
}
8585

86+
/**
87+
* Returns the {@link boolean} for this {@link ResultSet}. This method will be used by AsyncResultSet
88+
* to initiate gRPC streaming
89+
*/
8690
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
8791
return false;
8892
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
20+
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
21+
import static com.google.common.base.Preconditions.checkArgument;
22+
import static com.google.common.base.Preconditions.checkNotNull;
23+
1924
import com.google.api.client.util.BackOff;
2025
import com.google.api.client.util.ExponentialBackOff;
2126
import com.google.api.gax.grpc.GrpcStatusCode;
@@ -30,8 +35,6 @@
3035
import com.google.spanner.v1.PartialResultSet;
3136
import io.grpc.Context;
3237
import io.opentelemetry.api.common.Attributes;
33-
34-
import javax.annotation.Nullable;
3538
import java.io.IOException;
3639
import java.util.LinkedList;
3740
import java.util.Objects;
@@ -41,11 +44,7 @@
4144
import java.util.concurrent.TimeUnit;
4245
import java.util.logging.Level;
4346
import java.util.logging.Logger;
44-
45-
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
46-
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
47-
import static com.google.common.base.Preconditions.checkArgument;
48-
import static com.google.common.base.Preconditions.checkNotNull;
47+
import javax.annotation.Nullable;
4948

5049
/**
5150
* Wraps an iterator over partial result sets, supporting resuming RPCs on error. This class keeps
@@ -198,8 +197,8 @@ public void execute(Runnable command) {
198197
}
199198
}
200199

201-
abstract CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
202-
AsyncResultSet.StreamMessageListener streamMessageListener);
200+
abstract CloseableIterator<PartialResultSet> startStream(
201+
@Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener);
203202

204203
/**
205204
* Prepares the iterator for a retry on a different gRPC channel. Returns true if that is
@@ -318,9 +317,9 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) {
318317
private void eagerStartStreaming() {
319318
if (stream == null) {
320319
span.addAnnotation(
321-
"Starting/Resuming stream",
322-
"ResumeToken",
323-
resumeToken == null ? "null" : resumeToken.toStringUtf8());
320+
"Starting/Resuming stream",
321+
"ResumeToken",
322+
resumeToken == null ? "null" : resumeToken.toStringUtf8());
324323
try (IScope scope = tracer.withSpan(span)) {
325324
// When start a new stream set the Span as current to make the gRPC Span a child of
326325
// this Span.

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ public boolean next() throws SpannerException {
289289
}
290290

291291
@Override
292-
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
292+
public boolean initiateStreaming(
293+
AsyncResultSet.StreamMessageListener streamMessageListener) {
293294
try {
294295
boolean ret = super.initiateStreaming(streamMessageListener);
295296
if (beforeFirst) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@
4848
import org.junit.Test;
4949
import org.junit.runner.RunWith;
5050
import org.junit.runners.JUnit4;
51-
import org.mockito.Mock;
52-
import org.mockito.Mockito;
5351
import org.mockito.invocation.InvocationOnMock;
5452
import org.mockito.stubbing.Answer;
5553

0 commit comments

Comments
 (0)