2222import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkState ;
2323
2424import com .google .auto .service .AutoService ;
25- import com .google .auto .value .AutoValue ;
2625import java .io .IOException ;
2726import java .util .ArrayList ;
2827import java .util .Arrays ;
5756import org .apache .beam .runners .core .metrics .ShortIdMap ;
5857import org .apache .beam .runners .core .metrics .SimpleMonitoringInfoBuilder ;
5958import org .apache .beam .sdk .Pipeline ;
59+ import org .apache .beam .sdk .annotations .Internal ;
6060import org .apache .beam .sdk .coders .Coder ;
6161import org .apache .beam .sdk .coders .DoubleCoder ;
6262import org .apache .beam .sdk .coders .IterableCoder ;
134134@ SuppressWarnings ({
135135 "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
136136})
137+ @ Internal
137138public class FnApiDoFnRunner <InputT , RestrictionT , PositionT , WatermarkEstimatorStateT , OutputT >
138139 implements FnApiStateAccessor .MutatingStateContext <Object , BoundedWindow > {
139140 /** A registrar which provides a factory to handle Java {@link DoFn}s. */
@@ -144,7 +145,6 @@ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
144145 Factory factory = new Factory ();
145146 return ImmutableMap .<String , PTransformRunnerFactory >builder ()
146147 .put (PTransformTranslation .PAR_DO_TRANSFORM_URN , factory )
147- .put (PTransformTranslation .SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN , factory )
148148 .put (PTransformTranslation .SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN , factory )
149149 .put (
150150 PTransformTranslation .SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN , factory )
@@ -158,6 +158,9 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
158158 @ Override
159159 public final void addRunnerForPTransform (Context context ) throws IOException {
160160
161+ FnApiStateAccessor <Object > stateAccessor =
162+ FnApiStateAccessor .Factory .factoryForPTransformContext (context ).create ();
163+
161164 FnApiDoFnRunner <InputT , RestrictionT , PositionT , WatermarkEstimatorStateT , OutputT > runner =
162165 new FnApiDoFnRunner <>(
163166 context .getPipelineOptions (),
@@ -175,7 +178,9 @@ public final void addRunnerForPTransform(Context context) throws IOException {
175178 context ::addBundleProgressReporter ,
176179 context .getSplitListener (),
177180 context .getBundleFinalizer (),
178- FnApiStateAccessor .Factory .factoryForPTransformContext (context ));
181+ stateAccessor );
182+
183+ stateAccessor .setKeyAndWindowContext (runner );
179184
180185 for (Map .Entry <String , KV <TimeDomain , Coder <Timer <Object >>>> entry :
181186 runner .timerFamilyInfos .entrySet ()) {
@@ -338,7 +343,7 @@ public final void addRunnerForPTransform(Context context) throws IOException {
338343 Consumer <BundleProgressReporter > addBundleProgressReporter ,
339344 BundleSplitListener splitListener ,
340345 BundleFinalizer bundleFinalizer ,
341- FnApiStateAccessor . Factory <Object > stateAccessorFactory ) {
346+ FnApiStateAccessor <Object > stateAccessor ) {
342347 this .pipelineOptions = pipelineOptions ;
343348 this .pTransformId = pTransformId ;
344349 this .pTransform = pTransform ;
@@ -480,24 +485,6 @@ public final void addRunnerForPTransform(Context context) throws IOException {
480485 this .processContext = new NonWindowObservingProcessBundleContext ();
481486 }
482487 break ;
483- case PTransformTranslation .SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN :
484- if ((doFnSignature .splitRestriction () != null
485- && doFnSignature .splitRestriction ().observesWindow ())
486- || (doFnSignature .newTracker () != null && doFnSignature .newTracker ().observesWindow ())
487- || (doFnSignature .getSize () != null && doFnSignature .getSize ().observesWindow ())
488- || !sideInputMapping .isEmpty ()) {
489- mainInputConsumer = this ::processElementForWindowObservingSplitRestriction ;
490- this .processContext =
491- new SizedRestrictionWindowObservingProcessBundleContext (
492- PTransformTranslation .SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN );
493-
494- } else {
495- mainInputConsumer = this ::processElementForSplitRestriction ;
496- this .processContext =
497- new SizedRestrictionNonWindowObservingProcessBundleContext (
498- PTransformTranslation .SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN );
499- }
500- break ;
501488 case PTransformTranslation .SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN :
502489 if ((doFnSignature .truncateRestriction () != null
503490 && doFnSignature .truncateRestriction ().observesWindow ())
@@ -675,7 +662,7 @@ private ByteString encodeProgress(double value) throws IOException {
675662 // no-op
676663 }
677664
678- this .stateAccessor = stateAccessorFactory . create ( this ) ;
665+ this .stateAccessor = stateAccessor ;
679666
680667 // Register as a consumer for each timer.
681668 this .outboundTimerReceivers = new HashMap <>();
@@ -748,80 +735,6 @@ private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
748735 }
749736 }
750737
751- private void processElementForSplitRestriction (
752- WindowedValue <KV <InputT , KV <RestrictionT , WatermarkEstimatorStateT >>> elem ) {
753- currentElement = elem .withValue (elem .getValue ().getKey ());
754- currentRestriction = elem .getValue ().getValue ().getKey ();
755- currentWatermarkEstimatorState = elem .getValue ().getValue ().getValue ();
756- currentTrackerClaimed = new AtomicBoolean (false );
757- currentTracker =
758- RestrictionTrackers .observe (
759- doFnInvoker .invokeNewTracker (processContext ),
760- new ClaimObserver <PositionT >() {
761- private final AtomicBoolean claimed =
762- Preconditions .checkNotNull (currentTrackerClaimed );
763-
764- @ Override
765- public void onClaimed (PositionT position ) {
766- claimed .lazySet (true );
767- }
768-
769- @ Override
770- public void onClaimFailed (PositionT position ) {}
771- });
772- try {
773- doFnInvoker .invokeSplitRestriction (processContext );
774- } finally {
775- currentElement = null ;
776- currentRestriction = null ;
777- currentWatermarkEstimatorState = null ;
778- currentTracker = null ;
779- currentTrackerClaimed = null ;
780- }
781-
782- this .stateAccessor .finalizeState ();
783- }
784-
785- private void processElementForWindowObservingSplitRestriction (
786- WindowedValue <KV <InputT , KV <RestrictionT , WatermarkEstimatorStateT >>> elem ) {
787- currentElement = elem .withValue (elem .getValue ().getKey ());
788- currentRestriction = elem .getValue ().getValue ().getKey ();
789- currentWatermarkEstimatorState = elem .getValue ().getValue ().getValue ();
790- try {
791- Iterator <BoundedWindow > windowIterator =
792- (Iterator <BoundedWindow >) elem .getWindows ().iterator ();
793- while (windowIterator .hasNext ()) {
794- currentWindow = windowIterator .next ();
795- currentTrackerClaimed = new AtomicBoolean (false );
796- currentTracker =
797- RestrictionTrackers .observe (
798- doFnInvoker .invokeNewTracker (processContext ),
799- new ClaimObserver <PositionT >() {
800- private final AtomicBoolean claimed =
801- Preconditions .checkNotNull (currentTrackerClaimed );
802-
803- @ Override
804- public void onClaimed (PositionT position ) {
805- claimed .lazySet (true );
806- }
807-
808- @ Override
809- public void onClaimFailed (PositionT position ) {}
810- });
811- doFnInvoker .invokeSplitRestriction (processContext );
812- }
813- } finally {
814- currentElement = null ;
815- currentRestriction = null ;
816- currentWatermarkEstimatorState = null ;
817- currentWindow = null ;
818- currentTracker = null ;
819- currentTrackerClaimed = null ;
820- }
821-
822- this .stateAccessor .finalizeState ();
823- }
824-
825738 private void processElementForTruncateRestriction (
826739 WindowedValue <KV <KV <InputT , KV <RestrictionT , WatermarkEstimatorStateT >>, Double >> elem ) {
827740 currentElement = elem .withValue (elem .getValue ().getKey ().getKey ());
@@ -906,49 +819,6 @@ public void onClaimFailed(PositionT position) {}
906819 this .stateAccessor .finalizeState ();
907820 }
908821
909- /** Internal class to hold the primary and residual roots when converted to an input element. */
910- @ AutoValue
911- @ AutoValue .CopyAnnotations
912- abstract static class WindowedSplitResult {
913- public static WindowedSplitResult forRoots (
914- WindowedValue <?> primaryInFullyProcessedWindowsRoot ,
915- WindowedValue <?> primarySplitRoot ,
916- WindowedValue <?> residualSplitRoot ,
917- WindowedValue <?> residualInUnprocessedWindowsRoot ) {
918- return new AutoValue_FnApiDoFnRunner_WindowedSplitResult (
919- primaryInFullyProcessedWindowsRoot ,
920- primarySplitRoot ,
921- residualSplitRoot ,
922- residualInUnprocessedWindowsRoot );
923- }
924-
925- public abstract @ Nullable WindowedValue <?> getPrimaryInFullyProcessedWindowsRoot ();
926-
927- public abstract @ Nullable WindowedValue <?> getPrimarySplitRoot ();
928-
929- public abstract @ Nullable WindowedValue <?> getResidualSplitRoot ();
930-
931- public abstract @ Nullable WindowedValue <?> getResidualInUnprocessedWindowsRoot ();
932- }
933-
934- @ AutoValue
935- @ AutoValue .CopyAnnotations
936- abstract static class SplitResultsWithStopIndex {
937- public static SplitResultsWithStopIndex of (
938- WindowedSplitResult windowSplit ,
939- HandlesSplits .SplitResult downstreamSplit ,
940- int newWindowStopIndex ) {
941- return new AutoValue_FnApiDoFnRunner_SplitResultsWithStopIndex (
942- windowSplit , downstreamSplit , newWindowStopIndex );
943- }
944-
945- public abstract @ Nullable WindowedSplitResult getWindowSplit ();
946-
947- public abstract HandlesSplits .@ Nullable SplitResult getDownstreamSplit ();
948-
949- public abstract int getNewWindowStopIndex ();
950- }
951-
952822 private void processElementForWindowObservingSizedElementAndRestriction (
953823 WindowedValue <KV <KV <InputT , KV <RestrictionT , WatermarkEstimatorStateT >>, Double >> elem ) {
954824 currentElement = elem .withValue (elem .getValue ().getKey ().getKey ());
0 commit comments