Skip to content

Commit 712d29d

Browse files
committed
Refactor: separate SplittableSplitAndSizeRestriction
This is a further step in untangling FnApiDoFnRunner which implements a variety of loosely-related transforms, which mostly have in common that they call a user's DoFn but otherwise are disjoint.
1 parent 692fea4 commit 712d29d

File tree

10 files changed

+1160
-548
lines changed

10 files changed

+1160
-548
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,11 +1211,11 @@ public interface MultiOutputReceiver {
12111211
*
12121212
* <ul>
12131213
* <li>The return type {@code WatermarkEstimatorStateT} defines the watermark state type used
1214-
* within this splittable DoFn. All other methods that use a {@link
1215-
* WatermarkEstimatorState @WatermarkEstimatorState} parameter must use the same type that
1216-
* is used here. It is suggested to use as narrow of a return type definition as possible
1217-
* (for example prefer to use a square type over a shape type as a square is a type of a
1218-
* shape).
1214+
* within this splittable DoFn. The return type is allowed to be nullable. All other methods
1215+
* that use a {@link WatermarkEstimatorState @WatermarkEstimatorState} parameter must use
1216+
* the same type that is used here. It is suggested to use as narrow of a return type
1217+
* definition as possible (for example prefer to use a square type over a shape type as a
1218+
* square is a type of a shape).
12191219
* <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
12201220
* passed the current element being processed; the argument must be of type {@code InputT}.
12211221
* Note that automatic conversion of {@link Row}s and {@link FieldAccess} parameters are

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
4444
import org.apache.beam.sdk.values.Row;
4545
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
46+
import org.checkerframework.checker.nullness.qual.Nullable;
4647
import org.joda.time.Instant;
4748

4849
/**
@@ -96,7 +97,7 @@ <RestrictionT> RestrictionT invokeGetInitialRestriction(
9697
void invokeSplitRestriction(ArgumentProvider<InputT, OutputT> arguments);
9798

9899
/** Invoke the {@link TruncateRestriction} method on the bound {@link DoFn}. */
99-
<RestrictionT> TruncateResult<RestrictionT> invokeTruncateRestriction(
100+
<RestrictionT> @Nullable TruncateResult<RestrictionT> invokeTruncateRestriction(
100101
ArgumentProvider<InputT, OutputT> arguments);
101102

102103
/**
@@ -117,7 +118,13 @@ <RestrictionT> TruncateResult<RestrictionT> invokeTruncateRestriction(
117118
<RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> invokeNewTracker(
118119
ArgumentProvider<InputT, OutputT> arguments);
119120

120-
/** Invoke the {@link DoFn.NewWatermarkEstimator} method on the bound {@link DoFn}. */
121+
/**
122+
* Invoke the {@link DoFn.NewWatermarkEstimator} method on the bound {@link DoFn}.
123+
*
124+
* <p>Note that since {@code WatermarkEstimatorStateT} is permitted to be a nullable type, if this
125+
* method returns {@code null} that is interpreted as a valid watermark estimator state, not the
126+
* absence of a state.
127+
*/
121128
@SuppressWarnings("TypeParameterUnusedInFormals")
122129
<WatermarkEstimatorStateT>
123130
WatermarkEstimator<WatermarkEstimatorStateT> invokeNewWatermarkEstimator(

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 10 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2323

2424
import com.google.auto.service.AutoService;
25-
import com.google.auto.value.AutoValue;
2625
import java.io.IOException;
2726
import java.util.ArrayList;
2827
import java.util.Arrays;
@@ -57,6 +56,7 @@
5756
import org.apache.beam.runners.core.metrics.ShortIdMap;
5857
import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
5958
import org.apache.beam.sdk.Pipeline;
59+
import org.apache.beam.sdk.annotations.Internal;
6060
import org.apache.beam.sdk.coders.Coder;
6161
import org.apache.beam.sdk.coders.DoubleCoder;
6262
import org.apache.beam.sdk.coders.IterableCoder;
@@ -134,6 +134,7 @@
134134
@SuppressWarnings({
135135
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
136136
})
137+
@Internal
137138
public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT>
138139
implements FnApiStateAccessor.MutatingStateContext<Object, BoundedWindow> {
139140
/** A registrar which provides a factory to handle Java {@link DoFn}s. */
@@ -144,7 +145,6 @@ public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
144145
Factory factory = new Factory();
145146
return ImmutableMap.<String, PTransformRunnerFactory>builder()
146147
.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, factory)
147-
.put(PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, factory)
148148
.put(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN, factory)
149149
.put(
150150
PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN, factory)
@@ -158,6 +158,9 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
158158
@Override
159159
public final void addRunnerForPTransform(Context context) throws IOException {
160160

161+
FnApiStateAccessor<Object> stateAccessor =
162+
FnApiStateAccessor.Factory.factoryForPTransformContext(context).create();
163+
161164
FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT> runner =
162165
new FnApiDoFnRunner<>(
163166
context.getPipelineOptions(),
@@ -175,7 +178,9 @@ public final void addRunnerForPTransform(Context context) throws IOException {
175178
context::addBundleProgressReporter,
176179
context.getSplitListener(),
177180
context.getBundleFinalizer(),
178-
FnApiStateAccessor.Factory.factoryForPTransformContext(context));
181+
stateAccessor);
182+
183+
stateAccessor.setKeyAndWindowContext(runner);
179184

180185
for (Map.Entry<String, KV<TimeDomain, Coder<Timer<Object>>>> entry :
181186
runner.timerFamilyInfos.entrySet()) {
@@ -338,7 +343,7 @@ public final void addRunnerForPTransform(Context context) throws IOException {
338343
Consumer<BundleProgressReporter> addBundleProgressReporter,
339344
BundleSplitListener splitListener,
340345
BundleFinalizer bundleFinalizer,
341-
FnApiStateAccessor.Factory<Object> stateAccessorFactory) {
346+
FnApiStateAccessor<Object> stateAccessor) {
342347
this.pipelineOptions = pipelineOptions;
343348
this.pTransformId = pTransformId;
344349
this.pTransform = pTransform;
@@ -480,24 +485,6 @@ public final void addRunnerForPTransform(Context context) throws IOException {
480485
this.processContext = new NonWindowObservingProcessBundleContext();
481486
}
482487
break;
483-
case PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN:
484-
if ((doFnSignature.splitRestriction() != null
485-
&& doFnSignature.splitRestriction().observesWindow())
486-
|| (doFnSignature.newTracker() != null && doFnSignature.newTracker().observesWindow())
487-
|| (doFnSignature.getSize() != null && doFnSignature.getSize().observesWindow())
488-
|| !sideInputMapping.isEmpty()) {
489-
mainInputConsumer = this::processElementForWindowObservingSplitRestriction;
490-
this.processContext =
491-
new SizedRestrictionWindowObservingProcessBundleContext(
492-
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
493-
494-
} else {
495-
mainInputConsumer = this::processElementForSplitRestriction;
496-
this.processContext =
497-
new SizedRestrictionNonWindowObservingProcessBundleContext(
498-
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
499-
}
500-
break;
501488
case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
502489
if ((doFnSignature.truncateRestriction() != null
503490
&& doFnSignature.truncateRestriction().observesWindow())
@@ -675,7 +662,7 @@ private ByteString encodeProgress(double value) throws IOException {
675662
// no-op
676663
}
677664

678-
this.stateAccessor = stateAccessorFactory.create(this);
665+
this.stateAccessor = stateAccessor;
679666

680667
// Register as a consumer for each timer.
681668
this.outboundTimerReceivers = new HashMap<>();
@@ -748,80 +735,6 @@ private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
748735
}
749736
}
750737

751-
private void processElementForSplitRestriction(
752-
WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
753-
currentElement = elem.withValue(elem.getValue().getKey());
754-
currentRestriction = elem.getValue().getValue().getKey();
755-
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
756-
currentTrackerClaimed = new AtomicBoolean(false);
757-
currentTracker =
758-
RestrictionTrackers.observe(
759-
doFnInvoker.invokeNewTracker(processContext),
760-
new ClaimObserver<PositionT>() {
761-
private final AtomicBoolean claimed =
762-
Preconditions.checkNotNull(currentTrackerClaimed);
763-
764-
@Override
765-
public void onClaimed(PositionT position) {
766-
claimed.lazySet(true);
767-
}
768-
769-
@Override
770-
public void onClaimFailed(PositionT position) {}
771-
});
772-
try {
773-
doFnInvoker.invokeSplitRestriction(processContext);
774-
} finally {
775-
currentElement = null;
776-
currentRestriction = null;
777-
currentWatermarkEstimatorState = null;
778-
currentTracker = null;
779-
currentTrackerClaimed = null;
780-
}
781-
782-
this.stateAccessor.finalizeState();
783-
}
784-
785-
private void processElementForWindowObservingSplitRestriction(
786-
WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
787-
currentElement = elem.withValue(elem.getValue().getKey());
788-
currentRestriction = elem.getValue().getValue().getKey();
789-
currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
790-
try {
791-
Iterator<BoundedWindow> windowIterator =
792-
(Iterator<BoundedWindow>) elem.getWindows().iterator();
793-
while (windowIterator.hasNext()) {
794-
currentWindow = windowIterator.next();
795-
currentTrackerClaimed = new AtomicBoolean(false);
796-
currentTracker =
797-
RestrictionTrackers.observe(
798-
doFnInvoker.invokeNewTracker(processContext),
799-
new ClaimObserver<PositionT>() {
800-
private final AtomicBoolean claimed =
801-
Preconditions.checkNotNull(currentTrackerClaimed);
802-
803-
@Override
804-
public void onClaimed(PositionT position) {
805-
claimed.lazySet(true);
806-
}
807-
808-
@Override
809-
public void onClaimFailed(PositionT position) {}
810-
});
811-
doFnInvoker.invokeSplitRestriction(processContext);
812-
}
813-
} finally {
814-
currentElement = null;
815-
currentRestriction = null;
816-
currentWatermarkEstimatorState = null;
817-
currentWindow = null;
818-
currentTracker = null;
819-
currentTrackerClaimed = null;
820-
}
821-
822-
this.stateAccessor.finalizeState();
823-
}
824-
825738
private void processElementForTruncateRestriction(
826739
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
827740
currentElement = elem.withValue(elem.getValue().getKey().getKey());
@@ -906,49 +819,6 @@ public void onClaimFailed(PositionT position) {}
906819
this.stateAccessor.finalizeState();
907820
}
908821

909-
/** Internal class to hold the primary and residual roots when converted to an input element. */
910-
@AutoValue
911-
@AutoValue.CopyAnnotations
912-
abstract static class WindowedSplitResult {
913-
public static WindowedSplitResult forRoots(
914-
WindowedValue<?> primaryInFullyProcessedWindowsRoot,
915-
WindowedValue<?> primarySplitRoot,
916-
WindowedValue<?> residualSplitRoot,
917-
WindowedValue<?> residualInUnprocessedWindowsRoot) {
918-
return new AutoValue_FnApiDoFnRunner_WindowedSplitResult(
919-
primaryInFullyProcessedWindowsRoot,
920-
primarySplitRoot,
921-
residualSplitRoot,
922-
residualInUnprocessedWindowsRoot);
923-
}
924-
925-
public abstract @Nullable WindowedValue<?> getPrimaryInFullyProcessedWindowsRoot();
926-
927-
public abstract @Nullable WindowedValue<?> getPrimarySplitRoot();
928-
929-
public abstract @Nullable WindowedValue<?> getResidualSplitRoot();
930-
931-
public abstract @Nullable WindowedValue<?> getResidualInUnprocessedWindowsRoot();
932-
}
933-
934-
@AutoValue
935-
@AutoValue.CopyAnnotations
936-
abstract static class SplitResultsWithStopIndex {
937-
public static SplitResultsWithStopIndex of(
938-
WindowedSplitResult windowSplit,
939-
HandlesSplits.SplitResult downstreamSplit,
940-
int newWindowStopIndex) {
941-
return new AutoValue_FnApiDoFnRunner_SplitResultsWithStopIndex(
942-
windowSplit, downstreamSplit, newWindowStopIndex);
943-
}
944-
945-
public abstract @Nullable WindowedSplitResult getWindowSplit();
946-
947-
public abstract HandlesSplits.@Nullable SplitResult getDownstreamSplit();
948-
949-
public abstract int getNewWindowStopIndex();
950-
}
951-
952822
private void processElementForWindowObservingSizedElementAndRestriction(
953823
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
954824
currentElement = elem.withValue(elem.getValue().getKey().getKey());
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.fn.harness;
19+
20+
import com.google.auto.value.AutoValue;
21+
import org.apache.beam.sdk.annotations.Internal;
22+
import org.checkerframework.checker.nullness.qual.Nullable;
23+
24+
@AutoValue
25+
@AutoValue.CopyAnnotations
26+
@Internal
27+
abstract class SplitResultsWithStopIndex {
28+
public static SplitResultsWithStopIndex of(
29+
WindowedSplitResult windowSplit,
30+
HandlesSplits.SplitResult downstreamSplit,
31+
int newWindowStopIndex) {
32+
return new AutoValue_SplitResultsWithStopIndex(
33+
windowSplit, downstreamSplit, newWindowStopIndex);
34+
}
35+
36+
public abstract @Nullable WindowedSplitResult getWindowSplit();
37+
38+
public abstract HandlesSplits.@Nullable SplitResult getDownstreamSplit();
39+
40+
public abstract int getNewWindowStopIndex();
41+
}

0 commit comments

Comments
 (0)