Skip to content

Commit 544e5c0

Browse files
committed
Refactor: separate SplittableSplitAndSizeRestriction
This is a further step in untangling FnApiDoFnRunner which implements a variety of loosely-related transforms, which mostly have in common that they call a user's DoFn but otherwise are disjoint.
1 parent 1de0d1f commit 544e5c0

File tree

6 files changed

+1071
-506
lines changed

6 files changed

+1071
-506
lines changed

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 0 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2323

2424
import com.google.auto.service.AutoService;
25-
import com.google.auto.value.AutoValue;
2625
import java.io.IOException;
2726
import java.util.ArrayList;
2827
import java.util.Arrays;
@@ -144,7 +143,6 @@ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
144143
Factory factory = new Factory();
145144
return ImmutableMap.<String, PTransformRunnerFactory>builder()
146145
.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, factory)
147-
.put(PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, factory)
148146
.put(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN, factory)
149147
.put(
150148
PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN, factory)
@@ -480,24 +478,6 @@ public final void addRunnerForPTransform(Context context) throws IOException {
480478
this.processContext = new NonWindowObservingProcessBundleContext();
481479
}
482480
break;
483-
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
484-
if ((doFnSignature.splitRestriction() != null
485-
&& doFnSignature.splitRestriction().observesWindow())
486-
|| (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
487-
|| (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
488-
|| !sideInputMapping.isEmpty()) {
489-
mainInputConsumer = this::processElementForWindowObservingSplitRestriction;
490-
this.processContext =
491-
new SizedRestrictionWindowObservingProcessBundleContext(
492-
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
493-
494-
} else {
495-
mainInputConsumer = this::processElementForSplitRestriction;
496-
this.processContext =
497-
new SizedRestrictionNonWindowObservingProcessBundleContext(
498-
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
499-
}
500-
break;
501481
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
502482
if ((doFnSignature.truncateRestriction() != null
503483
&& doFnSignature.truncateRestriction().observesWindow())
@@ -748,80 +728,6 @@ private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
748728
}
749729
}
750730

751-
private void processElementForSplitRestriction(
752-
WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
753-
currentElement = elem.withValue(elem.getValue().getKey());
754-
currentRestriction = elem.getValue().getValue().getKey();
755-
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
756-
currentTrackerClaimed = new AtomicBoolean(false);
757-
currentTracker =
758-
RestrictionTrackers.observe(
759-
doFnInvoker.invokeNewTracker(processContext),
760-
new ClaimObserver<PositionT>() {
761-
private final AtomicBoolean claimed =
762-
Preconditions.checkNotNull(currentTrackerClaimed);
763-
764-
@Override
765-
public void onClaimed(PositionT position) {
766-
claimed.lazySet(true);
767-
}
768-
769-
@Override
770-
public void onClaimFailed(PositionT position) {}
771-
});
772-
try {
773-
doFnInvoker.invokeSplitRestriction(processContext);
774-
} finally {
775-
currentElement = null;
776-
currentRestriction = null;
777-
currentWatermarkEstimatorState = null;
778-
currentTracker = null;
779-
currentTrackerClaimed = null;
780-
}
781-
782-
this.stateAccessor.finalizeState();
783-
}
784-
785-
private void processElementForWindowObservingSplitRestriction(
786-
WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
787-
currentElement = elem.withValue(elem.getValue().getKey());
788-
currentRestriction = elem.getValue().getValue().getKey();
789-
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
790-
try {
791-
Iterator<BoundedWindow> windowIterator =
792-
(Iterator<BoundedWindow>) elem.getWindows().iterator();
793-
while (windowIterator.hasNext()) {
794-
currentWindow = windowIterator.next();
795-
currentTrackerClaimed = new AtomicBoolean(false);
796-
currentTracker =
797-
RestrictionTrackers.observe(
798-
doFnInvoker.invokeNewTracker(processContext),
799-
new ClaimObserver<PositionT>() {
800-
private final AtomicBoolean claimed =
801-
Preconditions.checkNotNull(currentTrackerClaimed);
802-
803-
@Override
804-
public void onClaimed(PositionT position) {
805-
claimed.lazySet(true);
806-
}
807-
808-
@Override
809-
public void onClaimFailed(PositionT position) {}
810-
});
811-
doFnInvoker.invokeSplitRestriction(processContext);
812-
}
813-
} finally {
814-
currentElement = null;
815-
currentRestriction = null;
816-
currentWatermarkEstimatorState = null;
817-
currentWindow = null;
818-
currentTracker = null;
819-
currentTrackerClaimed = null;
820-
}
821-
822-
this.stateAccessor.finalizeState();
823-
}
824-
825731
private void processElementForTruncateRestriction(
826732
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
827733
currentElement = elem.withValue(elem.getValue().getKey().getKey());
@@ -906,49 +812,6 @@ public void onClaimFailed(PositionT position) {}
906812
this.stateAccessor.finalizeState();
907813
}
908814

909-
/** Internal class to hold the primary and residual roots when converted to an input element. */
910-
@AutoValue
911-
@AutoValue.CopyAnnotations
912-
abstract static class WindowedSplitResult {
913-
public static WindowedSplitResult forRoots(
914-
WindowedValue<?> primaryInFullyProcessedWindowsRoot,
915-
WindowedValue<?> primarySplitRoot,
916-
WindowedValue<?> residualSplitRoot,
917-
WindowedValue<?> residualInUnprocessedWindowsRoot) {
918-
return new AutoValue_FnApiDoFnRunner_WindowedSplitResult(
919-
primaryInFullyProcessedWindowsRoot,
920-
primarySplitRoot,
921-
residualSplitRoot,
922-
residualInUnprocessedWindowsRoot);
923-
}
924-
925-
public abstract @Nullable WindowedValue<?> getPrimaryInFullyProcessedWindowsRoot();
926-
927-
public abstract @Nullable WindowedValue<?> getPrimarySplitRoot();
928-
929-
public abstract @Nullable WindowedValue<?> getResidualSplitRoot();
930-
931-
public abstract @Nullable WindowedValue<?> getResidualInUnprocessedWindowsRoot();
932-
}
933-
934-
@AutoValue
935-
@AutoValue.CopyAnnotations
936-
abstract static class SplitResultsWithStopIndex {
937-
public static SplitResultsWithStopIndex of(
938-
WindowedSplitResult windowSplit,
939-
HandlesSplits.SplitResult downstreamSplit,
940-
int newWindowStopIndex) {
941-
return new AutoValue_FnApiDoFnRunner_SplitResultsWithStopIndex(
942-
windowSplit, downstreamSplit, newWindowStopIndex);
943-
}
944-
945-
public abstract @Nullable WindowedSplitResult getWindowSplit();
946-
947-
public abstract HandlesSplits.@Nullable SplitResult getDownstreamSplit();
948-
949-
public abstract int getNewWindowStopIndex();
950-
}
951-
952815
private void processElementForWindowObservingSizedElementAndRestriction(
953816
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
954817
currentElement = elem.withValue(elem.getValue().getKey().getKey());
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.fn.harness;
19+
20+
import com.google.auto.value.AutoValue;
21+
import org.checkerframework.checker.nullness.qual.Nullable;
22+
23+
@AutoValue
24+
@AutoValue.CopyAnnotations
25+
abstract class SplitResultsWithStopIndex {
26+
public static SplitResultsWithStopIndex of(
27+
WindowedSplitResult windowSplit,
28+
HandlesSplits.SplitResult downstreamSplit,
29+
int newWindowStopIndex) {
30+
return new AutoValue_SplitResultsWithStopIndex(
31+
windowSplit, downstreamSplit, newWindowStopIndex);
32+
}
33+
34+
public abstract @Nullable WindowedSplitResult getWindowSplit();
35+
36+
public abstract HandlesSplits.@Nullable SplitResult getDownstreamSplit();
37+
38+
public abstract int getNewWindowStopIndex();
39+
}

0 commit comments

Comments
 (0)