1717 */
1818package org .apache .beam .sdk .fn .data ;
1919
20+ import java .time .Duration ;
2021import java .util .HashSet ;
2122import java .util .concurrent .CompletableFuture ;
2223import java .util .concurrent .ConcurrentHashMap ;
23- import java .util .concurrent .ConcurrentMap ;
2424import java .util .concurrent .ExecutionException ;
25+ import java .util .concurrent .TimeUnit ;
26+ import java .util .concurrent .TimeoutException ;
2527import java .util .function .Consumer ;
2628import org .apache .beam .model .fnexecution .v1 .BeamFnApi ;
2729import org .apache .beam .model .pipeline .v1 .Endpoints ;
3032import org .apache .beam .vendor .grpc .v1p60p1 .io .grpc .stub .StreamObserver ;
3133import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .annotations .VisibleForTesting ;
3234import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .MoreObjects ;
35+ import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .cache .Cache ;
36+ import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .cache .CacheBuilder ;
3337import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableList ;
3438import org .checkerframework .checker .nullness .qual .Nullable ;
3539import org .slf4j .Logger ;
4953 */
5054public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
5155 private static final Logger LOG = LoggerFactory .getLogger (BeamFnDataGrpcMultiplexer .class );
56+ private static final Duration POISONED_INSTRUCTION_ID_CACHE_TIMEOUT = Duration .ofMinutes (20 );
5257 private final Endpoints .@ Nullable ApiServiceDescriptor apiServiceDescriptor ;
5358 private final StreamObserver <BeamFnApi .Elements > inboundObserver ;
5459 private final StreamObserver <BeamFnApi .Elements > outboundObserver ;
55- private final ConcurrentMap <
60+ private final ConcurrentHashMap <
5661 /*instructionId=*/ String , CompletableFuture <CloseableFnDataReceiver <BeamFnApi .Elements >>>
5762 receivers ;
58- private final ConcurrentMap <String , Boolean > erroredInstructionIds ;
63+ private final Cache </*instructionId=*/ String , /*unused=*/ Boolean > poisonedInstructionIds ;
64+
65+ private static class PoisonedException extends RuntimeException {
66+ public PoisonedException () {
67+ super ("Instruction poisoned" );
68+ }
69+ };
5970
6071 public BeamFnDataGrpcMultiplexer (
6172 Endpoints .@ Nullable ApiServiceDescriptor apiServiceDescriptor ,
@@ -64,7 +75,8 @@ public BeamFnDataGrpcMultiplexer(
6475 baseOutboundObserverFactory ) {
6576 this .apiServiceDescriptor = apiServiceDescriptor ;
6677 this .receivers = new ConcurrentHashMap <>();
67- this .erroredInstructionIds = new ConcurrentHashMap <>();
78+ this .poisonedInstructionIds =
79+ CacheBuilder .newBuilder ().expireAfterWrite (POISONED_INSTRUCTION_ID_CACHE_TIMEOUT ).build ();
6880 this .inboundObserver = new InboundObserver ();
6981 this .outboundObserver =
7082 outboundObserverFactory .outboundObserverFor (baseOutboundObserverFactory , inboundObserver );
@@ -87,29 +99,70 @@ public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
8799 return outboundObserver ;
88100 }
89101
90- private CompletableFuture <CloseableFnDataReceiver <BeamFnApi .Elements >> receiverFuture (
91- String instructionId ) {
92- return receivers .computeIfAbsent (instructionId , (unused ) -> new CompletableFuture <>());
93- }
94-
95102 /**
96103 * Registers a consumer for the specified instruction id.
97104 *
98105 * <p>The {@link BeamFnDataGrpcMultiplexer} partitions {@link BeamFnApi.Elements} with multiple
99106 * instruction ids ensuring that the receiver will only see {@link BeamFnApi.Elements} with a
100107 * single instruction id.
101108 *
102- * <p>The caller must {@link #unregisterConsumer unregister the consumer} when they no longer wish
103- * to receive messages.
109+ * <p>The caller must either {@link #unregisterConsumer unregister the consumer} when all messages
110+ * have been processed or {@link #poisonInstructionId(String) poison the instruction} if messages
111+ * for the instruction should be dropped.
104112 */
105113 public void registerConsumer (
106114 String instructionId , CloseableFnDataReceiver <BeamFnApi .Elements > receiver ) {
107- receiverFuture (instructionId ).complete (receiver );
115+ receivers .compute (
116+ instructionId ,
117+ (unused , existing ) -> {
118+ if (existing != null ) {
119+ if (!existing .complete (receiver )) {
120+ throw new IllegalArgumentException ("Instruction id was registered twice" );
121+ }
122+ return existing ;
123+ }
124+ if (poisonedInstructionIds .getIfPresent (instructionId ) != null ) {
125+ throw new IllegalArgumentException ("Instruction id was poisoned" );
126+ }
127+ return CompletableFuture .completedFuture (receiver );
128+ });
108129 }
109130
110- /** Unregisters a consumer. */
131+ /** Unregisters a previously registered consumer. */
111132 public void unregisterConsumer (String instructionId ) {
112- receivers .remove (instructionId );
133+ @ Nullable
134+ CompletableFuture <CloseableFnDataReceiver <BeamFnApi .Elements >> receiverFuture =
135+ receivers .remove (instructionId );
136+ if (receiverFuture != null && !receiverFuture .isDone ()) {
137+ // The future must have been inserted by the inbound observer since registerConsumer completes
138+ // the future.
139+ throw new IllegalArgumentException ("Unregistering consumer which was not registered." );
140+ }
141+ }
142+
143+ /**
144+ * Poisons an instruction id.
145+ *
146+ * <p>Any records for the instruction on the inbound observer will be dropped for the next {@link
147+ * #POISONED_INSTRUCTION_ID_CACHE_TIMEOUT}.
148+ */
149+ public void poisonInstructionId (String instructionId ) {
150+ poisonedInstructionIds .put (instructionId , Boolean .TRUE );
151+ @ Nullable
152+ CompletableFuture <CloseableFnDataReceiver <BeamFnApi .Elements >> receiverFuture =
153+ receivers .remove (instructionId );
154+ if (receiverFuture != null ) {
155+ // Completing exceptionally has no effect if the future was already notified. In that case
156+ // whatever registered the receiver needs to handle cancelling it.
157+ receiverFuture .completeExceptionally (new PoisonedException ());
158+ if (!receiverFuture .isCompletedExceptionally ()) {
159+ try {
160+ receiverFuture .get ().close ();
161+ } catch (Exception e ) {
162+ LOG .warn ("Unexpected error closing existing observer" );
163+ }
164+ }
165+ }
113166 }
114167
115168 @ VisibleForTesting
@@ -210,27 +263,42 @@ public void onNext(BeamFnApi.Elements value) {
210263 }
211264
212265 private void forwardToConsumerForInstructionId (String instructionId , BeamFnApi .Elements value ) {
213- if (erroredInstructionIds .containsKey (instructionId )) {
214- LOG .debug ("Ignoring inbound data for failed instruction {}" , instructionId );
215- return ;
216- }
217- CompletableFuture <CloseableFnDataReceiver <BeamFnApi .Elements >> consumerFuture =
218- receiverFuture (instructionId );
219- if (!consumerFuture .isDone ()) {
220- LOG .debug (
221- "Received data for instruction {} without consumer ready. "
222- + "Waiting for consumer to be registered." ,
223- instructionId );
224- }
225266 CloseableFnDataReceiver <BeamFnApi .Elements > consumer ;
226267 try {
227- consumer = consumerFuture .get ();
228-
268+ CompletableFuture <CloseableFnDataReceiver <BeamFnApi .Elements >> consumerFuture =
269+ receivers .computeIfAbsent (
270+ instructionId ,
271+ (unused ) -> {
272+ if (poisonedInstructionIds .getIfPresent (instructionId ) != null ) {
273+ throw new PoisonedException ();
274+ }
275+ LOG .debug (
276+ "Received data for instruction {} without consumer ready. "
277+ + "Waiting for consumer to be registered." ,
278+ instructionId );
279+ return new CompletableFuture <>();
280+ });
281+ // The consumer may not be registered until the bundle processor is fully constructed so we
282+ // conservatively set
283+ // a high timeout. Poisoning will prevent this from occurring for consumers that will not be
284+ // registered.
285+ consumer = consumerFuture .get (3 , TimeUnit .HOURS );
229286 /*
230287 * TODO: On failure we should fail any bundles that were impacted eagerly
231288 * instead of relying on the Runner harness to do all the failure handling.
232289 */
233- } catch (ExecutionException | InterruptedException e ) {
290+ } catch (TimeoutException e ) {
291+ LOG .error (
292+ "Timed out waiting to observe consumer data stream for instruction {}" ,
293+ instructionId ,
294+ e );
295+ outboundObserver .onError (e );
296+ return ;
297+ } catch (ExecutionException | InterruptedException | PoisonedException e ) {
298+ if (e instanceof PoisonedException || e .getCause () instanceof PoisonedException ) {
299+ LOG .debug ("Received data for poisoned instruction {}. Dropping input." , instructionId );
300+ return ;
301+ }
234302 LOG .error (
235303 "Client interrupted during handling of data for instruction {}" , instructionId , e );
236304 outboundObserver .onError (e );
@@ -240,10 +308,11 @@ private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.E
240308 outboundObserver .onError (e );
241309 return ;
242310 }
311+
243312 try {
244313 consumer .accept (value );
245314 } catch (Exception e ) {
246- erroredInstructionIds . put (instructionId , true );
315+ poisonInstructionId (instructionId );
247316 }
248317 }
249318
0 commit comments