Skip to content

Commit e0cfeae

Browse files
fix(spanner): add logs
1 parent 0541692 commit e0cfeae

File tree

5 files changed

+73
-12
lines changed

5 files changed

+73
-12
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.spanner.v1.PartialResultSet;
3232
import com.google.spanner.v1.ResultSetMetadata;
3333
import com.google.spanner.v1.ResultSetStats;
34+
import java.time.OffsetDateTime;
3435
import java.util.Collection;
3536
import java.util.LinkedList;
3637
import java.util.List;
@@ -342,6 +343,9 @@ public void run() {
342343
boolean stop = false;
343344
boolean hasNext = false;
344345
try {
346+
System.out.printf(
347+
"[%s][%s] Inside ProduceRowsRunnable thread\n",
348+
OffsetDateTime.now(), Thread.currentThread().getName());
345349
hasNext = delegateResultSet.get().next();
346350
} catch (Throwable e) {
347351
synchronized (monitor) {
@@ -468,6 +472,9 @@ public void run() {
468472
// Those result sets will trigger initiateProduceRows() when the first results are received.
469473
// Non-streaming result sets do not trigger this callback, and for those result sets, we
470474
// need to eagerly start the ProduceRowsRunnable.
475+
System.out.printf(
476+
"[%s][%s] Inside InitiateStreamingRunnable thread\n",
477+
OffsetDateTime.now(), Thread.currentThread().getName());
471478
if (!initiateStreaming(AsyncResultSetImpl.this)) {
472479
initiateProduceRows();
473480
}
@@ -486,6 +493,8 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
486493
Preconditions.checkState(
487494
this.state == State.INITIALIZED, "callback may not be set multiple times");
488495

496+
System.out.printf(
497+
"[%s][%s] Inside callback\n", OffsetDateTime.now(), Thread.currentThread().getName());
489498
// Start to fetch data and buffer these.
490499
this.result = SettableApiFuture.create();
491500
this.state = State.STREAMING_INITIALIZED;
@@ -498,6 +507,9 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
498507
}
499508

500509
private void initiateProduceRows() {
510+
System.out.printf(
511+
"[%s][%s] Inside initiateProduceRows\n",
512+
OffsetDateTime.now(), Thread.currentThread().getName());
501513
synchronized (monitor) {
502514
if (this.state == State.STREAMING_INITIALIZED) {
503515
this.state = State.RUNNING;
@@ -639,6 +651,9 @@ public Struct getCurrentRowAsStruct() {
639651

640652
@Override
641653
public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull) {
654+
System.out.printf(
655+
"[%s][%s] Inside onStreamMessage\n",
656+
OffsetDateTime.now(), Thread.currentThread().getName());
642657
synchronized (monitor) {
643658
if (produceRowsInitiated) {
644659
return;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.base.Suppliers;
2323
import com.google.spanner.v1.ResultSetMetadata;
2424
import com.google.spanner.v1.ResultSetStats;
25+
import java.time.OffsetDateTime;
2526

2627
/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
2728
public class ForwardingResultSet extends ForwardingStructReader
@@ -108,6 +109,9 @@ public ResultSetMetadata getMetadata() {
108109
@Override
109110
@InternalApi
110111
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
112+
System.out.printf(
113+
"[%s][%s] Inside initiateStreaming ForwardingResultSet\n",
114+
OffsetDateTime.now(), Thread.currentThread().getName());
111115
return StreamingUtil.initiateStreaming(delegate.get(), streamMessageListener);
112116
}
113117
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.spanner.v1.PartialResultSet;
2626
import com.google.spanner.v1.ResultSetMetadata;
2727
import com.google.spanner.v1.ResultSetStats;
28+
import java.time.OffsetDateTime;
2829
import java.util.ArrayList;
2930
import java.util.Collections;
3031
import java.util.List;
@@ -80,6 +81,9 @@ public boolean next() throws SpannerException {
8081
}
8182
try {
8283
if (currRow == null) {
84+
System.out.printf(
85+
"[%s][%s] Inside GrpcResultSet next()\n",
86+
OffsetDateTime.now(), Thread.currentThread().getName());
8387
metadata = iterator.getMetadata();
8488
if (metadata.hasTransaction()) {
8589
listener.onTransactionMetadata(
@@ -128,6 +132,9 @@ public ResultSetMetadata getMetadata() {
128132
@Override
129133
@InternalApi
130134
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
135+
System.out.printf(
136+
"[%s][%s] Inside initiateStreaming GrpcResultSet\n",
137+
OffsetDateTime.now(), Thread.currentThread().getName());
131138
return iterator.initiateStreaming(streamMessageListener);
132139
}
133140

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.grpc.Context;
3838
import io.opentelemetry.api.common.Attributes;
3939
import java.io.IOException;
40+
import java.time.OffsetDateTime;
4041
import java.util.LinkedList;
4142
import java.util.Objects;
4243
import java.util.Set;
@@ -65,7 +66,7 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
6566
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
6667
private BackOff backOff;
6768
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
68-
private final Object monitor = new Object();
69+
// private final Object monitor = new Object();
6970
private final int maxBufferSize;
7071
private final ISpan span;
7172
private final TraceWrapper tracer;
@@ -318,19 +319,25 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) {
318319
}
319320

320321
private void startGrpcStreaming() {
321-
synchronized (monitor) {
322-
if (stream == null) {
323-
span.addAnnotation(
324-
"Starting/Resuming stream",
325-
"ResumeToken",
326-
resumeToken == null ? "null" : resumeToken.toStringUtf8());
327-
try (IScope scope = tracer.withSpan(span)) {
328-
// When start a new stream set the Span as current to make the gRPC Span a child of
329-
// this Span.
330-
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
331-
}
322+
// synchronized (monitor) {
323+
System.out.printf(
324+
"[%s][%s] Inside startGrpcStreaming\n",
325+
OffsetDateTime.now(), Thread.currentThread().getName());
326+
if (stream == null) {
327+
span.addAnnotation(
328+
"Starting/Resuming stream",
329+
"ResumeToken",
330+
resumeToken == null ? "null" : resumeToken.toStringUtf8());
331+
try (IScope scope = tracer.withSpan(span)) {
332+
// When start a new stream set the Span as current to make the gRPC Span a child of
333+
// this Span.
334+
System.out.printf(
335+
"[%s][%s] Inside creating stream\n",
336+
OffsetDateTime.now(), Thread.currentThread().getName());
337+
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
332338
}
333339
}
340+
// }
334341
}
335342

336343
boolean isRetryable(SpannerException spannerException) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import io.grpc.Server;
4747
import io.grpc.inprocess.InProcessServerBuilder;
4848
import java.io.IOException;
49+
import java.time.OffsetDateTime;
4950
import java.util.ArrayList;
5051
import java.util.Arrays;
5152
import java.util.Collection;
@@ -60,7 +61,10 @@
6061
import org.junit.AfterClass;
6162
import org.junit.Before;
6263
import org.junit.BeforeClass;
64+
import org.junit.Rule;
6365
import org.junit.Test;
66+
import org.junit.rules.TestWatcher;
67+
import org.junit.runner.Description;
6468
import org.junit.runner.RunWith;
6569
import org.junit.runners.Parameterized;
6670
import org.junit.runners.Parameterized.Parameter;
@@ -75,6 +79,30 @@ public Long apply(StructReader input) {
7579
}
7680
}
7781

82+
@Rule
83+
public TestWatcher testWatcher = new TestWatcher() {
84+
@Override
85+
protected void succeeded(Description description) {
86+
System.out.printf(
87+
"[%s][%s] Succeeded test %s\n",
88+
OffsetDateTime.now(), Thread.currentThread().getName(), description.getMethodName());
89+
}
90+
91+
@Override
92+
protected void failed(Throwable e, Description description) {
93+
System.out.printf(
94+
"[%s][%s] Failed test %s\n",
95+
OffsetDateTime.now(), Thread.currentThread().getName(), description.getMethodName());
96+
}
97+
98+
@Override
99+
protected void starting(Description description) {
100+
System.out.printf(
101+
"[%s][%s] Starting test %s\n",
102+
OffsetDateTime.now(), Thread.currentThread().getName(), description.getMethodName());
103+
}
104+
};
105+
78106
private static final ToLongTransformer TO_LONG = new ToLongTransformer();
79107

80108
@Parameter(0)

0 commit comments

Comments
 (0)