Skip to content

Commit 4f80d2b

Browse files
authored
Merge pull request #990: [proxima-beam-core] configurable finish bundle timeout for ProximaIO
2 parents c5c3ef7 + b8e4e5e commit 4f80d2b

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

beam/core/src/main/java/cz/o2/proxima/beam/core/ProximaPipelineOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,9 @@ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
5555
boolean getEnforceStableInputForRemoteConsumer();
5656

5757
void setEnforceStableInputForRemoteConsumer(boolean enforce);
58+
59+
@Default.Long(30000L)
60+
long getProximaIOWriteFinalizeTimeoutMs();
61+
62+
void setProximaIOWriteFinalizeTimeoutMs(long timeout);
5863
}

beam/core/src/main/java/cz/o2/proxima/beam/io/ProximaIO.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package cz.o2.proxima.beam.io;
1717

18+
import cz.o2.proxima.beam.core.ProximaPipelineOptions;
1819
import cz.o2.proxima.core.annotations.Experimental;
1920
import cz.o2.proxima.core.repository.RepositoryFactory;
2021
import cz.o2.proxima.core.storage.StreamElement;
@@ -74,21 +75,29 @@ private Write(RepositoryFactory repositoryFactory) {
7475

7576
@Override
7677
public PDone expand(PCollection<StreamElement> input) {
77-
input.apply("Write", ParDo.of(new WriteFn(repositoryFactory)));
78+
long bundleFinalizeTimeoutMs =
79+
input
80+
.getPipeline()
81+
.getOptions()
82+
.as(ProximaPipelineOptions.class)
83+
.getProximaIOWriteFinalizeTimeoutMs();
84+
input.apply("Write", ParDo.of(new WriteFn(bundleFinalizeTimeoutMs, repositoryFactory)));
7885
return PDone.in(input.getPipeline());
7986
}
8087
}
8188

8289
static class WriteFn extends DoFn<StreamElement, Void> {
8390

8491
private final RepositoryFactory repositoryFactory;
92+
private final long bundleFinalizeTimeoutMs;
8593

8694
private transient DirectDataOperator direct;
8795

8896
private transient Set<CompletableFuture<Pair<Boolean, Throwable>>> pendingWrites;
8997
private transient AtomicInteger missingResponses;
9098

91-
WriteFn(RepositoryFactory repositoryFactory) {
99+
WriteFn(long bundleFinalizeTimeoutMs, RepositoryFactory repositoryFactory) {
100+
this.bundleFinalizeTimeoutMs = bundleFinalizeTimeoutMs;
92101
this.repositoryFactory = repositoryFactory;
93102
}
94103

@@ -113,7 +122,7 @@ public void startBundle() {
113122
public void finishBundle() {
114123
long startTime = System.currentTimeMillis();
115124
while (missingResponses.get() > 0) {
116-
if (System.currentTimeMillis() - startTime > 5000) {
125+
if (System.currentTimeMillis() - startTime > bundleFinalizeTimeoutMs) {
117126
throw new IllegalStateException("Failed to flush bundle within timeout of 5s");
118127
}
119128
// clone to avoid ConcurrentModificationException
@@ -124,7 +133,10 @@ public void finishBundle() {
124133
}
125134
Optional<Pair<Boolean, Throwable>> failedFuture =
126135
unfinished.stream()
127-
.map(f -> ExceptionUtils.uncheckedFactory(() -> f.get(5, TimeUnit.SECONDS)))
136+
.map(
137+
f ->
138+
ExceptionUtils.uncheckedFactory(
139+
() -> f.get(bundleFinalizeTimeoutMs, TimeUnit.MILLISECONDS)))
128140
.filter(p -> !p.getFirst())
129141
// this will be retried
130142
.filter(p -> !(p.getSecond() instanceof TransactionRejectedException))

beam/core/src/test/java/cz/o2/proxima/beam/io/ProximaIOWriteFnTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class ProximaIOWriteFnTest {
4949
Repository.ofTest(ConfigFactory.load("test-reference.conf").resolve());
5050
private final EntityDescriptor gateway = repository.getEntity("gateway");
5151
private final AttributeDescriptor<byte[]> status = gateway.getAttribute("status");
52-
private final WriteFn writeFn = new WriteFn(repository.asFactory());
52+
private final WriteFn writeFn = new WriteFn(1000L, repository.asFactory());
5353
private RandomAccessReader reader;
5454

5555
@Before
@@ -88,7 +88,7 @@ public void testTransactionRejection() {
8888
AtomicInteger written = new AtomicInteger();
8989
OnlineAttributeWriter mockWriter = createSerializableWriter(fails, written);
9090
WriteFn modifiedWriteFn =
91-
new WriteFn(repository.asFactory()) {
91+
new WriteFn(30000L, repository.asFactory()) {
9292
@Override
9393
OnlineAttributeWriter getWriterForElement(StreamElement element) {
9494
return mockWriter;

0 commit comments

Comments
 (0)