Skip to content

Commit 389a830

Browse files
Vizeraiolavloite
andauthored
test: Add ResumableStreamIterator tests for RESOURCE_EXHAUSTED error code (#3039)
* feat: Add ResumableStreamIterator tests for RESOURCE_EXHAUSTED error code. * chore: simplify test a bit * test: add test to Gax retryer test Add test cases to Gax retryer test file to ensure that only RESOURCE_EXHAUSTED errors with RetryInfo are actually retried end-to-end by the Spanner client library and the Gax configuration. --------- Co-authored-by: Knut Olav Løite <[email protected]>
1 parent 9fea7a3 commit 389a830

File tree

2 files changed

+72
-14
lines changed

2 files changed

+72
-14
lines changed

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.api.client.util.BackOff;
2727
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
2828
import com.google.common.collect.AbstractIterator;
29+
import com.google.common.collect.ImmutableList;
2930
import com.google.common.collect.Lists;
3031
import com.google.protobuf.ByteString;
3132
import com.google.protobuf.Duration;
@@ -52,11 +53,13 @@
5253
import org.junit.Before;
5354
import org.junit.Test;
5455
import org.junit.runner.RunWith;
55-
import org.junit.runners.JUnit4;
56+
import org.junit.runners.Parameterized;
57+
import org.junit.runners.Parameterized.Parameter;
58+
import org.junit.runners.Parameterized.Parameters;
5659
import org.mockito.Mockito;
5760

5861
/** Unit tests for {@link ResumableStreamIterator}. */
59-
@RunWith(JUnit4.class)
62+
@RunWith(Parameterized.class)
6063
public class ResumableStreamIteratorTest {
6164
interface Starter {
6265
AbstractResultSet.CloseableIterator<PartialResultSet> startStream(
@@ -69,6 +72,14 @@ interface ResultSetStream {
6972
void close();
7073
}
7174

75+
@Parameter(0)
76+
public ErrorCode errorCodeParameter;
77+
78+
@Parameters(name = "errorCodeParameter = {0}")
79+
public static List<ErrorCode> data() {
80+
return ImmutableList.of(ErrorCode.UNAVAILABLE, ErrorCode.RESOURCE_EXHAUSTED);
81+
}
82+
7283
private static StatusRuntimeException statusWithRetryInfo(ErrorCode code) {
7384
Metadata.Key<RetryInfo> key = ProtoUtils.keyForProto(RetryInfo.getDefaultInstance());
7485
Metadata trailers = new Metadata();
@@ -223,7 +234,7 @@ public void restart() {
223234
Mockito.when(s1.next())
224235
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
225236
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
226-
.thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
237+
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
227238

228239
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
229240
Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2")))
@@ -244,7 +255,7 @@ public void restartWithHoldBack() {
244255
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
245256
.thenReturn(resultSet(null, "X"))
246257
.thenReturn(resultSet(null, "X"))
247-
.thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
258+
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
248259

249260
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
250261
Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2")))
@@ -265,7 +276,7 @@ public void restartWithHoldBackMidStream() {
265276
.thenReturn(resultSet(null, "b"))
266277
.thenReturn(resultSet(null, "c"))
267278
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "d"))
268-
.thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
279+
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
269280

270281
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
271282
Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2")))
@@ -360,7 +371,7 @@ public void bufferLimitRestart() {
360371
Mockito.when(s1.next())
361372
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
362373
.thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b"))
363-
.thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
374+
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
364375

365376
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
366377
Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2")))
@@ -380,7 +391,7 @@ public void bufferLimitRestartWithinLimitAtStartOfResults() {
380391
Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1));
381392
Mockito.when(s1.next())
382393
.thenReturn(resultSet(null, "XXXXXX"))
383-
.thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
394+
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
384395

385396
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
386397
Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s2));
@@ -400,7 +411,7 @@ public void bufferLimitRestartWithinLimitMidResults() {
400411
Mockito.when(s1.next())
401412
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
402413
.thenReturn(resultSet(null, "XXXXXX"))
403-
.thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
414+
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
404415

405416
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
406417
Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1")))
@@ -422,11 +433,11 @@ public void bufferLimitMissingTokensUnsafeToRetry() {
422433
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
423434
.thenReturn(resultSet(null, "b"))
424435
.thenReturn(resultSet(null, "c"))
425-
.thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
436+
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
426437

427438
assertThat(consumeAtMost(3, resumableStreamIterator)).containsExactly("a", "b", "c").inOrder();
428439
SpannerException e = assertThrows(SpannerException.class, () -> resumableStreamIterator.next());
429-
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.UNAVAILABLE);
440+
assertThat(e.getErrorCode()).isEqualTo(errorCodeParameter);
430441
}
431442

432443
@Test
@@ -439,7 +450,7 @@ public void bufferLimitMissingTokensSafeToRetry() {
439450
.thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a"))
440451
.thenReturn(resultSet(null, "b"))
441452
.thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c"))
442-
.thenThrow(new RetryableException(ErrorCode.UNAVAILABLE, "failed by test"));
453+
.thenThrow(new RetryableException(errorCodeParameter, "failed by test"));
443454

444455
ResultSetStream s2 = Mockito.mock(ResultSetStream.class);
445456
Mockito.when(starter.startStream(ByteString.copyFromUtf8("r3")))

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,28 @@
2121
import static org.hamcrest.MatcherAssert.assertThat;
2222
import static org.junit.Assert.assertEquals;
2323
import static org.junit.Assert.assertThrows;
24+
import static org.junit.Assert.assertTrue;
2425

2526
import com.google.api.gax.grpc.testing.LocalChannelProvider;
2627
import com.google.api.gax.retrying.RetrySettings;
2728
import com.google.cloud.NoCredentials;
2829
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
2930
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
3031
import com.google.protobuf.ListValue;
32+
import com.google.rpc.RetryInfo;
3133
import com.google.spanner.v1.ResultSetMetadata;
3234
import com.google.spanner.v1.StructType;
3335
import com.google.spanner.v1.StructType.Field;
3436
import com.google.spanner.v1.TypeCode;
37+
import io.grpc.Metadata;
3538
import io.grpc.Server;
39+
import io.grpc.Status;
3640
import io.grpc.StatusRuntimeException;
3741
import io.grpc.inprocess.InProcessServerBuilder;
42+
import io.grpc.protobuf.ProtoUtils;
3843
import java.io.IOException;
3944
import java.util.concurrent.ScheduledThreadPoolExecutor;
45+
import java.util.concurrent.TimeUnit;
4046
import java.util.concurrent.atomic.AtomicInteger;
4147
import org.junit.After;
4248
import org.junit.AfterClass;
@@ -84,6 +90,14 @@ public class SpannerGaxRetryTest {
8490
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0);
8591
private static final StatusRuntimeException UNAVAILABLE =
8692
io.grpc.Status.UNAVAILABLE.withDescription("Retryable test exception.").asRuntimeException();
93+
private static final StatusRuntimeException RESOURCE_EXHAUSTED_NON_RETRYABLE =
94+
Status.RESOURCE_EXHAUSTED
95+
.withDescription("Non-retryable test exception.")
96+
.asRuntimeException();
97+
private static final StatusRuntimeException RESOURCE_EXHAUSTED_RETRYABLE =
98+
Status.RESOURCE_EXHAUSTED
99+
.withDescription("Retryable test exception.")
100+
.asRuntimeException(createRetryInfo());
87101
private static final StatusRuntimeException FAILED_PRECONDITION =
88102
io.grpc.Status.FAILED_PRECONDITION
89103
.withDescription("Non-retryable test exception.")
@@ -192,6 +206,19 @@ public void tearDown() {
192206
spanner.close();
193207
}
194208

209+
static Metadata createRetryInfo() {
210+
Metadata trailers = new Metadata();
211+
RetryInfo retryInfo =
212+
RetryInfo.newBuilder()
213+
.setRetryDelay(
214+
com.google.protobuf.Duration.newBuilder()
215+
.setNanos((int) TimeUnit.MILLISECONDS.toNanos(1L))
216+
.setSeconds(0L))
217+
.build();
218+
trailers.put(ProtoUtils.keyForProto(RetryInfo.getDefaultInstance()), retryInfo);
219+
return trailers;
220+
}
221+
195222
private void warmUpSessionPool(DatabaseClient client) {
196223
for (int i = 0; i < 10; i++) {
197224
int retryCount = 0;
@@ -229,9 +256,29 @@ public void singleUseTimeout() {
229256

230257
@Test
231258
public void singleUseUnavailable() {
232-
mockSpanner.addException(UNAVAILABLE);
233-
try (ResultSet rs = client.singleUse().executeQuery(SELECT1AND2)) {
234-
while (rs.next()) {}
259+
mockSpanner.setExecuteStreamingSqlExecutionTime(
260+
SimulatedExecutionTime.ofException(UNAVAILABLE));
261+
try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1AND2)) {
262+
assertTrue(resultSet.next());
263+
}
264+
}
265+
266+
@Test
267+
public void singleUseResourceExhausted_nonRetryable() {
268+
mockSpanner.setExecuteStreamingSqlExecutionTime(
269+
SimulatedExecutionTime.ofException(RESOURCE_EXHAUSTED_NON_RETRYABLE));
270+
try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1AND2)) {
271+
SpannerException exception = assertThrows(SpannerException.class, resultSet::next);
272+
assertEquals(ErrorCode.RESOURCE_EXHAUSTED, exception.getErrorCode());
273+
}
274+
}
275+
276+
@Test
277+
public void singleUseResourceExhausted_retryable() {
278+
mockSpanner.setExecuteStreamingSqlExecutionTime(
279+
SimulatedExecutionTime.ofException(RESOURCE_EXHAUSTED_RETRYABLE));
280+
try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1AND2)) {
281+
assertTrue(resultSet.next());
235282
}
236283
}
237284

0 commit comments

Comments
 (0)