Skip to content

Commit 82a05e3

Browse files
TheNeuralBit and nielm
authored
[release-2.37.0][BEAM-14005] Fix ignored exception in BatchSpannerRead (#16969)
Failures to read from Spanner were ignored, and the "ok" serviceCallMetric was updated before the read took place. Fix code and tests. Co-authored-by: Niel Markwick <nielm@users.noreply.github.com>
1 parent 304420a commit 82a05e3

File tree

2 files changed

+114
-16
lines changed

2 files changed

+114
-16
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ public void processElement(ProcessContext c) throws Exception {
218218
BatchReadOnlyTransaction batchTx =
219219
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
220220

221-
serviceCallMetric.call("ok");
222221
Partition p = c.element();
223222
try (ResultSet resultSet = batchTx.execute(p)) {
224223
while (resultSet.next()) {
@@ -227,7 +226,9 @@ public void processElement(ProcessContext c) throws Exception {
227226
}
228227
} catch (SpannerException e) {
229228
serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString());
229+
throw (e);
230230
}
231+
serviceCallMetric.call("ok");
231232
}
232233

233234
private ServiceCallMetric createServiceCallMetric(

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java

Lines changed: 112 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434
import com.google.cloud.spanner.Partition;
3535
import com.google.cloud.spanner.PartitionOptions;
3636
import com.google.cloud.spanner.ResultSets;
37+
import com.google.cloud.spanner.SpannerException;
3738
import com.google.cloud.spanner.SpannerExceptionFactory;
3839
import com.google.cloud.spanner.Statement;
3940
import com.google.cloud.spanner.Struct;
4041
import com.google.cloud.spanner.TimestampBound;
4142
import com.google.cloud.spanner.Type;
4243
import com.google.cloud.spanner.Value;
4344
import com.google.protobuf.ByteString;
45+
import io.grpc.Status.Code;
4446
import java.io.Serializable;
4547
import java.util.Arrays;
4648
import java.util.HashMap;
@@ -49,6 +51,7 @@
4951
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
5052
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
5153
import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
54+
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
5255
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
5356
import org.apache.beam.sdk.metrics.MetricsEnvironment;
5457
import org.apache.beam.sdk.testing.PAssert;
@@ -293,7 +296,7 @@ public void runReadWithPriority() throws Exception {
293296
}
294297

295298
@Test
296-
public void testQueryMetrics() throws Exception {
299+
public void testQueryMetricsFail() throws Exception {
297300
Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
298301
TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp);
299302

@@ -322,25 +325,74 @@ public void testQueryMetrics() throws Exception {
322325
any(PartitionOptions.class),
323326
eq(Statement.of("SELECT * FROM users")),
324327
any(ReadQueryUpdateTransactionOption.class)))
325-
.thenReturn(Arrays.asList(fakePartition, fakePartition));
328+
.thenReturn(Arrays.asList(fakePartition));
326329
when(mockBatchTx.execute(any(Partition.class)))
327330
.thenThrow(
328331
SpannerExceptionFactory.newSpannerException(
329-
ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 1"))
330-
.thenThrow(
331-
SpannerExceptionFactory.newSpannerException(
332-
ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 2"))
332+
ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 1"));
333+
try {
334+
pipeline.run();
335+
} catch (PipelineExecutionException e) {
336+
if (e.getCause() instanceof SpannerException
337+
&& ((SpannerException) e.getCause()).getErrorCode().getGrpcStatusCode()
338+
== Code.DEADLINE_EXCEEDED) {
339+
// expected
340+
} else {
341+
throw e;
342+
}
343+
}
344+
verifyMetricWasSet("test", "aaa", "123", "deadline_exceeded", null, 1);
345+
verifyMetricWasSet("test", "aaa", "123", "ok", null, 0);
346+
}
347+
348+
@Test
349+
public void testQueryMetricsSucceed() throws Exception {
350+
Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
351+
TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp);
352+
353+
SpannerConfig spannerConfig = getSpannerConfig();
354+
355+
pipeline.apply(
356+
"read q",
357+
SpannerIO.read()
358+
.withSpannerConfig(spannerConfig)
359+
.withQuery("SELECT * FROM users")
360+
.withQueryName("queryName")
361+
.withTimestampBound(timestampBound));
362+
363+
FakeBatchTransactionId id = new FakeBatchTransactionId("runQueryTest");
364+
when(mockBatchTx.getBatchTransactionId()).thenReturn(id);
365+
366+
when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound))
367+
.thenReturn(mockBatchTx);
368+
when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class)))
369+
.thenReturn(mockBatchTx);
370+
371+
Partition fakePartition =
372+
FakePartitionFactory.createFakeQueryPartition(ByteString.copyFromUtf8("one"));
373+
374+
when(mockBatchTx.partitionQuery(
375+
any(PartitionOptions.class),
376+
eq(Statement.of("SELECT * FROM users")),
377+
any(ReadQueryUpdateTransactionOption.class)))
378+
.thenReturn(Arrays.asList(fakePartition, fakePartition));
379+
when(mockBatchTx.execute(any(Partition.class)))
333380
.thenReturn(
334381
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)),
335-
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 6)));
382+
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)),
383+
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6)))
384+
.thenReturn(
385+
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)),
386+
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)),
387+
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6)));
336388

