Skip to content

Commit 7a88e6f

Browse files
authored
[Dataflow Streaming] Move throwExceptionOnLargeOutput out of OperationalLimits (#32407)
1 parent 2a7755b commit 7a88e6f

File tree

7 files changed

+26
-34
lines changed

7 files changed

+26
-34
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,11 @@ public class OperationalLimits {
2727
public final long maxOutputKeyBytes;
2828
// Maximum size of a single output element's serialized value.
2929
public final long maxOutputValueBytes;
30-
// Whether to throw an exception when processing output that violates any of the given limits.
31-
public final boolean throwExceptionOnLargeOutput;
3230

33-
OperationalLimits(
34-
long maxWorkItemCommitBytes,
35-
long maxOutputKeyBytes,
36-
long maxOutputValueBytes,
37-
boolean throwExceptionOnLargeOutput) {
31+
OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long maxOutputValueBytes) {
3832
this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
3933
this.maxOutputKeyBytes = maxOutputKeyBytes;
4034
this.maxOutputValueBytes = maxOutputValueBytes;
41-
this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
4235
}
4336

4437
@AutoBuilder(ofClass = OperationalLimits.class)
@@ -49,16 +42,13 @@ public interface Builder {
4942

5043
Builder setMaxOutputValueBytes(long bytes);
5144

52-
Builder setThrowExceptionOnLargeOutput(boolean shouldThrow);
53-
5445
OperationalLimits build();
5546
}
5647

5748
public static Builder builder() {
5849
return new AutoBuilder_OperationalLimits_Builder()
5950
.setMaxWorkItemCommitBytes(Long.MAX_VALUE)
6051
.setMaxOutputKeyBytes(Long.MAX_VALUE)
61-
.setMaxOutputValueBytes(Long.MAX_VALUE)
62-
.setThrowExceptionOnLargeOutput(false);
52+
.setMaxOutputValueBytes(Long.MAX_VALUE);
6353
}
6454
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
445445
config ->
446446
onPipelineConfig(
447447
config,
448-
options,
449448
dispatcherClient::consumeWindmillDispatcherEndpoints,
450449
operationalLimits::set));
451450
computationStateCache = computationStateCacheFactory.apply(configFetcher);
@@ -515,7 +514,6 @@ static StreamingDataflowWorker forTesting(
515514
config ->
516515
onPipelineConfig(
517516
config,
518-
options,
519517
windmillServer::setWindmillServiceEndpoints,
520518
operationalLimits::set))
521519
: new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
@@ -598,7 +596,6 @@ static StreamingDataflowWorker forTesting(
598596

599597
private static void onPipelineConfig(
600598
StreamingEnginePipelineConfig config,
601-
DataflowWorkerHarnessOptions options,
602599
Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
603600
Consumer<OperationalLimits> operationalLimits) {
604601

@@ -607,8 +604,6 @@ private static void onPipelineConfig(
607604
.setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
608605
.setMaxOutputKeyBytes(config.maxOutputKeyBytes())
609606
.setMaxOutputValueBytes(config.maxOutputValueBytes())
610-
.setThrowExceptionOnLargeOutput(
611-
DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output"))
612607
.build());
613608

614609
if (!config.windmillServiceEndpoints().isEmpty()) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
107107
private final ImmutableMap<String, String> stateNameMap;
108108
private final WindmillStateCache.ForComputation stateCache;
109109
private final ReaderCache readerCache;
110+
private final boolean throwExceptionOnLargeOutput;
110111
private volatile long backlogBytes;
111112

112113
/**
@@ -152,7 +153,8 @@ public StreamingModeExecutionContext(
152153
MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
153154
DataflowExecutionStateTracker executionStateTracker,
154155
StreamingModeExecutionStateRegistry executionStateRegistry,
155-
long sinkByteLimit) {
156+
long sinkByteLimit,
157+
boolean throwExceptionOnLargeOutput) {
156158
super(
157159
counterFactory,
158160
metricsContainerRegistry,
@@ -165,6 +167,7 @@ public StreamingModeExecutionContext(
165167
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
166168
this.stateCache = stateCache;
167169
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
170+
this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
168171
}
169172

170173
@VisibleForTesting
@@ -181,7 +184,7 @@ public long getMaxOutputValueBytes() {
181184
}
182185

183186
public boolean throwExceptionsForLargeOutput() {
184-
return operationalLimits.throwExceptionOnLargeOutput;
187+
return throwExceptionOnLargeOutput;
185188
}
186189

187190
public boolean workIsFailed() {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ final class ComputationWorkExecutorFactory {
6868
private static final Logger LOG = LoggerFactory.getLogger(ComputationWorkExecutorFactory.class);
6969
private static final String DISABLE_SINK_BYTE_LIMIT_EXPERIMENT =
7070
"disable_limiting_bundle_sink_bytes";
71+
// Whether to throw an exception when processing output that violates any of the operational
72+
// limits.
73+
private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT =
74+
"throw_exceptions_on_large_output";
7175

7276
private final DataflowWorkerHarnessOptions options;
7377
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
@@ -90,6 +94,7 @@ final class ComputationWorkExecutorFactory {
9094

9195
private final long maxSinkBytes;
9296
private final IdGenerator idGenerator;
97+
private final boolean throwExceptionOnLargeOutput;
9398

9499
ComputationWorkExecutorFactory(
95100
DataflowWorkerHarnessOptions options,
@@ -113,6 +118,8 @@ final class ComputationWorkExecutorFactory {
113118
hasExperiment(options, DISABLE_SINK_BYTE_LIMIT_EXPERIMENT)
114119
? Long.MAX_VALUE
115120
: StreamingDataflowWorker.MAX_SINK_BYTES;
121+
this.throwExceptionOnLargeOutput =
122+
hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT);
116123
}
117124

118125
private static Nodes.ParallelInstructionNode extractReadNode(
@@ -255,7 +262,8 @@ private StreamingModeExecutionContext createExecutionContext(
255262
stageInfo.metricsContainerRegistry(),
256263
executionStateTracker,
257264
stageInfo.executionStateRegistry(),
258-
maxSinkBytes);
265+
maxSinkBytes,
266+
throwExceptionOnLargeOutput);
259267
}
260268

261269
private DataflowMapTaskExecutor createMapTaskExecutor(

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1280,13 +1280,9 @@ public void testOutputKeyTooLargeException() throws Exception {
12801280

12811281
StreamingDataflowWorker worker =
12821282
makeWorker(
1283-
defaultWorkerParams()
1283+
defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
12841284
.setInstructions(instructions)
1285-
.setOperationalLimits(
1286-
OperationalLimits.builder()
1287-
.setMaxOutputKeyBytes(15)
1288-
.setThrowExceptionOnLargeOutput(true)
1289-
.build())
1285+
.setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build())
12901286
.build());
12911287
worker.start();
12921288

@@ -1317,13 +1313,10 @@ public void testOutputValueTooLargeException() throws Exception {
13171313

13181314
StreamingDataflowWorker worker =
13191315
makeWorker(
1320-
defaultWorkerParams()
1316+
defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
13211317
.setInstructions(instructions)
13221318
.setOperationalLimits(
1323-
OperationalLimits.builder()
1324-
.setMaxOutputValueBytes(15)
1325-
.setThrowExceptionOnLargeOutput(true)
1326-
.build())
1319+
OperationalLimits.builder().setMaxOutputValueBytes(15).build())
13271320
.build());
13281321
worker.start();
13291322

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public void setUp() {
127127
PipelineOptionsFactory.create(),
128128
"test-work-item-id"),
129129
executionStateRegistry,
130-
Long.MAX_VALUE);
130+
Long.MAX_VALUE,
131+
/*throwExceptionOnLargeOutput=*/ false);
131132
}
132133

133134
private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) {

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,8 @@ public void testReadUnboundedReader() throws Exception {
610610
PipelineOptionsFactory.create(),
611611
"test-work-item-id"),
612612
executionStateRegistry,
613-
Long.MAX_VALUE);
613+
Long.MAX_VALUE,
614+
/*throwExceptionOnLargeOutput=*/ false);
614615

615616
options.setNumWorkers(5);
616617
int maxElements = 10;
@@ -978,7 +979,8 @@ public void testFailedWorkItemsAbort() throws Exception {
978979
PipelineOptionsFactory.create(),
979980
"test-work-item-id"),
980981
executionStateRegistry,
981-
Long.MAX_VALUE);
982+
Long.MAX_VALUE,
983+
/*throwExceptionOnLargeOutput=*/ false);
982984

983985
options.setNumWorkers(5);
984986
int maxElements = 100;

0 commit comments

Comments (0)