Skip to content

Commit 940d746

Browse files
[FLINK-37398][state] Add proper message when state processor API is used with changelog
1 parent b633f98 commit 940d746

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public StateChangelogStorage<?> getStateChangelogStorage() {
124124
@Override
125125
public StateChangelogStorageView<?> getStateChangelogStorageView(
126126
Configuration configuration, ChangelogStateHandle changelogStateHandle) {
127-
return null;
127+
throw new UnsupportedOperationException("State processor api does not support changelog.");
128128
}
129129

130130
@Nullable

flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
2828
import org.apache.flink.runtime.state.changelog.StateChange;
2929
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
30+
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
3031
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
3132
import org.apache.flink.util.CloseableIterator;
3233
import org.apache.flink.util.Preconditions;
@@ -39,6 +40,8 @@
3940
import java.util.Objects;
4041
import java.util.stream.Collectors;
4142

43+
import static java.util.Objects.requireNonNull;
44+
4245
/**
4346
* Restores {@link ChangelogKeyedStateBackend} from the provided {@link ChangelogStateBackendHandle
4447
* handles}.
@@ -97,11 +100,11 @@ private static <T extends ChangelogStateHandle> void readBackendHandle(
97100
Map<Short, StateID> stateIds = new HashMap<>();
98101
for (ChangelogStateHandle changelogHandle :
99102
backendHandle.getNonMaterializedStateHandles()) {
103+
StateChangelogStorageView<?> stateChangelogStorageView =
104+
taskStateManager.getStateChangelogStorageView(configuration, changelogHandle);
105+
requireNonNull(stateChangelogStorageView, "Changelog storage view must not be null.");
100106
StateChangelogHandleReader<T> changelogHandleReader =
101-
(StateChangelogHandleReader<T>)
102-
taskStateManager
103-
.getStateChangelogStorageView(configuration, changelogHandle)
104-
.createReader();
107+
(StateChangelogHandleReader<T>) stateChangelogStorageView.createReader();
105108
try (CloseableIterator<StateChange> changes =
106109
changelogHandleReader.getChanges((T) changelogHandle)) {
107110
while (changes.hasNext()) {

0 commit comments

Comments
 (0)