Skip to content

Commit 1daa50d

Browse files
authored
Merge pull request #12 from SLNE-Development/copilot/add-removeif-to-syncset-synclists
Add removeIf to SyncSet and SyncList
2 parents 1ec4771 + e4b97d1 commit 1daa50d

File tree

4 files changed

+436
-1
lines changed

4 files changed

+436
-1
lines changed

src/main/kotlin/dev/slne/surf/redis/sync/list/SyncList.kt

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,40 @@ class SyncList<T : Any> internal constructor(
172172
return removed
173173
}
174174

175+
/**
176+
* Removes all elements from the list that match the given [predicate] and replicates the changes.
177+
*
178+
* This operation uses atomic replication: the entire list is replaced in a single delta.
179+
* This ensures consistency across distributed nodes since the operation cannot interleave
180+
* with other deltas in the middle of a Clear/Add sequence, avoiding ordering divergence.
181+
*
182+
* @param predicate the predicate to test each element against
183+
* @return `true` if any elements were removed, `false` if no elements matched the predicate
184+
*/
185+
fun removeIf(predicate: (T) -> Boolean): Boolean {
186+
// Perform filtering and capture the results under write lock
187+
val (remaining, hadRemovals) = lock.write {
188+
val filtered = list.filterNot(predicate)
189+
val removals = list.size != filtered.size
190+
list.clear()
191+
list.addAll(filtered)
192+
filtered to removals
193+
}
194+
195+
if (!hadRemovals) return false
196+
197+
// Replicate using a single replace-all delta for atomic consistency
198+
scope.launch {
199+
val snapshotJson = api.json.encodeToString(snapshotSerializer, remaining)
200+
publishLocalDelta(Delta.ReplaceAll(snapshotJson))
201+
}
202+
203+
// Notify listeners - single Cleared event for simplicity
204+
notifyListeners(listeners, SyncListChange.Cleared)
205+
206+
return true
207+
}
208+
175209
/**
176210
* Clears the list.
177211
*
@@ -188,6 +222,20 @@ class SyncList<T : Any> internal constructor(
188222
notifyListeners(listeners, SyncListChange.Cleared)
189223
}
190224

225+
/**
226+
* Registers a change listener.
227+
*
228+
* The listener is invoked for all local and remote changes.
229+
*/
230+
fun addListener(listener: (SyncListChange) -> Unit) {
231+
listeners += listener
232+
}
233+
234+
/** Removes a previously registered listener. */
235+
fun removeListener(listener: (SyncListChange) -> Unit) {
236+
listeners -= listener
237+
}
238+
191239
override suspend fun loadSnapshot() {
192240
val async = api.connection.async()
193241
val snapshotJson = async.get(dataKey).await()
@@ -323,6 +371,15 @@ class SyncList<T : Any> internal constructor(
323371
lock.write { list.clear() }
324372
notifyListeners(listeners, SyncListChange.Cleared)
325373
}
374+
375+
is Delta.ReplaceAll -> {
376+
val snapshot = api.json.decodeFromString(snapshotSerializer, delta.snapshotJson)
377+
lock.write {
378+
list.clear()
379+
list.addAll(snapshot)
380+
}
381+
notifyListeners(listeners, SyncListChange.Cleared)
382+
}
326383
}
327384
}
328385

@@ -348,5 +405,8 @@ class SyncList<T : Any> internal constructor(
348405

349406
@Serializable
350407
data object Clear : Delta
408+
409+
@Serializable
410+
data class ReplaceAll(val snapshotJson: String) : Delta
351411
}
352-
}
412+
}

src/main/kotlin/dev/slne/surf/redis/sync/set/SyncSet.kt

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,46 @@ class SyncSet<T : Any> internal constructor(
134134
return true
135135
}
136136

