@@ -6544,6 +6544,83 @@ _ = (p | 'Read per user' >> ReadPerUser()
65446544{{< code_sample "sdks/go/examples/snippets/04transforms.go" bag_state >}}
65456545{{< /highlight >}}
65466546
6547+ #### SetState
6548+
6549+ A common use case for state is to accumulate unique elements. ` SetState ` allows for accumulating an unordered set
6550+ of elements.
6551+
6552+ {{< highlight java >}}
6553+ PCollection<KV<String, ValueT>> perUser = readPerUser();
6554+ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
6555+ @StateId ("state") private final StateSpec<SetState<ValueT >> uniqueElements = StateSpecs.bag();
6556+
6557+ @ProcessElement public void process(
6558+ @Element KV<String, ValueT> element,
6559+ @StateId ("state") SetState<ValueT > state) {
6560+ // Add the current element to the set state for this key.
6561+ state.add(element.getValue());
6562+ if (shouldFetch()) {
6563+ // Occasionally we fetch and process the values.
6564+ Iterable<ValueT > values = state.read();
6565+ processValues(values);
6566+ state.clear(); // Clear the state for this key.
6567+ }
6568+ }
6569+ }));
6570+ {{< /highlight >}}
6571+ {{< highlight py >}}
6572+ class SetStateDoFn(DoFn):
6573+ UNIQUE_ELEMENTS = SetStateSpec('buffer', coders.VarIntCoder())
6574+
6575+ def process(self, element_pair, state=DoFn.StateParam(UNIQUE_ELEMENTS)):
6576+ state.add(element_pair[ 1] )
6577+ if should_fetch():
6578+ unique_elements = list(state.read())
6579+ process_values(unique_elements)
6580+ state.clear()
6581+
6582+ _ = (p | 'Read per user' >> ReadPerUser()
6583+ | 'Set state pardo' >> beam.ParDo(SetStateDoFn()))
6584+ {{< /highlight >}}
6585+
6586+ #### OrderListState
6587+
6588+ ` OrderListState ` state that accumulate elements in an ordered List.
6589+
6590+ {{< highlight java >}}
6591+ PCollection<KV<String, ValueT>> perUser = readPerUser();
6592+ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
6593+ @StateId ("state") private final StateSpec<OrderedListState<ValueT >> uniqueElements = StateSpecs.bag();
6594+
6595+ @ProcessElement public void process(
6596+ @Element KV<String, ValueT> element,
6597+ @StateId ("state") SetState<ValueT > state) {
6598+ // Add the current element to the set state for this key.
6599+ state.add(element.getValue());
6600+ if (shouldFetch()) {
6601+ // Occasionally we fetch and process the values.
6602+ Iterable<ValueT > values = state.read();
6603+ processValues(values);
6604+ state.clear(); // Clear the state for this key.
6605+ }
6606+ }
6607+ }));
6608+ {{< /highlight >}}
6609+ {{< highlight py >}}
6610+ class OrderedListStateDoFn(DoFn):
6611+ STATE_ELEMENTS = OrderedListStateSpec('buffer', coders.ListCoder())
6612+
6613+ def process(self, element_pair, state=DoFn.StateParam(STATE_ELEMENTS)):
6614+ state.add(element_pair[ 1] )
6615+ if should_fetch():
6616+ elements = list(state.read())
6617+ process_values(elements)
6618+ state.clear()
6619+
6620+ _ = (p | 'Read per user' >> ReadPerUser()
6621+ | 'Set state pardo' >> beam.ParDo(OrderedListStateDoFn()))
6622+ {{< /highlight >}}
6623+
65476624### 11.2. Deferred state reads {#deferred-state-reads}
65486625
65496626When a ` DoFn ` contains multiple state specifications, reading each one in order can be slow. Calling the ` read() ` function
0 commit comments