Skip to content

Commit 060cc1a

Browse files
authored
Merge pull request #34919: Refactor: separate SplitAndSizeRestriction from FnApiDoFnRunner
2 parents 9eb7b98 + 3b09453 commit 060cc1a

File tree

12 files changed

+1153
-550
lines changed

12 files changed

+1153
-550
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,11 +1211,11 @@ public interface MultiOutputReceiver {
12111211
*
12121212
* <ul>
12131213
* <li>The return type {@code WatermarkEstimatorStateT} defines the watermark state type used
1214-
* within this splittable DoFn. All other methods that use a {@link
1215-
* WatermarkEstimatorState @WatermarkEstimatorState} parameter must use the same type that
1216-
* is used here. It is suggested to use as narrow of a return type definition as possible
1217-
* (for example prefer to use a square type over a shape type as a square is a type of a
1218-
* shape).
1214+
* within this splittable DoFn. The return type is allowed to be nullable. All other methods
1215+
* that use a {@link WatermarkEstimatorState @WatermarkEstimatorState} parameter must use
1216+
* the same type that is used here. It is suggested to use as narrow of a return type
1217+
* definition as possible (for example prefer to use a square type over a shape type as a
1218+
* square is a type of a shape).
12191219
* <li>If one of its arguments is tagged with the {@link Element} annotation, then it will be
12201220
* passed the current element being processed; the argument must be of type {@code InputT}.
12211221
* Note that automatic conversion of {@link Row}s and {@link FieldAccess} parameters are

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
4444
import org.apache.beam.sdk.values.Row;
4545
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
46+
import org.checkerframework.checker.nullness.qual.Nullable;
4647
import org.joda.time.Instant;
4748

4849
/**
@@ -96,7 +97,7 @@ <RestrictionT> RestrictionT invokeGetInitialRestriction(
9697
void invokeSplitRestriction(ArgumentProvider<InputT, OutputT> arguments);
9798

9899
/** Invoke the {@link TruncateRestriction} method on the bound {@link DoFn}. */
99-
<RestrictionT> TruncateResult<RestrictionT> invokeTruncateRestriction(
100+
<RestrictionT> @Nullable TruncateResult<RestrictionT> invokeTruncateRestriction(
100101
ArgumentProvider<InputT, OutputT> arguments);
101102

102103
/**
@@ -117,7 +118,13 @@ <RestrictionT> TruncateResult<RestrictionT> invokeTruncateRestriction(
117118
<RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> invokeNewTracker(
118119
ArgumentProvider<InputT, OutputT> arguments);
119120

120-
/** Invoke the {@link DoFn.NewWatermarkEstimator} method on the bound {@link DoFn}. */
121+
/**
122+
* Invoke the {@link DoFn.NewWatermarkEstimator} method on the bound {@link DoFn}.
123+
*
124+
* <p>Note that since {@code WatermarkEstimatorStateT} is permitted to be a nullable type, if this
125+
* method returns {@code null} that is interpreted as a valid watermark estimator state, not the
126+
* absence of a state.
127+
*/
121128
@SuppressWarnings("TypeParameterUnusedInFormals")
122129
<WatermarkEstimatorStateT>
123130
WatermarkEstimator<WatermarkEstimatorStateT> invokeNewWatermarkEstimator(

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.beam.sdk.values.TypeDescriptor;
5454
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
5555
import org.checkerframework.checker.nullness.qual.Nullable;
56+
import org.checkerframework.dataflow.qual.Pure;
5657

5758
/**
5859
* Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
@@ -67,91 +68,118 @@
6768
})
6869
public abstract class DoFnSignature {
6970
/** Class of the original {@link DoFn} from which this signature was produced. */
71+
@Pure
7072
public abstract Class<? extends DoFn<?, ?>> fnClass();
7173

7274
/** Whether this {@link DoFn} does a bounded amount of work per element. */
75+
@Pure
7376
public abstract PCollection.IsBounded isBoundedPerElement();
7477

7578
/** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */
79+
@Pure
7680
public abstract ProcessElementMethod processElement();
7781

7882
/** Details about the state cells that this {@link DoFn} declares. Immutable. */
83+
@Pure
7984
public abstract Map<String, StateDeclaration> stateDeclarations();
8085

8186
/** Details about this {@link DoFn}'s {@link DoFn.StartBundle} method. */
87+
@Pure
8288
public abstract @Nullable BundleMethod startBundle();
8389

8490
/** Details about this {@link DoFn}'s {@link DoFn.FinishBundle} method. */
91+
@Pure
8592
public abstract @Nullable BundleMethod finishBundle();
8693

8794
/** Details about this {@link DoFn}'s {@link DoFn.Setup} method. */
95+
@Pure
8896
public abstract @Nullable LifecycleMethod setup();
8997

9098
/** Details about this {@link DoFn}'s {@link DoFn.Teardown} method. */
99+
@Pure
91100
public abstract @Nullable LifecycleMethod teardown();
92101

93102
/** Details about this {@link DoFn}'s {@link DoFn.OnWindowExpiration} method. */
103+
@Pure
94104
public abstract @Nullable OnWindowExpirationMethod onWindowExpiration();
95105

96106
/** Timer declarations present on the {@link DoFn} class. Immutable. */
107+
@Pure
97108
public abstract Map<String, TimerDeclaration> timerDeclarations();
98109

99110
/** TimerMap declarations present on the {@link DoFn} class. Immutable. */
111+
@Pure
100112
public abstract Map<String, TimerFamilyDeclaration> timerFamilyDeclarations();
101113

102114
/** Field access declaration. */
115+
@Pure
103116
public abstract @Nullable Map<String, FieldAccessDeclaration> fieldAccessDeclarations();
104117

105118
/** Details about this {@link DoFn}'s {@link DoFn.GetInitialRestriction} method. */
119+
@Pure
106120
public abstract @Nullable GetInitialRestrictionMethod getInitialRestriction();
107121

108122
/** Details about this {@link DoFn}'s {@link DoFn.SplitRestriction} method. */
123+
@Pure
109124
public abstract @Nullable SplitRestrictionMethod splitRestriction();
110125

111126
/** Details about this {@link DoFn}'s {@link TruncateRestriction} method. */
127+
@Pure
112128
public abstract @Nullable TruncateRestrictionMethod truncateRestriction();
113129

114130
/** Details about this {@link DoFn}'s {@link DoFn.GetRestrictionCoder} method. */
131+
@Pure
115132
public abstract @Nullable GetRestrictionCoderMethod getRestrictionCoder();
116133

117134
/** Details about this {@link DoFn}'s {@link DoFn.GetWatermarkEstimatorStateCoder} method. */
135+
@Pure
118136
public abstract @Nullable GetWatermarkEstimatorStateCoderMethod getWatermarkEstimatorStateCoder();
119137

120138
/** Details about this {@link DoFn}'s {@link DoFn.GetInitialWatermarkEstimatorState} method. */
139+
@Pure
121140
public abstract @Nullable GetInitialWatermarkEstimatorStateMethod
122141
getInitialWatermarkEstimatorState();
123142

124143
/** Details about this {@link DoFn}'s {@link DoFn.NewWatermarkEstimator} method. */
144+
@Pure
125145
public abstract @Nullable NewWatermarkEstimatorMethod newWatermarkEstimator();
126146

127147
/** Details about this {@link DoFn}'s {@link DoFn.NewTracker} method. */
148+
@Pure
128149
public abstract @Nullable NewTrackerMethod newTracker();
129150

130151
/** Details about this {@link DoFn}'s {@link DoFn.GetSize} method. */
152+
@Pure
131153
public abstract @Nullable GetSizeMethod getSize();
132154

133155
/** Details about this {@link DoFn}'s {@link DoFn.OnTimer} methods. */
156+
@Pure
134157
public abstract @Nullable Map<String, OnTimerMethod> onTimerMethods();
135158

136159
/** Details about this {@link DoFn}'s {@link DoFn.OnTimerFamily} methods. */
160+
@Pure
137161
public abstract @Nullable Map<String, OnTimerFamilyMethod> onTimerFamilyMethods();
138162

139163
/** @deprecated use {@link #usesState()}, it's cleaner */
140164
@Deprecated
165+
@Pure
141166
public boolean isStateful() {
142167
return stateDeclarations().size() > 0;
143168
}
144169

145170
/** Whether the {@link DoFn} described by this signature uses state. */
171+
@Pure
146172
public boolean usesState() {
147173
return stateDeclarations().size() > 0;
148174
}
149175

150176
/** Whether the {@link DoFn} described by this signature uses timers. */
177+
@Pure
151178
public boolean usesTimers() {
152179
return timerDeclarations().size() > 0 || timerFamilyDeclarations().size() > 0;
153180
}
154181

182+
@Pure
155183
static Builder builder() {
156184
return new AutoValue_DoFnSignature.Builder();
157185
}
@@ -208,12 +236,14 @@ abstract Builder setFieldAccessDeclarations(
208236

209237
abstract Builder setOnTimerFamilyMethods(Map<String, OnTimerFamilyMethod> onTimerFamilyMethods);
210238

239+
@Pure
211240
abstract DoFnSignature build();
212241
}
213242

214243
/** A method delegated to an annotated method of an underlying {@link DoFn}. */
215244
public interface DoFnMethod {
216245
/** The annotated method itself. */
246+
@Pure
217247
Method targetMethod();
218248
}
219249

sdks/java/harness/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ applyJavaNature(
4545
'AssignWindowsRunner': 'https://github.com/typetools/checker-framework/issues/3794',
4646
'WindowMergingFnRunner': 'https://github.com/typetools/checker-framework/issues/3794',
4747
'FnApiDoFnRunner': 'https://github.com/typetools/checker-framework/issues/5436',
48-
'SplittablePairWithRestrictionDoFnRunner': 'https://github.com/typetools/checker-framework/issues/5436',
4948
],
5049
automaticModuleName: 'org.apache.beam.fn.harness',
5150
testShadowJar: true,

0 commit comments

Comments
 (0)