137+
/**
138+
* Removes all elements from the set that match the given [predicate] and replicates the changes.
139+
*
140+
* Each matching element is removed under a write lock, and each removal is replicated as a
141+
* separate delta to ensure consistency across nodes. Deltas are published sequentially
142+
* to maintain order.
143+
*
144+
* @param predicate the predicate to test each element against
145+
* @return `true` if any elements were removed, `false` if no elements matched the predicate
146+
*/
147+
fun removeIf(predicate: (T) -> Boolean): Boolean {
148+
val removedElements = mutableListOf<T>()
149+
150+
lock.write {
151+
val iterator = set.iterator()
152+
while (iterator.hasNext()) {
153+
val element = iterator.next()
154+
if (predicate(element)) {
155+
iterator.remove()
156+
removedElements.add(element)
157+
}
158+
}
159+
}
160+
161+
if (removedElements.isEmpty()) return false
162+
163+
// Publish deltas sequentially to maintain order
164+
scope.launch {
165+
removedElements.forEach { element ->
166+
val elementJson = api.json.encodeToString(elementSerializer, element)
167+
publishLocalDelta(Delta.Remove(elementJson))
168+
}
169+
}
170+
171+
removedElements.forEach { element ->
172+
notifyListeners(listeners, SyncSetChange.Removed(element))
173+
}
174+
return true
175+
}
176+
137177
/**
138178
* Clears the set and replicates the change.
139179
*
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package dev.slne.surf.redis.sync
2+
3+
import dev.slne.surf.redis.RedisApi
4+
import dev.slne.surf.redis.RedisTestBase
5+
import dev.slne.surf.redis.sync.list.SyncListChange
6+
import io.lettuce.core.RedisURI
7+
import kotlinx.coroutines.delay
8+
import kotlinx.coroutines.test.runTest
9+
import kotlinx.serialization.builtins.serializer
10+
import org.junit.jupiter.api.Test
11+
import kotlin.test.assertEquals
12+
import kotlin.test.assertFalse
13+
import kotlin.test.assertTrue
14+
15+
class SyncListTest : RedisTestBase() {
16+
17+
@Test
18+
fun `removeIf removes matching elements`() = runTest {
19+
val syncList = redisApi.createSyncList("test-list-removeif-1", String.serializer())
20+
delay(100) // Allow initialization
21+
22+
// Add elements
23+
syncList.add("apple")
24+
syncList.add("banana")
25+
syncList.add("cherry")
26+
syncList.add("apricot")
27+
delay(100) // Allow replication
28+
29+
assertEquals(4, syncList.size())
30+
31+
// Remove elements starting with 'a'
32+
val removed = syncList.removeIf { it.startsWith("a") }
33+
assertTrue(removed, "Should have removed elements")
34+
delay(100) // Allow replication
35+
36+
assertEquals(2, syncList.size(), "Should have 2 elements left")
37+
assertEquals("banana", syncList.get(0))
38+
assertEquals("cherry", syncList.get(1))
39+
}
40+
41+
@Test
42+
fun `removeIf returns false when no elements match`() = runTest {
43+
val syncList = redisApi.createSyncList("test-list-removeif-2", String.serializer())
44+
delay(100)
45+
46+
syncList.add("apple")
47+
syncList.add("banana")
48+
delay(100)
49+
50+
// Try to remove elements starting with 'z'
51+
val removed = syncList.removeIf { it.startsWith("z") }
52+
assertFalse(removed, "Should not have removed any elements")
53+
delay(100)
54+
55+
assertEquals(2, syncList.size(), "All elements should still be present")
56+
}
57+
58+
@Test
59+
fun `removeIf triggers listener notification`() = runTest {
60+
val syncList = redisApi.createSyncList("test-list-removeif-3", String.serializer())
61+
delay(100)
62+
63+
var clearedCalled = false
64+
syncList.addListener { change ->
65+
if (change is SyncListChange.Cleared) {
66+
clearedCalled = true
67+
}
68+
}
69+
70+
syncList.add("apple")
71+
syncList.add("banana")
72+
syncList.add("cherry")
73+
delay(100)
74+
75+
syncList.removeIf { it.startsWith("a") || it == "cherry" }
76+
delay(100)
77+
78+
assertTrue(clearedCalled, "Should have notified with Cleared event")
79+
assertEquals(1, syncList.size(), "Should have 1 element left")
80+
assertEquals("banana", syncList.get(0))
81+
}
82+
83+
@Test
84+
fun `removeIf on empty list returns false`() = runTest {
85+
val syncList = redisApi.createSyncList("test-list-removeif-4", String.serializer())
86+
delay(100)
87+
88+
val removed = syncList.removeIf { true }
89+
assertFalse(removed, "Should not have removed any elements from empty list")
90+
}
91+
92+
@Test
93+
fun `removeIf preserves correct order`() = runTest {
94+
val syncList = redisApi.createSyncList("test-list-removeif-5", Int.serializer())
95+
delay(100)
96+
97+
// Add numbers 1-10
98+
for (i in 1..10) {
99+
syncList.add(i)
100+
}
101+
delay(100)
102+
103+
assertEquals(10, syncList.size())
104+
105+
// Remove even numbers
106+
val removed = syncList.removeIf { it % 2 == 0 }
107+
assertTrue(removed, "Should have removed elements")
108+
delay(100)
109+
110+
assertEquals(5, syncList.size(), "Should have 5 odd numbers left")
111+
assertEquals(1, syncList.get(0))
112+
assertEquals(3, syncList.get(1))
113+
assertEquals(5, syncList.get(2))
114+
assertEquals(7, syncList.get(3))
115+
assertEquals(9, syncList.get(4))
116+
}
117+
118+
@Test
119+
fun `removeIf handles adjacent removals`() = runTest {
120+
val syncList = redisApi.createSyncList("test-list-removeif-6", String.serializer())
121+
delay(100)
122+
123+
syncList.add("a")
124+
syncList.add("a")
125+
syncList.add("b")
126+
syncList.add("a")
127+
syncList.add("a")
128+
delay(100)
129+
130+
assertEquals(5, syncList.size())
131+
132+
// Remove all 'a' elements
133+
val removed = syncList.removeIf { it == "a" }
134+
assertTrue(removed, "Should have removed elements")
135+
delay(100)
136+
137+
assertEquals(1, syncList.size(), "Should have 1 element left")
138+
assertEquals("b", syncList.get(0))
139+
}
140+
141+
@Test
142+
fun `removeIf replicates correctly across multiple nodes`() = runTest {
143+
// Create two nodes connected to the same Redis instance
144+
val node1Api = RedisApi.create(RedisURI.create(redisContainer.redisURI))
145+
node1Api.freezeAndConnect()
146+
147+
val node2Api = RedisApi.create(RedisURI.create(redisContainer.redisURI))
148+
node2Api.freezeAndConnect()
149+
150+
try {
151+
val list1 = node1Api.createSyncList("test-list-multinode-1", String.serializer())
152+
val list2 = node2Api.createSyncList("test-list-multinode-1", String.serializer())
153+
delay(200) // Allow both to initialize and sync
154+
155+
// Add elements from node1
156+
list1.add("apple")
157+
list1.add("banana")
158+
list1.add("cherry")
159+
list1.add("apricot")
160+
list1.add("date")
161+
delay(200) // Allow replication
162+
163+
// Verify both nodes see the same data
164+
assertEquals(5, list1.size(), "Node 1 should have 5 elements")
165+
assertEquals(5, list2.size(), "Node 2 should have 5 elements")
166+
167+
// Remove elements from node1
168+
val removed = list1.removeIf { it.startsWith("a") }
169+
assertTrue(removed, "Should have removed elements")
170+
delay(300) // Allow replication of Clear + Add deltas
171+
172+
// Verify both nodes have the same final state
173+
assertEquals(3, list1.size(), "Node 1 should have 3 elements left")
174+
assertEquals(3, list2.size(), "Node 2 should have 3 elements left")
175+
176+
// Check order is preserved on both nodes
177+
assertEquals("banana", list1.get(0))
178+
assertEquals("cherry", list1.get(1))
179+
assertEquals("date", list1.get(2))
180+
181+
assertEquals("banana", list2.get(0))
182+
assertEquals("cherry", list2.get(1))
183+
assertEquals("date", list2.get(2))
184+
} finally {
185+
node1Api.disconnect()
186+
node2Api.disconnect()
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)