Skip to content

Commit 0c10658

Browse files
authored
[Dataflow Streaming] Intern encoded tags across keys (#36313)
* Introduce InternedByteString class
1 parent 0f6b605 commit 0c10658

File tree

14 files changed

+468
-311
lines changed

14 files changed

+468
-311
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.util.common.worker;
19+
20+
import java.util.Objects;
21+
import javax.annotation.Nullable;
22+
import javax.annotation.concurrent.ThreadSafe;
23+
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
24+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Interner;
25+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Interners;
26+
27+
/*
28+
* Weakly Interned ByteStrings.
29+
* Used to save memory and GC pressure by sharing ByteStrings,
30+
* that are repeated commonly. Encoded stateTags are an example that are Interned.
31+
* */
32+
@ThreadSafe
33+
public class InternedByteString {
34+
35+
private static final int MAP_CONCURRENCY =
36+
Math.max(4, Runtime.getRuntime().availableProcessors());
37+
private static final Interner<InternedByteString> ENCODED_KEY_INTERNER =
38+
Interners.newBuilder().weak().concurrencyLevel(MAP_CONCURRENCY).build();
39+
40+
// ints don't tear and it is safe to cache without synchronization.
41+
// Defaults to 0.
42+
private int hashCode;
43+
private final ByteString byteString;
44+
45+
private InternedByteString(ByteString byteString) {
46+
this.byteString = byteString;
47+
}
48+
49+
public ByteString byteString() {
50+
return byteString;
51+
}
52+
53+
@Override
54+
public int hashCode() {
55+
if (hashCode == 0) {
56+
hashCode = byteString.hashCode();
57+
}
58+
return hashCode;
59+
}
60+
61+
@Override
62+
public boolean equals(@Nullable Object o) {
63+
if (this == o) {
64+
return true;
65+
}
66+
67+
if (!(o instanceof InternedByteString)) {
68+
return false;
69+
}
70+
InternedByteString that = (InternedByteString) o;
71+
return hashCode() == that.hashCode() && Objects.equals(byteString, that.byteString);
72+
}
73+
74+
public static InternedByteString of(ByteString value) {
75+
return ENCODED_KEY_INTERNER.intern(new InternedByteString(value));
76+
}
77+
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java

Lines changed: 52 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.beam.runners.core.StateTable;
2525
import org.apache.beam.runners.core.StateTag;
2626
import org.apache.beam.runners.core.StateTags;
27+
import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
2728
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForKeyAndFamily;
2829
import org.apache.beam.sdk.coders.BooleanCoder;
2930
import org.apache.beam.sdk.coders.Coder;
@@ -36,6 +37,7 @@
3637
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
3738

3839
final class CachingStateTable extends StateTable {
40+
3941
private final String stateFamily;
4042
private final WindmillStateReader reader;
4143
private final WindmillStateCache.ForKeyAndFamily cache;
@@ -84,23 +86,14 @@ protected StateTag.StateBinder binderForNamespace(StateNamespace namespace, Stat
8486
public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
8587
StateTag<BagState<T>> resolvedAddress =
8688
isSystemTable ? StateTags.makeSystemTagInternal(address) : address;
89+
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, resolvedAddress);
8790

88-
WindmillBag<T> result =
89-
cache
90-
.get(namespace, resolvedAddress)
91-
.map(bagState -> (WindmillBag<T>) bagState)
92-
.orElseGet(
93-
() ->
94-
new WindmillBag<>(
95-
namespace,
96-
resolvedAddress,
97-
stateFamily,
98-
elemCoder,
99-
isNewKey,
100-
windmillStateTagUtil));
101-
102-
result.initializeForWorkItem(reader, scopedReadStateSupplier);
103-
return result;
91+
@Nullable WindmillBag<T> bag = (WindmillBag<T>) cache.get(namespace, encodedKey);
92+
if (bag == null) {
93+
bag = new WindmillBag<>(namespace, encodedKey, stateFamily, elemCoder, isNewKey);
94+
}
95+
bag.initializeForWorkItem(reader, scopedReadStateSupplier);
96+
return bag;
10497
}
10598

10699
@Override
@@ -123,20 +116,13 @@ public <KeyT, ValueT> AbstractWindmillMap<KeyT, ValueT> bindMap(
123116
new WindmillMapViaMultimap<>(
124117
bindMultimap(internalMultimapAddress, keyCoder, valueCoder));
125118
} else {
126-
result =
127-
cache
128-
.get(namespace, spec)
129-
.map(mapState -> (AbstractWindmillMap<KeyT, ValueT>) mapState)
130-
.orElseGet(
131-
() ->
132-
new WindmillMap<>(
133-
namespace,
134-
spec,
135-
stateFamily,
136-
keyCoder,
137-
valueCoder,
138-
isNewKey,
139-
windmillStateTagUtil));
119+
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec);
120+
result = (AbstractWindmillMap<KeyT, ValueT>) cache.get(namespace, encodedKey);
121+
if (result == null) {
122+
result =
123+
new WindmillMap<>(
124+
namespace, encodedKey, stateFamily, keyCoder, valueCoder, isNewKey);
125+
}
140126
}
141127
result.initializeForWorkItem(reader, scopedReadStateSupplier);
142128
return result;
@@ -147,20 +133,14 @@ public <KeyT, ValueT> WindmillMultimap<KeyT, ValueT> bindMultimap(
147133
StateTag<MultimapState<KeyT, ValueT>> spec,
148134
Coder<KeyT> keyCoder,
149135
Coder<ValueT> valueCoder) {
136+
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec);
150137
WindmillMultimap<KeyT, ValueT> result =
151-
cache
152-
.get(namespace, spec)
153-
.map(multimapState -> (WindmillMultimap<KeyT, ValueT>) multimapState)
154-
.orElseGet(
155-
() ->
156-
new WindmillMultimap<>(
157-
namespace,
158-
spec,
159-
stateFamily,
160-
keyCoder,
161-
valueCoder,
162-
isNewKey,
163-
windmillStateTagUtil));
138+
(WindmillMultimap<KeyT, ValueT>) cache.get(namespace, encodedKey);
139+
if (result == null) {
140+
result =
141+
new WindmillMultimap<>(
142+
namespace, encodedKey, stateFamily, keyCoder, valueCoder, isNewKey);
143+
}
164144
result.initializeForWorkItem(reader, scopedReadStateSupplier);
165145
return result;
166146
}
@@ -169,21 +149,21 @@ public <KeyT, ValueT> WindmillMultimap<KeyT, ValueT> bindMultimap(
169149
public <T> OrderedListState<T> bindOrderedList(
170150
StateTag<OrderedListState<T>> spec, Coder<T> elemCoder) {
171151
StateTag<OrderedListState<T>> specOrInternalTag = addressOrInternalTag(spec);
152+
InternedByteString encodedKey =
153+
windmillStateTagUtil.encodeKey(namespace, specOrInternalTag);
172154

173-
WindmillOrderedList<T> result =
174-
cache
175-
.get(namespace, specOrInternalTag)
176-
.map(orderedList -> (WindmillOrderedList<T>) orderedList)
177-
.orElseGet(
178-
() ->
179-
new WindmillOrderedList<>(
180-
Optional.ofNullable(derivedStateTable).orElse(CachingStateTable.this),
181-
namespace,
182-
specOrInternalTag,
183-
stateFamily,
184-
elemCoder,
185-
isNewKey,
186-
windmillStateTagUtil));
155+
WindmillOrderedList<T> result = (WindmillOrderedList<T>) cache.get(namespace, encodedKey);
156+
if (result == null) {
157+
result =
158+
new WindmillOrderedList<>(
159+
Optional.ofNullable(derivedStateTable).orElse(CachingStateTable.this),
160+
namespace,
161+
encodedKey,
162+
specOrInternalTag,
163+
stateFamily,
164+
elemCoder,
165+
isNewKey);
166+
}
187167

188168
result.initializeForWorkItem(reader, scopedReadStateSupplier);
189169
return result;
@@ -193,21 +173,15 @@ public <T> OrderedListState<T> bindOrderedList(
193173
public WatermarkHoldState bindWatermark(
194174
StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
195175
StateTag<WatermarkHoldState> addressOrInternalTag = addressOrInternalTag(address);
176+
InternedByteString encodedKey =
177+
windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag);
196178

197-
WindmillWatermarkHold result =
198-
cache
199-
.get(namespace, addressOrInternalTag)
200-
.map(watermarkHold -> (WindmillWatermarkHold) watermarkHold)
201-
.orElseGet(
202-
() ->
203-
new WindmillWatermarkHold(
204-
namespace,
205-
address,
206-
stateFamily,
207-
timestampCombiner,
208-
isNewKey,
209-
windmillStateTagUtil));
210-
179+
WindmillWatermarkHold result = (WindmillWatermarkHold) cache.get(namespace, encodedKey);
180+
if (result == null) {
181+
result =
182+
new WindmillWatermarkHold(
183+
namespace, encodedKey, stateFamily, timestampCombiner, isNewKey);
184+
}
211185
result.initializeForWorkItem(reader, scopedReadStateSupplier);
212186
return result;
213187
}
@@ -248,21 +222,13 @@ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
248222
@Override
249223
public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
250224
StateTag<ValueState<T>> addressOrInternalTag = addressOrInternalTag(address);
225+
InternedByteString encodedKey =
226+
windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag);
251227

252-
WindmillValue<T> result =
253-
cache
254-
.get(namespace, addressOrInternalTag)
255-
.map(value -> (WindmillValue<T>) value)
256-
.orElseGet(
257-
() ->
258-
new WindmillValue<>(
259-
namespace,
260-
addressOrInternalTag,
261-
stateFamily,
262-
coder,
263-
isNewKey,
264-
windmillStateTagUtil));
265-
228+
WindmillValue<T> result = (WindmillValue<T>) cache.get(namespace, encodedKey);
229+
if (result == null) {
230+
result = new WindmillValue<>(namespace, encodedKey, stateFamily, coder, isNewKey);
231+
}
266232
result.initializeForWorkItem(reader, scopedReadStateSupplier);
267233
return result;
268234
}
@@ -274,6 +240,7 @@ private <T extends State> StateTag<T> addressOrInternalTag(StateTag<T> address)
274240
}
275241

