diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index be2aade96e41..7e23182042c9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -108,9 +108,6 @@
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.state.MapState;
-import org.apache.beam.sdk.state.MultimapState;
-import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
@@ -1191,7 +1188,7 @@ private List<String> getDefaultArtifacts() {
     String dataflowWorkerJar = options.getDataflowWorkerJar();
     if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty() && !useUnifiedWorker(options)) {
       // Put the user specified worker jar at the start of the classpath, to be consistent with the
-      // built in worker order.
+      // built-in worker order.
       pathsToStageBuilder.add("dataflow-worker.jar=" + dataflowWorkerJar);
     }
     pathsToStageBuilder.addAll(options.getFilesToStage());
@@ -2212,7 +2209,7 @@ private static void translate(
             PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
             byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
       }
-      // Using a GlobalWindowCoder as a place holder because GlobalWindowCoder is known coder.
+      // Using a GlobalWindowCoder as a placeholder because GlobalWindowCoder is a known coder.
       stepContext.addEncodingInput(
           WindowedValues.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE));
       stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
@@ -2625,7 +2622,7 @@ static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
               transform) {
       // By default, if numShards is not set WriteFiles will produce one file per bundle. In
       // streaming, there are large numbers of small bundles, resulting in many tiny files.
-      // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
+      // Instead, we pick max workers * 2 to ensure full parallelism, but prevent too many files.
       // (current_num_workers * 2 might be a better choice, but that value is not easily available
       // today).
       // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
@@ -2732,12 +2729,6 @@ static boolean useUnifiedWorker(DataflowPipelineOptions options) {
 
   static void verifyDoFnSupported(
       DoFn<?, ?> fn, boolean streaming, DataflowPipelineOptions options) {
-    if (!streaming && DoFnSignatures.usesMultimapState(fn)) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support %s in batch mode",
-              DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
-    }
     if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) {
       throw new UnsupportedOperationException(
           String.format(
@@ -2745,25 +2736,6 @@ static void verifyDoFnSupported(
               DataflowRunner.class.getSimpleName()));
     }
     boolean isUnifiedWorker = useUnifiedWorker(options);
-
-    if (DoFnSignatures.usesMultimapState(fn) && isUnifiedWorker) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support %s running using streaming on unified worker",
-              DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
-    }
-    if (DoFnSignatures.usesSetState(fn) && streaming && isUnifiedWorker) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support %s when using streaming on unified worker",
-              DataflowRunner.class.getSimpleName(), SetState.class.getSimpleName()));
-    }
-    if (DoFnSignatures.usesMapState(fn) && streaming && isUnifiedWorker) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "%s does not currently support %s when using streaming on unified worker",
-              DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName()));
-    }
     if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker) {
       throw new UnsupportedOperationException(
           String.format(
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index db8fbd525ac1..b944a300d5f2 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -133,6 +133,11 @@
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.MultimapState;
+import org.apache.beam.sdk.state.OrderedListState;
+import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
@@ -2511,7 +2516,7 @@ public void testEnableAllowDuplicatesForRedistributeWithALO() throws IOException
     options.setDataflowServiceOptions(ImmutableList.of("streaming_mode_at_least_once"));
     Pipeline pipeline = Pipeline.create(options);
 
-    ImmutableList<KV<String, Integer>> abitraryKVs =
+    ImmutableList<KV<String, Integer>> arbitraryKVs =
         ImmutableList.of(
             KV.of("k1", 3),
             KV.of("k5", Integer.MAX_VALUE),
@@ -2522,7 +2527,7 @@ public void testEnableAllowDuplicatesForRedistributeWithALO() throws IOException
             KV.of("k3", 0));
     PCollection<KV<String, Integer>> input =
         pipeline.apply(
-            Create.of(abitraryKVs).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
+            Create.of(arbitraryKVs).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
     // The allowDuplicates for Redistribute is false by default.
     PCollection<KV<String, Integer>> output = input.apply(Redistribute.byKey());
     pipeline.run();
@@ -2684,4 +2689,75 @@ public Writer<Void, Object> createWriter() {
       };
     }
   }
+
+  @Test
+  public void testBatchStateSupported() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    Pipeline p = Pipeline.create(options);
+    p.apply(Create.of(KV.of(13, 42)))
+        .apply(
+            ParDo.of(
+                new DoFn<KV<Integer, Integer>, Void>() {
+                  @StateId("value")
+                  private final StateSpec<ValueState<Void>> valueState = StateSpecs.value();
+
+                  @StateId("bag")
+                  private final StateSpec<BagState<Void>> bagState = StateSpecs.bag();
+
+                  @StateId("set")
+                  private final StateSpec<SetState<Void>> setState = StateSpecs.set();
+
+                  @StateId("map")
+                  private final StateSpec<MapState<Void, Void>> mapState = StateSpecs.map();
+
+                  @StateId("multimap")
+                  private final StateSpec<MultimapState<Void, Void>> multimapState =
+                      StateSpecs.multimap();
+
+                  @StateId("ordered list")
+                  private final StateSpec<OrderedListState<Void>> orderedListState =
+                      StateSpecs.orderedList(VoidCoder.of());
+
+                  @ProcessElement
+                  public void process() {}
+                }));
+    p.run();
+  }
+
+  @Test
+  public void testStreamingStateSupported() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowRunner.class);
+    options.setStreaming(true);
+    Pipeline p = Pipeline.create(options);
+    p.apply(Create.of(KV.of(13, 42)))
+        .apply(
+            ParDo.of(
+                new DoFn<KV<Integer, Integer>, Void>() {
+                  @StateId("value")
+                  private final StateSpec<ValueState<Void>> valueState = StateSpecs.value();
+
+                  @StateId("bag")
+                  private final StateSpec<BagState<Void>> bagState = StateSpecs.bag();
+
+                  @StateId("set")
+                  private final StateSpec<SetState<Void>> setState = StateSpecs.set();
+
+                  @StateId("map")
+                  private final StateSpec<MapState<Void, Void>> mapState = StateSpecs.map();
+
+                  @StateId("multimap")
+                  private final StateSpec<MultimapState<Void, Void>> multimapState =
+                      StateSpecs.multimap();
+
+                  @StateId("ordered list")
+                  private final StateSpec<OrderedListState<Void>> orderedListState =
+                      StateSpecs.orderedList(VoidCoder.of());
+
+                  @ProcessElement
+                  public void process() {}
+                }));
+    p.run();
+  }
 }
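Note on the reworded StreamingShardedWriteFactory comment above: the shard-picking heuristic it describes amounts to roughly the following sketch. This is a paraphrase, not the verbatim implementation, and the method name `pickNumShards` is illustrative only:

```java
// Sketch of the heuristic the comment describes: prefer maxNumWorkers * 2 for
// full parallelism without one-file-per-bundle blowup, fall back to
// numWorkers * 2, else default to 10 shards when neither option is set.
static int pickNumShards(int numWorkers, int maxNumWorkers) {
  if (maxNumWorkers > 0) {
    return maxNumWorkers * 2;
  }
  if (numWorkers > 0) {
    return numWorkers * 2;
  }
  return 10; // neither numWorkers nor maxNumWorkers set
}
```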
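Note on the new tests: they only verify that a pipeline declaring each StateSpec kind now passes verifyDoFnSupported; the process() bodies never touch the state. For readers, here is a minimal sketch of a DoFn that actually exercises the now-permitted MultimapState (the element types, the @StateId name, and the class name are illustrative, not part of this change):

```java
import org.apache.beam.sdk.state.MultimapState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch: a stateful DoFn that accumulates values per key in MultimapState,
// the state kind this change un-blocks in batch and on the unified worker.
class CollectPerKeyFn extends DoFn<KV<String, Integer>, Integer> {
  @StateId("seen")
  private final StateSpec<MultimapState<String, Integer>> seenSpec = StateSpecs.multimap();

  @ProcessElement
  public void process(
      @Element KV<String, Integer> element,
      @StateId("seen") MultimapState<String, Integer> seen,
      OutputReceiver<Integer> out) {
    seen.put(element.getKey(), element.getValue()); // accumulate this key's values
    for (Integer v : seen.get(element.getKey()).read()) { // re-read all values seen so far
      out.output(v);
    }
  }
}
```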