Skip to content

Commit cb55d87

Browse files
scwhittlejrmccluskey
authored andcommitted
Remove unused IOException from Backoff and BackoffUtils. (#35445)
Update some now unnecessary call-site handling.
1 parent 7fdc941 commit cb55d87

File tree

14 files changed

+32
-79
lines changed

14 files changed

+32
-79
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,6 @@ public Boolean call() {
133133
}
134134
// Sleeping a while if there is a problem with the work, then go on with the next work.
135135
} while (success || BackOffUtils.next(sleeper, backOff));
136-
} catch (IOException e) { // Failure of BackOff.
137-
LOG.error("Already tried several attempts at working on tasks. Aborting.", e);
138136
} catch (InterruptedException e) {
139137
Thread.currentThread().interrupt();
140138
LOG.error("Interrupted during thread execution or sleep.", e);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,6 @@ private static Optional<StreamingConfigTask> fetchConfigWithRetry(
146146
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
147147
return Optional.empty();
148148
}
149-
} catch (IOException ioe) {
150-
LOG.warn("Error backing off, will not retry: ", ioe);
151-
return Optional.empty();
152149
} catch (InterruptedException ie) {
153150
Thread.currentThread().interrupt();
154151
return Optional.empty();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.beam.runners.dataflow.worker.windmill.client;
1919

2020
import com.google.errorprone.annotations.CanIgnoreReturnValue;
21-
import java.io.IOException;
2221
import java.io.PrintWriter;
2322
import java.util.Set;
2423
import java.util.concurrent.CountDownLatch;
@@ -211,8 +210,6 @@ private void startStream() {
211210
// Shutdown the stream to clean up any dangling resources and pending requests.
212211
shutdown();
213212
break;
214-
} catch (IOException ioe) {
215-
// Keep trying to create the stream.
216213
}
217214
}
218215
}
@@ -375,11 +372,7 @@ private class ResponseObserver implements StreamObserver<ResponseT> {
375372

376373
@Override
377374
public void onNext(ResponseT response) {
378-
try {
379-
backoff.reset();
380-
} catch (IOException e) {
381-
// Ignore.
382-
}
375+
backoff.reset();
383376
debugMetrics.recordResponse();
384377
onResponse(response);
385378
}
@@ -400,8 +393,6 @@ public void onError(Throwable t) {
400393
} catch (InterruptedException e) {
401394
Thread.currentThread().interrupt();
402395
return;
403-
} catch (IOException e) {
404-
// Ignore.
405396
}
406397

407398
executeSafely(AbstractWindmillStream.this::startStream);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.localhostChannel;
2323

2424
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
25-
import java.io.IOException;
2625
import java.io.PrintWriter;
2726
import java.util.ArrayList;
2827
import java.util.HashSet;
@@ -291,10 +290,8 @@ private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
291290
if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
292291
throw new WindmillRpcException(e);
293292
}
294-
} catch (IOException | InterruptedException i) {
295-
if (i instanceof InterruptedException) {
296-
Thread.currentThread().interrupt();
297-
}
293+
} catch (InterruptedException i) {
294+
Thread.currentThread().interrupt();
298295
WindmillRpcException rpcException = new WindmillRpcException(e);
299296
rpcException.addSuppressed(i);
300297
throw rpcException;

sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOff.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,17 @@
1717
*/
1818
package org.apache.beam.sdk.util;
1919

20-
import java.io.IOException;
20+
import org.apache.beam.sdk.annotations.Internal;
2121

22-
/**
23-
* Back-off policy when retrying an operation.
24-
*
25-
* <p><b>Note</b>: This interface is copied from Google API client library to avoid its dependency.
26-
*/
22+
/** Back-off policy when retrying an operation. */
23+
@Internal
2724
public interface BackOff {
2825

2926
/** Indicates that no more retries should be made for use in {@link #nextBackOffMillis()}. */
3027
long STOP = -1L;
3128

3229
/** Reset to initial state. */
33-
void reset() throws IOException;
30+
void reset();
3431

3532
/**
3633
* Gets the number of milliseconds to wait before retrying the operation or {@link #STOP} to
@@ -47,7 +44,7 @@ public interface BackOff {
4744
* }
4845
* </pre>
4946
*/
50-
long nextBackOffMillis() throws IOException;
47+
long nextBackOffMillis();
5148

5249
/**
5350
* Fixed back-off policy whose back-off time is always zero, meaning that the operation is retried
@@ -57,10 +54,10 @@ public interface BackOff {
5754
new BackOff() {
5855

5956
@Override
60-
public void reset() throws IOException {}
57+
public void reset() {}
6158

6259
@Override
63-
public long nextBackOffMillis() throws IOException {
60+
public long nextBackOffMillis() {
6461
return 0;
6562
}
6663
};
@@ -73,10 +70,10 @@ public long nextBackOffMillis() throws IOException {
7370
new BackOff() {
7471

7572
@Override
76-
public void reset() throws IOException {}
73+
public void reset() {}
7774

7875
@Override
79-
public long nextBackOffMillis() throws IOException {
76+
public long nextBackOffMillis() {
8077
return STOP;
8178
}
8279
};

sdks/java/core/src/main/java/org/apache/beam/sdk/util/BackOffUtils.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717
*/
1818
package org.apache.beam.sdk.util;
1919

20-
import java.io.IOException;
20+
import org.apache.beam.sdk.annotations.Internal;
2121

22-
/**
23-
* Utilities for {@link BackOff}.
24-
*
25-
* <p><b>Note</b>: This is copied from Google API client library to avoid its dependency.
26-
*/
22+
/** Utilities for {@link BackOff}. */
23+
@Internal
2724
public final class BackOffUtils {
2825

2926
/**
@@ -39,8 +36,7 @@ public final class BackOffUtils {
3936
* BackOff#nextBackOffMillis()} did not return {@link BackOff#STOP}
4037
* @throws InterruptedException if any thread has interrupted the current thread
4138
*/
42-
public static boolean next(Sleeper sleeper, BackOff backOff)
43-
throws InterruptedException, IOException {
39+
public static boolean next(Sleeper sleeper, BackOff backOff) throws InterruptedException {
4440
long backOffTime = backOff.nextBackOffMillis();
4541
if (backOffTime == BackOff.STOP) {
4642
return false;

sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/RateLimitPolicyFactory.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.beam.sdk.io.aws2.kinesis;
1919

20-
import java.io.IOException;
2120
import java.io.Serializable;
2221
import java.util.List;
2322
import java.util.function.Supplier;
@@ -27,8 +26,6 @@
2726
import org.apache.beam.sdk.util.Sleeper;
2827
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
2928
import org.joda.time.Duration;
30-
import org.slf4j.Logger;
31-
import org.slf4j.LoggerFactory;
3229

3330
/**
3431
* Implement this interface to create a {@code RateLimitPolicy}. Used to create a rate limiter for
@@ -91,7 +88,6 @@ public void onSuccess(List<KinesisRecord> records) throws InterruptedException {
9188
* response is empty or if the consumer is throttled by AWS.
9289
*/
9390
class DefaultRateLimiter implements RateLimitPolicy {
94-
private static final Logger LOG = LoggerFactory.getLogger(DefaultRateLimiter.class);
9591
private final Sleeper sleeper;
9692
private final BackOff emptySuccess;
9793
private final BackOff throttled;
@@ -122,25 +118,17 @@ public DefaultRateLimiter(
122118

123119
@Override
124120
public void onSuccess(List<KinesisRecord> records) throws InterruptedException {
125-
try {
126-
if (records.isEmpty()) {
127-
BackOffUtils.next(sleeper, emptySuccess);
128-
} else {
129-
emptySuccess.reset();
130-
}
131-
throttled.reset();
132-
} catch (IOException e) {
133-
LOG.warn("Error applying onSuccess rate limit policy", e);
121+
if (records.isEmpty()) {
122+
BackOffUtils.next(sleeper, emptySuccess);
123+
} else {
124+
emptySuccess.reset();
134125
}
126+
throttled.reset();
135127
}
136128

137129
@Override
138130
public void onThrottle(KinesisClientThrottledException e) throws InterruptedException {
139-
try {
140-
BackOffUtils.next(sleeper, throttled);
141-
} catch (IOException ioe) {
142-
LOG.warn("Error applying onThrottle rate limit policy", e);
143-
}
131+
BackOffUtils.next(sleeper, throttled);
144132
}
145133
}
146134
}

sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/ShardListingUtils.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ static List<Shard> listShardsAtPoint(
6060

6161
static ShardFilter buildShardFilterForStartingPoint(
6262
KinesisClient kinesisClient, String streamName, StartingPoint startingPoint)
63-
throws IOException, InterruptedException {
63+
throws InterruptedException {
6464
InitialPositionInStream position = startingPoint.getPosition();
6565
switch (position) {
6666
case LATEST:
@@ -78,7 +78,7 @@ static ShardFilter buildShardFilterForStartingPoint(
7878

7979
private static ShardFilter buildShardFilterForTimestamp(
8080
KinesisClient kinesisClient, String streamName, Instant startingPointTimestamp)
81-
throws IOException, InterruptedException {
81+
throws InterruptedException {
8282
StreamDescriptionSummary streamDescription = describeStreamSummary(kinesisClient, streamName);
8383

8484
Instant streamCreationTimestamp = TimeUtil.toJoda(streamDescription.streamCreationTimestamp());
@@ -103,8 +103,7 @@ private static ShardFilter buildShardFilterForTimestamp(
103103
}
104104

105105
private static StreamDescriptionSummary describeStreamSummary(
106-
KinesisClient kinesisClient, final String streamName)
107-
throws IOException, InterruptedException {
106+
KinesisClient kinesisClient, final String streamName) throws InterruptedException {
108107
// DescribeStreamSummary has limits that can be hit fairly easily if we are attempting
109108
// to configure multiple KinesisIO inputs in the same account. Retry up to
110109
// DESCRIBE_STREAM_SUMMARY_MAX_ATTEMPTS times if we end up hitting that limit.

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ void close() throws Exception {
293293
}
294294

295295
// commit the list of entities to datastore
296-
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
296+
private void flushBatch() throws DatastoreException, InterruptedException {
297297
LOG.info("Writing batch of {} entities", entities.size());
298298
Sleeper sleeper = Sleeper.DEFAULT;
299299
BackOff backoff =

sdks/java/io/influxdb/src/test/java/org/apache/beam/sdk/io/influxdb/InfluxDbIOIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void clear() {
9595
}
9696

9797
@Before
98-
public void initTest() throws IOException, InterruptedException {
98+
public void initTest() throws InterruptedException {
9999
BackOff backOff = FluentBackoff.DEFAULT.withMaxRetries(4).backoff();
100100
Query createQuery = new Query(String.format("CREATE DATABASE %s", options.getDatabaseName()));
101101
try (InfluxDB connection =

0 commit comments

Comments
 (0)