Skip to content

Commit 59ad255

Browse files
committed
[core] Introduce RetryWaiter to simplify FileStoreCommitImpl
1 parent d5546ba commit 59ad255

File tree

2 files changed

+51
-21
lines changed

2 files changed

+51
-21
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.utils;
20+
21+
import java.util.concurrent.ThreadLocalRandom;
22+
import java.util.concurrent.TimeUnit;
23+
24+
/** A waiter for retry. */
25+
public class RetryWaiter {
26+
27+
private final long minRetryWait;
28+
private final long maxRetryWait;
29+
30+
public RetryWaiter(long minRetryWait, long maxRetryWait) {
31+
this.minRetryWait = minRetryWait;
32+
this.maxRetryWait = maxRetryWait;
33+
}
34+
35+
public void retryWait(int retryCount) {
36+
int retryWait = (int) Math.min(minRetryWait * Math.pow(2, retryCount), maxRetryWait);
37+
ThreadLocalRandom random = ThreadLocalRandom.current();
38+
retryWait += random.nextInt(Math.max(1, (int) (retryWait * 0.2)));
39+
try {
40+
TimeUnit.MILLISECONDS.sleep(retryWait);
41+
} catch (InterruptedException ie) {
42+
Thread.currentThread().interrupt();
43+
throw new RuntimeException(ie);
44+
}
45+
}
46+
}

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.paimon.utils.IOUtils;
7272
import org.apache.paimon.utils.InternalRowPartitionComputer;
7373
import org.apache.paimon.utils.Pair;
74+
import org.apache.paimon.utils.RetryWaiter;
7475
import org.apache.paimon.utils.SnapshotManager;
7576

7677
import org.slf4j.Logger;
@@ -88,8 +89,6 @@
8889
import java.util.Objects;
8990
import java.util.Optional;
9091
import java.util.Set;
91-
import java.util.concurrent.ThreadLocalRandom;
92-
import java.util.concurrent.TimeUnit;
9392
import java.util.stream.Collectors;
9493

9594
import static java.util.Collections.emptyList;
@@ -152,8 +151,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
152151
private final StatsFileHandler statsFileHandler;
153152
private final BucketMode bucketMode;
154153
private final long commitTimeout;
155-
private final long commitMinRetryWait;
156-
private final long commitMaxRetryWait;
154+
private final RetryWaiter retryWaiter;
157155
private final int commitMaxRetries;
158156
private final InternalRowPartitionComputer partitionComputer;
159157
private final boolean rowTrackingEnabled;
@@ -223,8 +221,7 @@ public FileStoreCommitImpl(
223221
this.commitCallbacks = commitCallbacks;
224222
this.commitMaxRetries = commitMaxRetries;
225223
this.commitTimeout = commitTimeout;
226-
this.commitMinRetryWait = commitMinRetryWait;
227-
this.commitMaxRetryWait = commitMaxRetryWait;
224+
this.retryWaiter = new RetryWaiter(commitMinRetryWait, commitMaxRetryWait);
228225
this.partitionComputer =
229226
new InternalRowPartitionComputer(
230227
options.partitionDefaultName(),
@@ -720,7 +717,7 @@ private int tryCommit(
720717
throw new RuntimeException(message, retryResult.exception);
721718
}
722719

723-
commitRetryWait(retryCount);
720+
retryWaiter.retryWait(retryCount);
724721
retryCount++;
725722
}
726723
return retryCount + 1;
@@ -1070,7 +1067,7 @@ public void compactManifest() {
10701067
commitTimeout, retryCount));
10711068
}
10721069

1073-
commitRetryWait(retryCount);
1070+
retryWaiter.retryWait(retryCount);
10741071
retryCount++;
10751072
}
10761073
}
@@ -1155,19 +1152,6 @@ private boolean commitSnapshotImpl(Snapshot newSnapshot, List<PartitionEntry> de
11551152
}
11561153
}
11571154

1158-
private void commitRetryWait(int retryCount) {
1159-
int retryWait =
1160-
(int) Math.min(commitMinRetryWait * Math.pow(2, retryCount), commitMaxRetryWait);
1161-
ThreadLocalRandom random = ThreadLocalRandom.current();
1162-
retryWait += random.nextInt(Math.max(1, (int) (retryWait * 0.2)));
1163-
try {
1164-
TimeUnit.MILLISECONDS.sleep(retryWait);
1165-
} catch (InterruptedException ie) {
1166-
Thread.currentThread().interrupt();
1167-
throw new RuntimeException(ie);
1168-
}
1169-
}
1170-
11711155
@Override
11721156
public void close() {
11731157
IOUtils.closeAllQuietly(commitCallbacks);

0 commit comments

Comments
 (0)