Skip to content

Commit d567a9d

Browse files
committed
[FLINK-38462] Introduce state backend type identifier
1 parent 1c6574d commit d567a9d

File tree

18 files changed

+111
-0
lines changed

18 files changed

+111
-0
lines changed

flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,4 +338,11 @@ org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingSta
338338
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
339339
IN, ACC, OUT>
340340
stateProperties);
341+
342+
/**
343+
* @return fixed lower-case string identifying the type of the underlying state backend, e.g.
344+
* rocksdb, hashmap, forst, batch.
345+
*/
346+
@Experimental
347+
String getBackendTypeIdentifier();
341348
}

flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,11 @@ org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingSta
287287
throw new UnsupportedOperationException();
288288
}
289289

290+
@Override
291+
public String getBackendTypeIdentifier() {
292+
return "mock";
293+
}
294+
290295
private class CountingIterator<T> implements Iterator<T> {
291296

292297
private final Iterator<T> iterator;

flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/MultiStateKeyIteratorTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,11 @@ public <N> Stream<Tuple2<Integer, N>> getKeysAndNamespaces(String state) {
324324
"Operations other than getKeys() are not supported on this testing StateBackend.");
325325
}
326326

327+
@Override
328+
public String getBackendTypeIdentifier() {
329+
return "test";
330+
}
331+
327332
@Nonnull
328333
@Override
329334
public SavepointResources<Integer> savepoint() throws UnsupportedOperationException {

flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.runtime.state;
2020

21+
import org.apache.flink.annotation.Experimental;
2122
import org.apache.flink.annotation.Internal;
2223
import org.apache.flink.api.common.state.InternalCheckpointListener;
2324
import org.apache.flink.api.common.state.v2.State;
@@ -140,4 +141,11 @@ default boolean isSafeToReuseKVState() {
140141

141142
@Override
142143
void dispose();
144+
145+
/**
146+
* @return fixed lower-case string identifying the type of the underlying state backend, e.g.
147+
* rocksdb, hashmap, or unknown.
148+
*/
149+
@Experimental
150+
String getBackendTypeIdentifier();
143151
}

flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingSta
219219
}
220220
}
221221

222+
@Override
223+
public String getBackendTypeIdentifier() {
224+
if (keyedStateBackend != null) {
225+
return keyedStateBackend.getBackendTypeIdentifier();
226+
} else {
227+
return asyncKeyedStateBackend.getBackendTypeIdentifier();
228+
}
229+
}
230+
222231
protected <S extends org.apache.flink.api.common.state.v2.State, SV> S getPartitionedState(
223232
org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDescriptor)
224233
throws Exception {

flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.runtime.state;
2020

21+
import org.apache.flink.annotation.Experimental;
2122
import org.apache.flink.api.common.state.State;
2223
import org.apache.flink.api.common.state.StateDescriptor;
2324
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -166,6 +167,13 @@ default boolean isSafeToReuseKVState() {
166167
return false;
167168
}
168169

170+
/**
171+
* @return fixed lower-case string identifying the type of the underlying state backend, e.g.
172+
* rocksdb, hashmap, forst, batch.
173+
*/
174+
@Experimental
175+
String getBackendTypeIdentifier();
176+
169177
/** Listener is given a callback when {@link #setCurrentKey} is called (key context changes). */
170178
@FunctionalInterface
171179
interface KeySelectionListener<K> {

flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.flink.runtime.state.SnapshotResult;
4747
import org.apache.flink.runtime.state.SnapshotStrategy;
4848
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
49+
import org.apache.flink.runtime.state.StateBackendLoader;
4950
import org.apache.flink.runtime.state.StateEntry;
5051
import org.apache.flink.runtime.state.StateSnapshotRestore;
5152
import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
@@ -406,6 +407,11 @@ public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
406407
return table.getKeysAndNamespaces();
407408
}
408409

410+
@Override
411+
public String getBackendTypeIdentifier() {
412+
return StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
413+
}
414+
409415
@Override
410416
@Nonnull
411417
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(

flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ public void switchContext(@Nullable RecordContext<K> context) {
132132
@Override
133133
public void dispose() {}
134134

135+
@Override
136+
public String getBackendTypeIdentifier() {
137+
return keyedStateBackend.getBackendTypeIdentifier();
138+
}
139+
135140
@Override
136141
public void close() throws IOException {}
137142

flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,11 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
229229
return keySelectionListeners.remove(listener);
230230
}
231231

232+
@Override
233+
public String getBackendTypeIdentifier() {
234+
return "batch";
235+
}
236+
232237
@Nonnull
233238
@Override
234239
public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(

flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,11 @@ public void dispose() {
171171
// do nothing
172172
}
173173

174+
@Override
175+
public String getBackendTypeIdentifier() {
176+
return "test";
177+
}
178+
174179
@Override
175180
public void close() {
176181
// do nothing
@@ -328,6 +333,11 @@ public void dispose() {
328333
delegatedKeyedStateBackend.dispose();
329334
}
330335

336+
@Override
337+
public String getBackendTypeIdentifier() {
338+
return "test";
339+
}
340+
331341
@Override
332342
public void close() throws IOException {
333343
super.close();

0 commit comments

Comments
 (0)