337389
pipeline.run();
338-
verifyMetricWasSet("test", "aaa", "123", "deadline_exceeded", null, 2);
390+
verifyMetricWasSet("test", "aaa", "123", "deadline_exceeded", null, 0);
339391
verifyMetricWasSet("test", "aaa", "123", "ok", null, 2);
340392
}
341393

342394
@Test
343-
public void testReadMetrics() throws Exception {
395+
public void testReadMetricsFail() throws Exception {
344396
Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
345397
TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp);
346398

@@ -371,21 +423,66 @@ public void testReadMetrics() throws Exception {
371423
eq(KeySet.all()),
372424
eq(Arrays.asList("id", "name")),
373425
any(ReadQueryUpdateTransactionOption.class)))
374-
.thenReturn(Arrays.asList(fakePartition, fakePartition, fakePartition));
426+
.thenReturn(Arrays.asList(fakePartition));
375427
when(mockBatchTx.execute(any(Partition.class)))
376428
.thenThrow(
377429
SpannerExceptionFactory.newSpannerException(
378-
ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 1"))
379-
.thenThrow(
380-
SpannerExceptionFactory.newSpannerException(
381-
ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 2"))
430+
ErrorCode.DEADLINE_EXCEEDED, "Simulated Timeout 1"));
431+
try {
432+
pipeline.run();
433+
} catch (PipelineExecutionException e) {
434+
if (e.getCause() instanceof SpannerException
435+
&& ((SpannerException) e.getCause()).getErrorCode().getGrpcStatusCode()
436+
== Code.DEADLINE_EXCEEDED) {
437+
// expected
438+
} else {
439+
throw e;
440+
}
441+
}
442+
verifyMetricWasSet("test", "aaa", "123", "deadline_exceeded", null, 1);
443+
verifyMetricWasSet("test", "aaa", "123", "ok", null, 0);
444+
}
445+
446+
@Test
447+
public void testReadMetricsSucceed() throws Exception {
448+
Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
449+
TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp);
450+
451+
SpannerConfig spannerConfig = getSpannerConfig();
452+
453+
pipeline.apply(
454+
"read q",
455+
SpannerIO.read()
456+
.withSpannerConfig(spannerConfig)
457+
.withTable("users")
458+
.withColumns("id", "name")
459+
.withTimestampBound(timestampBound));
460+
461+
FakeBatchTransactionId id = new FakeBatchTransactionId("runReadTest");
462+
when(mockBatchTx.getBatchTransactionId()).thenReturn(id);
463+
464+
when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(timestampBound))
465+
.thenReturn(mockBatchTx);
466+
when(serviceFactory.mockBatchClient().batchReadOnlyTransaction(any(BatchTransactionId.class)))
467+
.thenReturn(mockBatchTx);
468+
469+
Partition fakePartition =
470+
FakePartitionFactory.createFakeReadPartition(ByteString.copyFromUtf8("one"));
471+
472+
when(mockBatchTx.partitionRead(
473+
any(PartitionOptions.class),
474+
eq("users"),
475+
eq(KeySet.all()),
476+
eq(Arrays.asList("id", "name")),
477+
any(ReadQueryUpdateTransactionOption.class)))
478+
.thenReturn(Arrays.asList(fakePartition, fakePartition, fakePartition));
479+
when(mockBatchTx.execute(any(Partition.class)))
382480
.thenReturn(
383481
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)),
384482
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)),
385483
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6)));
386484

387485
pipeline.run();
388-
verifyMetricWasSet("test", "aaa", "123", "deadline_exceeded", null, 2);
389486
verifyMetricWasSet("test", "aaa", "123", "ok", null, 3);
390487
}
391488

0 commit comments

Comments
 (0)