276242
static class Builder {
243+
277244
private final String stateFamily;
278245
private final WindmillStateReader reader;
279246
private final WindmillStateCache.ForKeyAndFamily cache;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.concurrent.ExecutionException;
2525
import java.util.concurrent.Future;
2626
import org.apache.beam.runners.core.StateNamespace;
27-
import org.apache.beam.runners.core.StateTag;
27+
import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
2828
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
2929
import org.apache.beam.sdk.coders.Coder;
3030
import org.apache.beam.sdk.state.BagState;
@@ -41,8 +41,7 @@
4141
public class WindmillBag<T> extends SimpleWindmillState implements BagState<T> {
4242

4343
private final StateNamespace namespace;
44-
private final StateTag<BagState<T>> address;
45-
private final ByteString stateKey;
44+
private final InternedByteString stateKey;
4645
private final String stateFamily;
4746
private final Coder<T> elemCoder;
4847

@@ -60,14 +59,12 @@ public class WindmillBag<T> extends SimpleWindmillState implements BagState<T> {
6059

6160
WindmillBag(
6261
StateNamespace namespace,
63-
StateTag<BagState<T>> address,
62+
InternedByteString encodeKey,
6463
String stateFamily,
6564
Coder<T> elemCoder,
66-
boolean isNewKey,
67-
WindmillStateTagUtil windmillStateTagUtil) {
65+
boolean isNewKey) {
6866
this.namespace = namespace;
69-
this.address = address;
70-
this.stateKey = windmillStateTagUtil.encodeKey(namespace, address);
67+
this.stateKey = encodeKey;
7168
this.stateFamily = stateFamily;
7269
this.elemCoder = elemCoder;
7370
if (isNewKey) {
@@ -183,7 +180,7 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
183180
}
184181

185182
if (bagUpdatesBuilder != null) {
186-
bagUpdatesBuilder.setTag(stateKey).setStateFamily(stateFamily);
183+
bagUpdatesBuilder.setTag(stateKey.byteString()).setStateFamily(stateFamily);
187184
}
188185

189186
if (cachedValues != null) {
@@ -194,7 +191,7 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
194191
}
195192
// We now know the complete bag contents, and any read on it will yield a
196193
// cached value, so cache it for future reads.
197-
cache.put(namespace, address, this, encodedSize + stateKey.size());
194+
cache.put(namespace, stateKey, this, encodedSize + stateKey.byteString().size());
198195
}
199196

200197
// Don't reuse the localAdditions object; we don't want future changes to it to
@@ -205,6 +202,8 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
205202
}
206203

207204
private Future<Iterable<T>> getFuture() {
208-
return cachedValues != null ? null : reader.bagFuture(stateKey, stateFamily, elemCoder);
205+
return cachedValues != null
206+
? null
207+
: reader.bagFuture(stateKey.byteString(), stateFamily, elemCoder);
209208
}
210209
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.beam.runners.core.StateNamespace;
2727
import org.apache.beam.runners.core.StateTag;
2828
import org.apache.beam.runners.core.StateTags;
29+
import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
2930
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
3031
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForKeyAndFamily;
3132
import org.apache.beam.sdk.coders.Coder;
@@ -61,20 +62,13 @@ class WindmillCombiningState<InputT, AccumT, OutputT> extends WindmillState
6162
boolean isNewKey,
6263
WindmillStateTagUtil windmillStateTagUtil) {
6364
StateTag<BagState<AccumT>> internalBagAddress = StateTags.convertToBagTagInternal(address);
64-
this.bag =
65-
cache
66-
.get(namespace, internalBagAddress)
67-
.map(state -> (WindmillBag<AccumT>) state)
68-
.orElseGet(
69-
() ->
70-
new WindmillBag<>(
71-
namespace,
72-
internalBagAddress,
73-
stateFamily,
74-
accumCoder,
75-
isNewKey,
76-
windmillStateTagUtil));
65+
InternedByteString encodeKey = windmillStateTagUtil.encodeKey(namespace, internalBagAddress);
7766

67+
WindmillBag<AccumT> bag = (WindmillBag<AccumT>) cache.get(namespace, encodeKey);
68+
if (bag == null) {
69+
bag = new WindmillBag<>(namespace, encodeKey, stateFamily, accumCoder, isNewKey);
70+
}
71+
this.bag = bag;
7872
this.combineFn = combineFn;
7973
this.localAdditionsAccumulator = combineFn.createAccumulator();
8074
this.hasLocalAdditions = false;

0 commit comments

Comments
 (0)