-
Notifications
You must be signed in to change notification settings - Fork 456
Closed
Copy link
Labels
Milestone
Description
Search before asking
- I searched in the issues and found nothing similar.
Fluss version
0.8.0 (latest release)
Please describe the bug 🐞
PR #1375 changes the KV snapshot commit to ZooKeeper to be asynchronous by moving the call to completedSnapshotStore.add(completedSnapshot) into an ioExecutor.
However, CompletedSnapshotStore is not thread-safe, and this change introduces a concurrency issue, as the store may now be accessed simultaneously from different I/O executor threads.
We should make CompletedSnapshotStore thread-safe. Otherwise, race conditions or data corruption may occur.
See CoordinatorEventProcessor#tryProcessCommitKvSnapshot:
// commit the kv snapshot asynchronously
ioExecutor.execute(
() -> {
try {
TableBucket tb = event.getTableBucket();
CompletedSnapshot completedSnapshot =
event.getAddCompletedSnapshotData().getCompletedSnapshot();
// add completed snapshot
CompletedSnapshotStore completedSnapshotStore =
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
// this involves IO operation (ZK), so we do it in ioExecutor
completedSnapshotStore.add(completedSnapshot);
coordinatorEventManager.put(
new NotifyKvSnapshotOffsetEvent(
tb, completedSnapshot.getLogOffset()));
callback.complete(new CommitKvSnapshotResponse());
} catch (Exception e) {
callback.completeExceptionally(e);
}
});Solution
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!