Skip to content

Commit 97529b3

Browse files
committed
feat: RetryHelper to report all previous exceptions
1 parent fa0a12e commit 97529b3

File tree

2 files changed

+232
-9
lines changed

2 files changed

+232
-9
lines changed

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryRetryHelper.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import io.opentelemetry.api.trace.Tracer;
3030
import io.opentelemetry.context.Scope;
3131
import java.io.IOException;
32+
import java.util.ArrayList;
33+
import java.util.List;
3234
import java.util.concurrent.Callable;
3335
import java.util.concurrent.ExecutionException;
3436
import java.util.logging.Level;
@@ -54,6 +56,8 @@ public static <V> V runWithRetries(
5456
.spanBuilder("com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries")
5557
.startSpan();
5658
}
59+
final List<Throwable> attemptFailures = new ArrayList<>();
60+
5761
try (Scope runWithRetriesScope = runWithRetries != null ? runWithRetries.makeCurrent() : null) {
5862
// Suppressing should be ok as a workaraund. Current and only ResultRetryAlgorithm
5963
// implementation does not use response at all, so ignoring its type is ok.
@@ -63,14 +67,29 @@ public static <V> V runWithRetries(
6367
callable,
6468
new ExponentialRetryAlgorithm(retrySettings, clock),
6569
algorithm,
66-
bigQueryRetryConfig);
70+
bigQueryRetryConfig,
71+
attemptFailures);
72+
6773
} catch (Exception e) {
68-
// Checks for IOException and translate it into BigQueryException. The BigQueryException
69-
// constructor parses the IOException and translate it into internal code.
70-
if (e.getCause() instanceof IOException) {
71-
throw new BigQueryRetryHelperException(new BigQueryException((IOException) e.getCause()));
74+
Throwable cause = e.getCause() != null ? e.getCause() : e;
75+
76+
// Attach previous retry failures (the terminal cause is not added to its own suppressed list).
77+
for (Throwable prev : attemptFailures) {
78+
if (prev != cause) {
79+
cause.addSuppressed(prev);
80+
}
81+
}
82+
83+
if (cause instanceof IOException) {
84+
BigQueryException bq = new BigQueryException((IOException) cause);
85+
// Preserve suppressed info after wrapping.
86+
for (Throwable s : cause.getSuppressed()) {
87+
bq.addSuppressed(s);
88+
}
89+
throw new BigQueryRetryHelperException(bq);
7290
}
73-
throw new BigQueryRetryHelperException(e.getCause());
91+
92+
throw new BigQueryRetryHelperException(cause);
7493
} finally {
7594
if (runWithRetries != null) {
7695
runWithRetries.end();
@@ -82,7 +101,8 @@ private static <V> V run(
82101
Callable<V> callable,
83102
TimedRetryAlgorithm timedAlgorithm,
84103
ResultRetryAlgorithm<V> resultAlgorithm,
85-
BigQueryRetryConfig bigQueryRetryConfig)
104+
BigQueryRetryConfig bigQueryRetryConfig,
105+
List<Throwable> attemptFailures)
86106
throws ExecutionException, InterruptedException {
87107
RetryAlgorithm<V> retryAlgorithm =
88108
new BigQueryRetryAlgorithm<>(
@@ -93,7 +113,16 @@ private static <V> V run(
93113
// BigQueryRetryAlgorithm retries considering bigQueryRetryConfig
94114
RetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);
95115

96-
// Log retry info
116+
Callable<V> recordingCallable =
117+
() -> {
118+
try {
119+
return callable.call();
120+
} catch (Throwable t) {
121+
attemptFailures.add(t);
122+
throw t;
123+
}
124+
};
125+
97126
if (LOG.isLoggable(Level.FINEST)) {
98127
LOG.log(
99128
Level.FINEST,
@@ -104,7 +133,7 @@ private static <V> V run(
104133
});
105134
}
106135

107-
RetryingFuture<V> retryingFuture = executor.createFuture(callable);
136+
RetryingFuture<V> retryingFuture = executor.createFuture(recordingCallable);
108137
executor.submit(retryingFuture);
109138
return retryingFuture.get();
110139
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigquery;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertSame;
22+
import static org.junit.Assert.fail;
23+
24+
import com.google.api.core.ApiClock;
25+
import com.google.api.core.NanoClock;
26+
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
27+
import com.google.api.gax.retrying.ResultRetryAlgorithm;
28+
import com.google.api.gax.retrying.RetrySettings;
29+
import io.opentelemetry.api.trace.Tracer;
30+
import java.util.concurrent.Callable;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import org.junit.Test;
33+
import org.threeten.bp.Duration;
34+
35+
public class BigQueryRetryHelperTest {
36+
37+
private static final ApiClock CLOCK = NanoClock.getDefaultClock();
38+
39+
@Test
40+
public void runWithRetries_happyPath_noRetries_success() {
41+
AtomicInteger calls = new AtomicInteger(0);
42+
43+
Callable<String> ok =
44+
() -> {
45+
calls.incrementAndGet();
46+
return "OK";
47+
};
48+
49+
String result =
50+
BigQueryRetryHelper.runWithRetries(
51+
ok,
52+
retrySettingsMaxAttempts(3),
53+
retryAlgorithm(),
54+
CLOCK,
55+
defaultRetryConfig(),
56+
/* isOpenTelemetryEnabled= */ false,
57+
/* openTelemetryTracer= */ (Tracer) null);
58+
59+
assertEquals("OK", result);
60+
assertEquals("Callable should be invoked exactly once", 1, calls.get());
61+
}
62+
63+
@Test
64+
public void runWithRetries_oneFail_thenSuccess_succeeds() {
65+
AtomicInteger calls = new AtomicInteger(0);
66+
67+
RuntimeException first = new RuntimeException("A");
68+
69+
Callable<String> flaky =
70+
() -> {
71+
int n = calls.incrementAndGet();
72+
if (n == 1) {
73+
throw first;
74+
}
75+
return "OK";
76+
};
77+
78+
String result =
79+
BigQueryRetryHelper.runWithRetries(
80+
flaky,
81+
retrySettingsMaxAttempts(3),
82+
retryAlgorithm(),
83+
CLOCK,
84+
defaultRetryConfig(),
85+
/* isOpenTelemetryEnabled= */ false,
86+
/* openTelemetryTracer= */ null);
87+
88+
assertEquals("OK", result);
89+
assertEquals("Expected exactly 2 calls (1 fail + 1 success)", 2, calls.get());
90+
}
91+
92+
@Test
93+
public void runWithRetries_twoFails_thenSuccess_succeedsWithinThreshold() {
94+
AtomicInteger calls = new AtomicInteger(0);
95+
96+
RuntimeException exA = new RuntimeException("A");
97+
RuntimeException exB = new RuntimeException("B");
98+
99+
Callable<String> flaky =
100+
() -> {
101+
int n = calls.incrementAndGet();
102+
if (n == 1) {
103+
throw exA;
104+
}
105+
if (n == 2) {
106+
throw exB;
107+
}
108+
return "OK";
109+
};
110+
111+
String result =
112+
BigQueryRetryHelper.runWithRetries(
113+
flaky,
114+
retrySettingsMaxAttempts(3),
115+
retryAlgorithm(),
116+
CLOCK,
117+
defaultRetryConfig(),
118+
/* isOpenTelemetryEnabled= */ false,
119+
/* openTelemetryTracer= */ null);
120+
121+
assertEquals("OK", result);
122+
assertEquals("Expected 3 calls (A fail, B fail, then success)", 3, calls.get());
123+
}
124+
125+
@Test
126+
public void runWithRetries_threeFails_threshold3_throws_withSuppressedHistory() {
127+
AtomicInteger calls = new AtomicInteger(0);
128+
129+
RuntimeException exA = new RuntimeException("A");
130+
RuntimeException exB = new RuntimeException("B");
131+
RuntimeException exC = new RuntimeException("C");
132+
133+
Callable<String> alwaysFail3Times =
134+
() -> {
135+
int n = calls.incrementAndGet();
136+
if (n == 1) {
137+
throw exA;
138+
}
139+
if (n == 2) {
140+
throw exB;
141+
}
142+
throw exC; // 3rd attempt fails and should be terminal at maxAttempts=3
143+
};
144+
145+
try {
146+
BigQueryRetryHelper.runWithRetries(
147+
alwaysFail3Times,
148+
retrySettingsMaxAttempts(3),
149+
retryAlgorithm(),
150+
CLOCK,
151+
defaultRetryConfig(),
152+
/* isOpenTelemetryEnabled= */ false,
153+
/* openTelemetryTracer= */ null);
154+
fail("Expected BigQueryRetryHelperException");
155+
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
156+
assertEquals("Expected exactly 3 attempts", 3, calls.get());
157+
158+
Throwable terminal = e.getCause();
159+
assertNotNull(terminal);
160+
161+
// Terminal cause should be exactly Exception C (identity check).
162+
assertSame(exC, terminal);
163+
164+
// Suppressed should contain exactly A and B (identity + order).
165+
Throwable[] suppressed = terminal.getSuppressed();
166+
assertEquals("Expected 2 suppressed exceptions (A,B)", 2, suppressed.length);
167+
assertSame(exA, suppressed[0]);
168+
assertSame(exB, suppressed[1]);
169+
}
170+
}
171+
172+
private RetrySettings retrySettingsMaxAttempts(int maxAttempts) {
173+
// Keep delays tiny so tests run fast.
174+
return RetrySettings.newBuilder()
175+
.setMaxAttempts(maxAttempts)
176+
.setInitialRetryDelay(Duration.ofMillis(1))
177+
.setRetryDelayMultiplier(1.0)
178+
.setMaxRetryDelay(Duration.ofMillis(5))
179+
.setInitialRpcTimeout(Duration.ofMillis(50))
180+
.setRpcTimeoutMultiplier(1.0)
181+
.setMaxRpcTimeout(Duration.ofMillis(50))
182+
.setTotalTimeout(Duration.ofSeconds(2))
183+
.build();
184+
}
185+
186+
private BigQueryRetryConfig defaultRetryConfig() {
187+
return BigQueryRetryConfig.newBuilder().build();
188+
}
189+
190+
@SuppressWarnings("unchecked")
191+
private <V> ResultRetryAlgorithm<V> retryAlgorithm() {
192+
return new BasicResultRetryAlgorithm<>();
193+
}
194+
}

0 commit comments

Comments
 (0)