Skip to content

Commit 045e1e2

Browse files
fix(spanner): add logs
1 parent 0541692 commit 045e1e2

File tree

5 files changed

+84
-21
lines changed

5 files changed

+84
-21
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.spanner.v1.PartialResultSet;
3232
import com.google.spanner.v1.ResultSetMetadata;
3333
import com.google.spanner.v1.ResultSetStats;
34+
import java.time.OffsetDateTime;
3435
import java.util.Collection;
3536
import java.util.LinkedList;
3637
import java.util.List;
@@ -342,6 +343,9 @@ public void run() {
342343
boolean stop = false;
343344
boolean hasNext = false;
344345
try {
346+
System.out.printf(
347+
"[%s][%s] Inside ProduceRowsRunnable thread\n",
348+
OffsetDateTime.now(), Thread.currentThread().getName());
345349
hasNext = delegateResultSet.get().next();
346350
} catch (Throwable e) {
347351
synchronized (monitor) {
@@ -468,6 +472,9 @@ public void run() {
468472
// Those result sets will trigger initiateProduceRows() when the first results are received.
469473
// Non-streaming result sets do not trigger this callback, and for those result sets, we
470474
// need to eagerly start the ProduceRowsRunnable.
475+
System.out.printf(
476+
"[%s][%s] Inside InitiateStreamingRunnable thread\n",
477+
OffsetDateTime.now(), Thread.currentThread().getName());
471478
if (!initiateStreaming(AsyncResultSetImpl.this)) {
472479
initiateProduceRows();
473480
}
@@ -486,6 +493,8 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
486493
Preconditions.checkState(
487494
this.state == State.INITIALIZED, "callback may not be set multiple times");
488495

496+
System.out.printf(
497+
"[%s][%s] Inside callback\n", OffsetDateTime.now(), Thread.currentThread().getName());
489498
// Start to fetch data and buffer these.
490499
this.result = SettableApiFuture.create();
491500
this.state = State.STREAMING_INITIALIZED;
@@ -498,6 +507,9 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
498507
}
499508

500509
private void initiateProduceRows() {
510+
System.out.printf(
511+
"[%s][%s] Inside initiateProduceRows\n",
512+
OffsetDateTime.now(), Thread.currentThread().getName());
501513
synchronized (monitor) {
502514
if (this.state == State.STREAMING_INITIALIZED) {
503515
this.state = State.RUNNING;
@@ -639,6 +651,9 @@ public Struct getCurrentRowAsStruct() {
639651

640652
@Override
641653
public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull) {
654+
System.out.printf(
655+
"[%s][%s] Inside onStreamMessage\n",
656+
OffsetDateTime.now(), Thread.currentThread().getName());
642657
synchronized (monitor) {
643658
if (produceRowsInitiated) {
644659
return;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.base.Suppliers;
2323
import com.google.spanner.v1.ResultSetMetadata;
2424
import com.google.spanner.v1.ResultSetStats;
25+
import java.time.OffsetDateTime;
2526

2627
/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
2728
public class ForwardingResultSet extends ForwardingStructReader
@@ -108,6 +109,9 @@ public ResultSetMetadata getMetadata() {
108109
@Override
109110
@InternalApi
110111
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
112+
System.out.printf(
113+
"[%s][%s] Inside initiateStreaming ForwardingResultSet\n",
114+
OffsetDateTime.now(), Thread.currentThread().getName());
111115
return StreamingUtil.initiateStreaming(delegate.get(), streamMessageListener);
112116
}
113117
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.spanner.v1.PartialResultSet;
2626
import com.google.spanner.v1.ResultSetMetadata;
2727
import com.google.spanner.v1.ResultSetStats;
28+
import java.time.OffsetDateTime;
2829
import java.util.ArrayList;
2930
import java.util.Collections;
3031
import java.util.List;
@@ -80,6 +81,9 @@ public boolean next() throws SpannerException {
8081
}
8182
try {
8283
if (currRow == null) {
84+
System.out.printf(
85+
"[%s][%s] Inside GrpcResultSet next()\n",
86+
OffsetDateTime.now(), Thread.currentThread().getName());
8387
metadata = iterator.getMetadata();
8488
if (metadata.hasTransaction()) {
8589
listener.onTransactionMetadata(
@@ -128,6 +132,9 @@ public ResultSetMetadata getMetadata() {
128132
@Override
129133
@InternalApi
130134
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
135+
System.out.printf(
136+
"[%s][%s] Inside initiateStreaming GrpcResultSet\n",
137+
OffsetDateTime.now(), Thread.currentThread().getName());
131138
return iterator.initiateStreaming(streamMessageListener);
132139
}
133140

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@
3737
import io.grpc.Context;
3838
import io.opentelemetry.api.common.Attributes;
3939
import java.io.IOException;
40+
import java.time.OffsetDateTime;
4041
import java.util.LinkedList;
4142
import java.util.Objects;
4243
import java.util.Set;
4344
import java.util.concurrent.CountDownLatch;
4445
import java.util.concurrent.Executor;
4546
import java.util.concurrent.TimeUnit;
47+
import java.util.concurrent.atomic.AtomicReference;
4648
import java.util.logging.Level;
4749
import java.util.logging.Logger;
4850
import javax.annotation.Nullable;
@@ -65,11 +67,11 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
6567
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
6668
private BackOff backOff;
6769
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
68-
private final Object monitor = new Object();
70+
// private final Object monitor = new Object();
6971
private final int maxBufferSize;
7072
private final ISpan span;
7173
private final TraceWrapper tracer;
72-
private volatile CloseableIterator<PartialResultSet> stream;
74+
private AtomicReference<CloseableIterator<PartialResultSet>> streamCache = new AtomicReference<>();
7375
private ByteString resumeToken;
7476
private boolean finished;
7577
/**
@@ -212,16 +214,16 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
212214

213215
@Override
214216
public void close(@Nullable String message) {
215-
if (stream != null) {
216-
stream.close(message);
217+
if (streamCache.get() != null) {
218+
streamCache.get().close(message);
217219
span.end();
218-
stream = null;
220+
streamCache.set(null);
219221
}
220222
}
221223

222224
@Override
223225
public boolean isWithBeginTransaction() {
224-
return stream != null && stream.isWithBeginTransaction();
226+
return streamCache.get() != null && streamCache.get().isWithBeginTransaction();
225227
}
226228

227229
@Override
@@ -245,8 +247,8 @@ protected PartialResultSet computeNext() {
245247
return buffer.pop();
246248
}
247249
try {
248-
if (stream.hasNext()) {
249-
PartialResultSet next = stream.next();
250+
if (streamCache.get().hasNext()) {
251+
PartialResultSet next = streamCache.get().next();
250252
boolean hasResumeToken = !next.getResumeToken().isEmpty();
251253
if (hasResumeToken) {
252254
resumeToken = next.getResumeToken();
@@ -280,7 +282,7 @@ protected PartialResultSet computeNext() {
280282
buffer.removeLast();
281283
}
282284
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
283-
stream = null;
285+
streamCache.set(null);
284286
try (IScope s = tracer.withSpan(span)) {
285287
long delay = spannerException.getRetryDelayInMillis();
286288
if (delay != -1) {
@@ -301,7 +303,7 @@ protected PartialResultSet computeNext() {
301303
if (translated instanceof RetryOnDifferentGrpcChannelException) {
302304
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
303305
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
304-
stream = null;
306+
streamCache.set(null);
305307
continue;
306308
}
307309
}
@@ -318,19 +320,25 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) {
318320
}
319321

320322
private void startGrpcStreaming() {
321-
synchronized (monitor) {
322-
if (stream == null) {
323-
span.addAnnotation(
324-
"Starting/Resuming stream",
325-
"ResumeToken",
326-
resumeToken == null ? "null" : resumeToken.toStringUtf8());
327-
try (IScope scope = tracer.withSpan(span)) {
328-
// When start a new stream set the Span as current to make the gRPC Span a child of
329-
// this Span.
330-
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
331-
}
323+
// synchronized (monitor) {
324+
System.out.printf(
325+
"[%s][%s] Inside startGrpcStreaming\n",
326+
OffsetDateTime.now(), Thread.currentThread().getName());
327+
if (streamCache.get() == null) {
328+
span.addAnnotation(
329+
"Starting/Resuming stream",
330+
"ResumeToken",
331+
resumeToken == null ? "null" : resumeToken.toStringUtf8());
332+
try (IScope scope = tracer.withSpan(span)) {
333+
// When start a new stream set the Span as current to make the gRPC Span a child of
334+
// this Span.
335+
System.out.printf(
336+
"[%s][%s] Inside creating stream\n",
337+
OffsetDateTime.now(), Thread.currentThread().getName());
338+
streamCache.set(checkNotNull(startStream(resumeToken, streamMessageListener)));
332339
}
333340
}
341+
// }
334342
}
335343

336344
boolean isRetryable(SpannerException spannerException) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import io.grpc.Server;
4747
import io.grpc.inprocess.InProcessServerBuilder;
4848
import java.io.IOException;
49+
import java.time.OffsetDateTime;
4950
import java.util.ArrayList;
5051
import java.util.Arrays;
5152
import java.util.Collection;
@@ -60,7 +61,10 @@
6061
import org.junit.AfterClass;
6162
import org.junit.Before;
6263
import org.junit.BeforeClass;
64+
import org.junit.Rule;
6365
import org.junit.Test;
66+
import org.junit.rules.TestWatcher;
67+
import org.junit.runner.Description;
6468
import org.junit.runner.RunWith;
6569
import org.junit.runners.Parameterized;
6670
import org.junit.runners.Parameterized.Parameter;
@@ -75,6 +79,31 @@ public Long apply(StructReader input) {
7579
}
7680
}
7781

82+
@Rule
83+
public TestWatcher testWatcher =
84+
new TestWatcher() {
85+
@Override
86+
protected void succeeded(Description description) {
87+
System.out.printf(
88+
"[%s][%s] Succeeded test %s\n",
89+
OffsetDateTime.now(), Thread.currentThread().getName(), description.getMethodName());
90+
}
91+
92+
@Override
93+
protected void failed(Throwable e, Description description) {
94+
System.out.printf(
95+
"[%s][%s] Failed test %s\n",
96+
OffsetDateTime.now(), Thread.currentThread().getName(), description.getMethodName());
97+
}
98+
99+
@Override
100+
protected void starting(Description description) {
101+
System.out.printf(
102+
"[%s][%s] Starting test %s\n",
103+
OffsetDateTime.now(), Thread.currentThread().getName(), description.getMethodName());
104+
}
105+
};
106+
78107
private static final ToLongTransformer TO_LONG = new ToLongTransformer();
79108

80109
@Parameter(0)

0 commit comments

Comments
 (0)