Skip to content

Commit e4b97d1

Browse files
authored
Merge pull request #14 from SLNE-Development/codex/github-mention-add-removeif-to-syncset-and-synclist
SyncList: use atomic ReplaceAll delta for removeIf to fix replication ordering
2 parents 1132e9f + 0c4d12f commit e4b97d1

File tree

1 file changed

+19
-14
lines changed
  • src/main/kotlin/dev/slne/surf/redis/sync/list

1 file changed

+19
-14
lines changed

src/main/kotlin/dev/slne/surf/redis/sync/list/SyncList.kt

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,9 @@ class SyncList<T : Any> internal constructor(
175175
/**
176176
* Removes all elements from the list that match the given [predicate] and replicates the changes.
177177
*
178-
* This operation uses atomic replication: the entire list is cleared and remaining elements
179-
* are re-added. This ensures consistency across distributed nodes since the operation is
180-
* replicated as a Clear delta followed by individual Add deltas, avoiding index-based
181-
* replication issues.
178+
* This operation uses atomic replication: the entire list is replaced in a single delta.
179+
* This ensures consistency across distributed nodes since the operation cannot interleave
180+
* with other deltas in the middle of a Clear/Add sequence, avoiding ordering divergence.
182181
*
183182
* @param predicate the predicate to test each element against
184183
* @return `true` if any elements were removed, `false` if no elements matched the predicate
@@ -195,16 +194,10 @@ class SyncList<T : Any> internal constructor(
195194

196195
if (!hadRemovals) return false
197196

198-
// Replicate using Clear + multiple Add operations for atomic consistency
197+
// Replicate using a single replace-all delta for atomic consistency
199198
scope.launch {
200-
// First, clear the list on all nodes
201-
publishLocalDelta(Delta.Clear)
202-
203-
// Then re-add remaining elements in order
204-
remaining.forEach { element ->
205-
val elementJson = api.json.encodeToString(elementSerializer, element)
206-
publishLocalDelta(Delta.Add(elementJson))
207-
}
199+
val snapshotJson = api.json.encodeToString(snapshotSerializer, remaining)
200+
publishLocalDelta(Delta.ReplaceAll(snapshotJson))
208201
}
209202

210203
// Notify listeners - single Cleared event for simplicity
@@ -378,6 +371,15 @@ class SyncList<T : Any> internal constructor(
378371
lock.write { list.clear() }
379372
notifyListeners(listeners, SyncListChange.Cleared)
380373
}
374+
375+
is Delta.ReplaceAll -> {
376+
val snapshot = api.json.decodeFromString(snapshotSerializer, delta.snapshotJson)
377+
lock.write {
378+
list.clear()
379+
list.addAll(snapshot)
380+
}
381+
notifyListeners(listeners, SyncListChange.Cleared)
382+
}
381383
}
382384
}
383385

@@ -403,5 +405,8 @@ class SyncList<T : Any> internal constructor(
403405

404406
@Serializable
405407
data object Clear : Delta
408+
409+
@Serializable
410+
data class ReplaceAll(val snapshotJson: String) : Delta
406411
}
407-
}
412+
}

0 commit comments

Comments
 (0)