Skip to content

Commit 6327aab

Browse files
authored
Multimap streaming (#36714)
* Changes multimap state key() tests to not care about order. There is no guarantee on the order keys are returned. Also fixes a couple warnings from other FnApi tests. * Adds Multimap user state support to the Java FnApi harness. Also adds a missing FnApi state proto to get all of the entries of a multimap. This type of access is part of the state API (and supported by the non-portable harness), but was not present in the protos. * Adds FnApi binding for entries() method. * Removes extra unchecked warning supression. I had added it because it turned my entire file in IntelliJ yellow. * Removes extra comma. * Removes not-needed @nonnull annotations. * Update FnApiStateAccessor.java Removes non-needed @nonnull annotations. * Changes multimap entries() iterable to put values for the same key from the backend and local adds together. Also needed to make maybePrefetchable public. * Adds a test that prefetching multimap entries results in a StateRequest sent across FnApi. * Adds an environment capability for multimap state and sets in for the java sdk. * Removes checks for persistent state types that are now supported.
1 parent 89947fd commit 6327aab

File tree

2 files changed

+81
-33
lines changed

2 files changed

+81
-33
lines changed

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,6 @@
108108
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
109109
import org.apache.beam.sdk.runners.TransformHierarchy;
110110
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
111-
import org.apache.beam.sdk.state.MapState;
112-
import org.apache.beam.sdk.state.MultimapState;
113-
import org.apache.beam.sdk.state.SetState;
114111
import org.apache.beam.sdk.transforms.Combine;
115112
import org.apache.beam.sdk.transforms.Combine.CombineFn;
116113
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
@@ -1191,7 +1188,7 @@ private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {
11911188
String dataflowWorkerJar = options.getDataflowWorkerJar();
11921189
if (dataflowWorkerJar != null && !dataflowWorkerJar.isEmpty() && !useUnifiedWorker(options)) {
11931190
// Put the user specified worker jar at the start of the classpath, to be consistent with the
1194-
// built in worker order.
1191+
// built-in worker order.
11951192
pathsToStageBuilder.add("dataflow-worker.jar=" + dataflowWorkerJar);
11961193
}
11971194
pathsToStageBuilder.addAll(options.getFilesToStage());
@@ -2212,7 +2209,7 @@ private static void translate(
22122209
PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
22132210
byteArrayToJsonString(serializeToByteArray(new IdentityMessageFn())));
22142211

2215-
// Using a GlobalWindowCoder as a place holder because GlobalWindowCoder is known coder.
2212+
// Using a GlobalWindowCoder as a placeholder because GlobalWindowCoder is known coder.
22162213
stepContext.addEncodingInput(
22172214
WindowedValues.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE));
22182215
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
@@ -2625,7 +2622,7 @@ static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
26252622
transform) {
26262623
// By default, if numShards is not set WriteFiles will produce one file per bundle. In
26272624
// streaming, there are large numbers of small bundles, resulting in many tiny files.
2628-
// Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
2625+
// Instead, we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
26292626
// (current_num_workers * 2 might be a better choice, but that value is not easily available
26302627
// today).
26312628
// If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
@@ -2732,38 +2729,13 @@ static boolean useUnifiedWorker(DataflowPipelineOptions options) {
27322729

27332730
static void verifyDoFnSupported(
27342731
DoFn<?, ?> fn, boolean streaming, DataflowPipelineOptions options) {
2735-
if (!streaming && DoFnSignatures.usesMultimapState(fn)) {
2736-
throw new UnsupportedOperationException(
2737-
String.format(
2738-
"%s does not currently support %s in batch mode",
2739-
DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
2740-
}
27412732
if (streaming && DoFnSignatures.requiresTimeSortedInput(fn)) {
27422733
throw new UnsupportedOperationException(
27432734
String.format(
27442735
"%s does not currently support @RequiresTimeSortedInput in streaming mode.",
27452736
DataflowRunner.class.getSimpleName()));
27462737
}
27472738
boolean isUnifiedWorker = useUnifiedWorker(options);
2748-
2749-
if (DoFnSignatures.usesMultimapState(fn) && isUnifiedWorker) {
2750-
throw new UnsupportedOperationException(
2751-
String.format(
2752-
"%s does not currently support %s running using streaming on unified worker",
2753-
DataflowRunner.class.getSimpleName(), MultimapState.class.getSimpleName()));
2754-
}
2755-
if (DoFnSignatures.usesSetState(fn) && streaming && isUnifiedWorker) {
2756-
throw new UnsupportedOperationException(
2757-
String.format(
2758-
"%s does not currently support %s when using streaming on unified worker",
2759-
DataflowRunner.class.getSimpleName(), SetState.class.getSimpleName()));
2760-
}
2761-
if (DoFnSignatures.usesMapState(fn) && streaming && isUnifiedWorker) {
2762-
throw new UnsupportedOperationException(
2763-
String.format(
2764-
"%s does not currently support %s when using streaming on unified worker",
2765-
DataflowRunner.class.getSimpleName(), MapState.class.getSimpleName()));
2766-
}
27672739
if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker) {
27682740
throw new UnsupportedOperationException(
27692741
String.format(

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@
133133
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
134134
import org.apache.beam.sdk.runners.TransformHierarchy;
135135
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
136+
import org.apache.beam.sdk.state.BagState;
137+
import org.apache.beam.sdk.state.MapState;
138+
import org.apache.beam.sdk.state.MultimapState;
139+
import org.apache.beam.sdk.state.OrderedListState;
140+
import org.apache.beam.sdk.state.SetState;
136141
import org.apache.beam.sdk.state.StateSpec;
137142
import org.apache.beam.sdk.state.StateSpecs;
138143
import org.apache.beam.sdk.state.ValueState;
@@ -2511,7 +2516,7 @@ public void testEnableAllowDuplicatesForRedistributeWithALO() throws IOException
25112516
options.setDataflowServiceOptions(ImmutableList.of("streaming_mode_at_least_once"));
25122517
Pipeline pipeline = Pipeline.create(options);
25132518

2514-
ImmutableList<KV<String, Integer>> abitraryKVs =
2519+
ImmutableList<KV<String, Integer>> arbitraryKVs =
25152520
ImmutableList.of(
25162521
KV.of("k1", 3),
25172522
KV.of("k5", Integer.MAX_VALUE),
@@ -2522,7 +2527,7 @@ public void testEnableAllowDuplicatesForRedistributeWithALO() throws IOException
25222527
KV.of("k3", 0));
25232528
PCollection<KV<String, Integer>> input =
25242529
pipeline.apply(
2525-
Create.of(abitraryKVs).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
2530+
Create.of(arbitraryKVs).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
25262531
// The allowDuplicates for Redistribute is false by default.
25272532
PCollection<KV<String, Integer>> output = input.apply(Redistribute.byKey());
25282533
pipeline.run();
@@ -2684,4 +2689,75 @@ public Writer<Void, Object> createWriter() {
26842689
};
26852690
}
26862691
}
2692+
2693+
@Test
2694+
public void testBatchStateSupported() throws IOException {
2695+
DataflowPipelineOptions options = buildPipelineOptions();
2696+
options.setRunner(DataflowRunner.class);
2697+
Pipeline p = Pipeline.create(options);
2698+
p.apply(Create.of(KV.of(13, 42)))
2699+
.apply(
2700+
ParDo.of(
2701+
new DoFn<KV<Integer, Integer>, Void>() {
2702+
@StateId("value")
2703+
private final StateSpec<ValueState<Void>> valueState = StateSpecs.value();
2704+
2705+
@StateId("bag")
2706+
private final StateSpec<BagState<Void>> bagState = StateSpecs.bag();
2707+
2708+
@StateId("set")
2709+
private final StateSpec<SetState<Void>> setState = StateSpecs.set();
2710+
2711+
@StateId("map")
2712+
private final StateSpec<MapState<Void, Void>> mapState = StateSpecs.map();
2713+
2714+
@StateId("multimap")
2715+
private final StateSpec<MultimapState<Void, Void>> multimapState =
2716+
StateSpecs.multimap();
2717+
2718+
@StateId("ordered list")
2719+
private final StateSpec<OrderedListState<Void>> orderedListState =
2720+
StateSpecs.orderedList(VoidCoder.of());
2721+
2722+
@ProcessElement
2723+
public void process() {}
2724+
}));
2725+
p.run();
2726+
}
2727+
2728+
@Test
2729+
public void testStreamingStateSupported() throws IOException {
2730+
DataflowPipelineOptions options = buildPipelineOptions();
2731+
options.setRunner(DataflowRunner.class);
2732+
options.setStreaming(true);
2733+
Pipeline p = Pipeline.create(options);
2734+
p.apply(Create.of(KV.of(13, 42)))
2735+
.apply(
2736+
ParDo.of(
2737+
new DoFn<KV<Integer, Integer>, Void>() {
2738+
@StateId("value")
2739+
private final StateSpec<ValueState<Void>> valueState = StateSpecs.value();
2740+
2741+
@StateId("bag")
2742+
private final StateSpec<BagState<Void>> bagState = StateSpecs.bag();
2743+
2744+
@StateId("set")
2745+
private final StateSpec<SetState<Void>> setState = StateSpecs.set();
2746+
2747+
@StateId("map")
2748+
private final StateSpec<MapState<Void, Void>> mapState = StateSpecs.map();
2749+
2750+
@StateId("multimap")
2751+
private final StateSpec<MultimapState<Void, Void>> multimapState =
2752+
StateSpecs.multimap();
2753+
2754+
@StateId("ordered list")
2755+
private final StateSpec<OrderedListState<Void>> orderedListState =
2756+
StateSpecs.orderedList(VoidCoder.of());
2757+
2758+
@ProcessElement
2759+
public void process() {}
2760+
}));
2761+
p.run();
2762+
}
26872763
}

0 commit comments

Comments
 (0)