3030import org .apache .beam .sdk .transforms .DoFn .WindowedContext ;
3131import org .apache .beam .sdk .transforms .GroupByKey ;
3232import org .apache .beam .sdk .util .VarInt ;
33+ import org .apache .beam .sdk .values .DrainMode ;
3334import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .MoreObjects ;
3435import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions ;
3536import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableMap ;
@@ -140,16 +141,32 @@ private static byte encodedByte(boolean isFirst, boolean isLast, Timing timing)
140141 return result ;
141142 }
142143
144+ private static byte encodeExtendedMetadataByte (DrainMode drainMode ) {
145+ return (byte ) (drainMode .ordinal () << 6 );
146+ }
147+
148+ private static DrainMode drainModeFromExtendedMetadata (byte extendedMetadataByte ) {
149+ return DrainMode .values ()[(extendedMetadataByte >>> 6 ) & 0b11];
150+ }
151+
143152 private static final ImmutableMap <Byte , PaneInfo > BYTE_TO_PANE_INFO ;
144153
145154 static {
146155 ImmutableMap .Builder <Byte , PaneInfo > decodingBuilder = ImmutableMap .builder ();
147156 for (Timing timing : Timing .values ()) {
148157 long onTimeIndex = timing == Timing .EARLY ? -1 : 0 ;
149- register (decodingBuilder , new PaneInfo (true , true , timing , 0 , onTimeIndex ));
150- register (decodingBuilder , new PaneInfo (true , false , timing , 0 , onTimeIndex ));
151- register (decodingBuilder , new PaneInfo (false , true , timing , -1 , onTimeIndex ));
152- register (decodingBuilder , new PaneInfo (false , false , timing , -1 , onTimeIndex ));
158+ register (
159+ decodingBuilder ,
160+ new PaneInfo (true , true , timing , DrainMode .NOT_DRAINING , 0 , onTimeIndex ));
161+ register (
162+ decodingBuilder ,
163+ new PaneInfo (true , false , timing , DrainMode .NOT_DRAINING , 0 , onTimeIndex ));
164+ register (
165+ decodingBuilder ,
166+ new PaneInfo (false , true , timing , DrainMode .NOT_DRAINING , -1 , onTimeIndex ));
167+ register (
168+ decodingBuilder ,
169+ new PaneInfo (false , false , timing , DrainMode .NOT_DRAINING , -1 , onTimeIndex ));
153170 }
154171 BYTE_TO_PANE_INFO = decodingBuilder .build ();
155172 }
@@ -158,11 +175,17 @@ private static void register(ImmutableMap.Builder<Byte, PaneInfo> builder, PaneI
158175 builder .put (info .encodedByte , info );
159176 }
160177
178+ // Byte containing the encoding tag, timing, isFirst, and isLast.
179+ // These bytes are cached for all known possibilities to avoid repeat processing
161180 private final byte encodedByte ;
162181
182+ // Extended metadata byte, containing drain mode
183+ private final byte extendedMetadataByte ;
184+
163185 private final boolean isFirst ;
164186 private final boolean isLast ;
165187 private final Timing timing ;
188+ private final DrainMode drainMode ;
166189 private final long index ;
167190 private final long nonSpeculativeIndex ;
168191
@@ -177,18 +200,28 @@ private static void register(ImmutableMap.Builder<Byte, PaneInfo> builder, PaneI
177200 public static final PaneInfo ON_TIME_AND_ONLY_FIRING =
178201 PaneInfo .createPane (true , true , Timing .ON_TIME , 0 , 0 );
179202
180- private PaneInfo (boolean isFirst , boolean isLast , Timing timing , long index , long onTimeIndex ) {
203+ private PaneInfo (
204+ boolean isFirst ,
205+ boolean isLast ,
206+ Timing timing ,
207+ DrainMode drainMode ,
208+ long index ,
209+ long onTimeIndex ) {
181210 this .encodedByte = encodedByte (isFirst , isLast , timing );
211+ this .extendedMetadataByte =
212+ drainMode == DrainMode .DRAINING ? encodeExtendedMetadataByte (drainMode ) : 0x00 ;
182213 this .isFirst = isFirst ;
183214 this .isLast = isLast ;
184215 this .timing = timing ;
216+ this .drainMode = drainMode ;
185217 this .index = index ;
186218 this .nonSpeculativeIndex = onTimeIndex ;
187219 }
188220
189221 public static PaneInfo createPane (boolean isFirst , boolean isLast , Timing timing ) {
190222 checkArgument (isFirst , "Indices must be provided for non-first pane info." );
191- return createPane (isFirst , isLast , timing , 0 , timing == Timing .EARLY ? -1 : 0 );
223+ return createPane (
224+ isFirst , isLast , timing , DrainMode .NOT_DRAINING , 0 , timing == Timing .EARLY ? -1 : 0 );
192225 }
193226
194227 /** Factory method to create a {@link PaneInfo} with the specified parameters. */
@@ -197,7 +230,22 @@ public static PaneInfo createPane(
197230 if (isFirst || timing == Timing .UNKNOWN ) {
198231 return checkNotNull (BYTE_TO_PANE_INFO .get (encodedByte (isFirst , isLast , timing )));
199232 } else {
200- return new PaneInfo (isFirst , isLast , timing , index , onTimeIndex );
233+ return new PaneInfo (isFirst , isLast , timing , DrainMode .NOT_DRAINING , index , onTimeIndex );
234+ }
235+ }
236+
237+ /** Factory method to create a {@link PaneInfo} with the specified parameters. */
238+ public static PaneInfo createPane (
239+ boolean isFirst ,
240+ boolean isLast ,
241+ Timing timing ,
242+ DrainMode drainMode ,
243+ long index ,
244+ long onTimeIndex ) {
245+ if (drainMode != DrainMode .DRAINING && (isFirst || timing == Timing .UNKNOWN )) {
246+ return checkNotNull (BYTE_TO_PANE_INFO .get (encodedByte (isFirst , isLast , timing )));
247+ } else {
248+ return new PaneInfo (isFirst , isLast , timing , drainMode , index , onTimeIndex );
201249 }
202250 }
203251
@@ -241,6 +289,14 @@ public long getIndex() {
241289 return index ;
242290 }
243291
292+ /**
293+ * Indicates whether this element resulted from an aggregation that fired during a drain
294+ * operation.
295+ */
296+ public DrainMode getDrainMode () {
297+ return drainMode ;
298+ }
299+
244300 /**
245301 * The zero-based index of this trigger firing among non-speculative panes.
246302 *
@@ -253,13 +309,17 @@ public long getNonSpeculativeIndex() {
253309 return nonSpeculativeIndex ;
254310 }
255311
256- int getEncodedByte () {
312+ byte getEncodedByte () {
257313 return encodedByte ;
258314 }
259315
316+ byte getExtendedMetadataByte () {
317+ return extendedMetadataByte ;
318+ }
319+
260320 @ Override
261321 public int hashCode () {
262- return Objects .hash (encodedByte , index , nonSpeculativeIndex );
322+ return Objects .hash (encodedByte , extendedMetadataByte , index , nonSpeculativeIndex );
263323 }
264324
265325 @ Override
@@ -270,6 +330,7 @@ public boolean equals(@Nullable Object obj) {
270330 } else if (obj instanceof PaneInfo ) {
271331 PaneInfo that = (PaneInfo ) obj ;
272332 return this .encodedByte == that .encodedByte
333+ && this .extendedMetadataByte == that .extendedMetadataByte
273334 && this .index == that .index
274335 && this .nonSpeculativeIndex == that .nonSpeculativeIndex ;
275336 } else {
@@ -290,6 +351,7 @@ public String toString() {
290351 .add ("timing" , timing )
291352 .add ("index" , index )
292353 .add ("onTimeIndex" , nonSpeculativeIndex != -1 ? nonSpeculativeIndex : null )
354+ .add ("drainMode" , drainMode == DrainMode .DRAINING ? "DRAINING" : null )
293355 .toString ();
294356 }
295357
@@ -298,7 +360,8 @@ public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
298360 private enum Encoding {
299361 FIRST ,
300362 ONE_INDEX ,
301- TWO_INDICES ;
363+ TWO_INDICES ,
364+ EXTENDED_METADATA ;
302365
303366 // NOTE: Do not reorder fields. The ordinal is used as part of
304367 // the encoding.
@@ -311,12 +374,15 @@ private enum Encoding {
311374 }
312375
313376 public static Encoding fromTag (byte b ) {
314- return Encoding .values ()[b >> 4 ];
377+ return Encoding .values ()[( b >>> 4 ) & 0xF ];
315378 }
316379 }
317380
318381 private Encoding chooseEncoding (PaneInfo value ) {
319- if ((value .index == 0 && value .nonSpeculativeIndex == 0 ) || value .timing == Timing .UNKNOWN ) {
382+ if (value .drainMode == DrainMode .DRAINING ) {
383+ return Encoding .EXTENDED_METADATA ;
384+ } else if ((value .index == 0 && value .nonSpeculativeIndex == 0 )
385+ || value .timing == Timing .UNKNOWN ) {
320386 return Encoding .FIRST ;
321387 } else if (value .index == value .nonSpeculativeIndex || value .timing == Timing .EARLY ) {
322388 return Encoding .ONE_INDEX ;
@@ -350,6 +416,12 @@ public void encode(PaneInfo value, final OutputStream outStream)
350416 VarInt .encode (value .index , outStream );
351417 VarInt .encode (value .nonSpeculativeIndex , outStream );
352418 break ;
419+ case EXTENDED_METADATA :
420+ outStream .write (value .encodedByte | encoding .tag );
421+ outStream .write (value .extendedMetadataByte );
422+ VarInt .encode (value .index , outStream );
423+ VarInt .encode (value .nonSpeculativeIndex , outStream );
424+ break ;
353425 default :
354426 throw new CoderException ("Unknown encoding " + encoding );
355427 }
@@ -360,6 +432,7 @@ public PaneInfo decode(final InputStream inStream) throws CoderException, IOExce
360432 byte keyAndTag = (byte ) inStream .read ();
361433 PaneInfo base = Preconditions .checkNotNull (BYTE_TO_PANE_INFO .get ((byte ) (keyAndTag & 0x0F )));
362434 long index , onTimeIndex ;
435+ DrainMode drainMode = DrainMode .NOT_DRAINING ;
363436 switch (Encoding .fromTag (keyAndTag )) {
364437 case FIRST :
365438 return base ;
@@ -371,10 +444,16 @@ public PaneInfo decode(final InputStream inStream) throws CoderException, IOExce
371444 index = VarInt .decodeLong (inStream );
372445 onTimeIndex = VarInt .decodeLong (inStream );
373446 break ;
447+ case EXTENDED_METADATA :
448+ byte extendedMetadata = (byte ) inStream .read ();
449+ drainMode = drainModeFromExtendedMetadata (extendedMetadata );
450+ index = VarInt .decodeLong (inStream );
451+ onTimeIndex = VarInt .decodeLong (inStream );
452+ break ;
374453 default :
375454 throw new CoderException ("Unknown encoding " + (keyAndTag & 0xF0 ));
376455 }
377- return new PaneInfo (base .isFirst , base .isLast , base .timing , index , onTimeIndex );
456+ return new PaneInfo (base .isFirst , base .isLast , base .timing , drainMode , index , onTimeIndex );
378457 }
379458
380459 @ Override
0 commit comments