1515 */
1616package cz .o2 .proxima .beam .io ;
1717
18+ import cz .o2 .proxima .beam .core .ProximaPipelineOptions ;
1819import cz .o2 .proxima .core .annotations .Experimental ;
1920import cz .o2 .proxima .core .repository .RepositoryFactory ;
2021import cz .o2 .proxima .core .storage .StreamElement ;
@@ -74,21 +75,29 @@ private Write(RepositoryFactory repositoryFactory) {
7475
7576 @ Override
7677 public PDone expand (PCollection <StreamElement > input ) {
77- input .apply ("Write" , ParDo .of (new WriteFn (repositoryFactory )));
78+ long bundleFinalizeTimeoutMs =
79+ input
80+ .getPipeline ()
81+ .getOptions ()
82+ .as (ProximaPipelineOptions .class )
83+ .getProximaIOWriteFinalizeTimeoutMs ();
84+ input .apply ("Write" , ParDo .of (new WriteFn (bundleFinalizeTimeoutMs , repositoryFactory )));
7885 return PDone .in (input .getPipeline ());
7986 }
8087 }
8188
8289 static class WriteFn extends DoFn <StreamElement , Void > {
8390
8491 private final RepositoryFactory repositoryFactory ;
92+ private final long bundleFinalizeTimeoutMs ;
8593
8694 private transient DirectDataOperator direct ;
8795
8896 private transient Set <CompletableFuture <Pair <Boolean , Throwable >>> pendingWrites ;
8997 private transient AtomicInteger missingResponses ;
9098
91- WriteFn (RepositoryFactory repositoryFactory ) {
99+ WriteFn (long bundleFinalizeTimeoutMs , RepositoryFactory repositoryFactory ) {
100+ this .bundleFinalizeTimeoutMs = bundleFinalizeTimeoutMs ;
92101 this .repositoryFactory = repositoryFactory ;
93102 }
94103
@@ -113,7 +122,7 @@ public void startBundle() {
113122 public void finishBundle () {
114123 long startTime = System .currentTimeMillis ();
115124 while (missingResponses .get () > 0 ) {
116- if (System .currentTimeMillis () - startTime > 5000 ) {
125+ if (System .currentTimeMillis () - startTime > bundleFinalizeTimeoutMs ) {
117126 throw new IllegalStateException ("Failed to flush bundle within timeout of 5s" );
118127 }
119128 // clone to avoid ConcurrentModificationException
@@ -124,7 +133,10 @@ public void finishBundle() {
124133 }
125134 Optional <Pair <Boolean , Throwable >> failedFuture =
126135 unfinished .stream ()
127- .map (f -> ExceptionUtils .uncheckedFactory (() -> f .get (5 , TimeUnit .SECONDS )))
136+ .map (
137+ f ->
138+ ExceptionUtils .uncheckedFactory (
139+ () -> f .get (bundleFinalizeTimeoutMs , TimeUnit .MILLISECONDS )))
128140 .filter (p -> !p .getFirst ())
129141 // this will be retried
130142 .filter (p -> !(p .getSecond () instanceof TransactionRejectedException ))
0 commit comments