11package org.session.libsession.network.snode
22
3+ import android.os.SystemClock
34import kotlinx.coroutines.CoroutineScope
45import kotlinx.coroutines.launch
56import kotlinx.coroutines.sync.Mutex
@@ -27,18 +28,28 @@ class SnodeDirectory @Inject constructor(
2728
2829 companion object {
2930 private const val MINIMUM_SNODE_POOL_COUNT = 12
31+ private const val MINIMUM_SNODE_REFRESH_COUNT = 3
3032 private const val SEED_NODE_PORT = 4443
3133
34+ private const val POOL_REFRESH_INTERVAL_MS = 2 * 60 * 60 * 1000L // 2h
35+
3236 private const val KEY_IP = " public_ip"
3337 private const val KEY_PORT = " storage_port"
3438 private const val KEY_X25519 = " pubkey_x25519"
3539 private const val KEY_ED25519 = " pubkey_ed25519"
3640 private const val KEY_VERSION = " storage_server_version"
3741 }
3842
39- // todo ONION we need to add the "refresh every 2h plus intersection" rules
43+ /* *
44+ * Single mutex for any operation that can persist/replace the pool (bootstrap OR refresh).
45+ * This prevents refresh/bootstrap races overwriting each other.
46+ */
47+ private val poolWriteMutex = Mutex ()
48+
49+ // Refresh state (non-blocking trigger + real exclusion inside mutex)
50+ @Volatile private var snodePoolRefreshing = false
4051
41- private val poolMutex = Mutex ()
52+ @Volatile private var lastRefreshElapsedMs : Long = 0L
4253
4354 private val seedNodePool: Set <String > = when (prefs.getEnvironment()) {
4455 Environment .DEV_NET -> setOf (" http://sesh-net.local:1280" )
@@ -73,8 +84,9 @@ class SnodeDirectory @Inject constructor(
7384
7485 fun getSnodePool (): Set <Snode > = storage.getSnodePool()
7586
76- fun updateSnodePool (newPool : Set <Snode >) {
87+ private fun persistSnodePool (newPool : Set <Snode >) {
7788 storage.setSnodePool(newPool)
89+ lastRefreshElapsedMs = SystemClock .elapsedRealtime()
7890 }
7991
8092 /* *
@@ -90,78 +102,85 @@ class SnodeDirectory @Inject constructor(
90102 suspend fun ensurePoolPopulated (
91103 minCount : Int = MINIMUM_SNODE_POOL_COUNT
92104 ): Set <Snode > {
93- // 1. Fast path: Optimistic check (no lock)
94105 val current = getSnodePool()
106+
95107 if (current.size >= minCount) {
108+ // ensure we set the refresh timestamp in case we are starting the app
109+ // with already cached snodes
110+ if (lastRefreshElapsedMs == 0L ) {
111+ lastRefreshElapsedMs = SystemClock .elapsedRealtime()
112+ }
96113 return current
97114 }
98115
99- // 2. Slow path: Acquire lock
100- return poolMutex.withLock {
101- // 3. Double-check: Did someone populate it while we were waiting?
116+ return poolWriteMutex.withLock {
102117 val freshCurrent = getSnodePool()
103- if (freshCurrent.size >= minCount) {
104- return @withLock freshCurrent
105- }
118+ if (freshCurrent.size >= minCount) return @withLock freshCurrent
106119
107- val target = seedNodePool.random()
108- Log .d(" SnodeDirectory" , " Populating snode pool using seed node: $target " )
109-
110- val url = " $target /json_rpc"
111- val responseBytes = HTTP .execute(
112- HTTP .Verb .POST ,
113- url = url,
114- parameters = getRandomSnodeParams,
115- useSeedNodeConnection = true
116- )
117-
118- val json = runCatching {
119- JsonUtil .fromJson(responseBytes, Map ::class .java)
120- }.getOrNull() ? : buildMap<String , Any > {
121- this [" result" ] = responseBytes.toString(Charsets .UTF_8 )
122- }
120+ val seeded = fetchSnodePoolFromSeed()
121+ if (seeded.isEmpty()) throw IllegalStateException (" Seed node returned empty snode pool" )
123122
124- @Suppress(" UNCHECKED_CAST" )
125- val intermediate = json[" result" ] as ? Map <* , * >
126- ? : throw IllegalStateException (" Failed to update snode pool, 'result' was null." )
127- .also { Log .d(" SnodeDirectory" , " Failed to update snode pool, intermediate was null." ) }
128-
129- @Suppress(" UNCHECKED_CAST" )
130- val rawSnodes = intermediate[" service_node_states" ] as ? List <* >
131- ? : throw IllegalStateException (" Failed to update snode pool, 'service_node_states' was null." )
132- .also { Log .d(" SnodeDirectory" , " Failed to update snode pool, rawSnodes was null." ) }
133-
134- val newPool = rawSnodes.asSequence()
135- .mapNotNull { it as ? Map <* , * > }
136- .mapNotNull { raw ->
137- createSnode(
138- address = raw[KEY_IP ] as ? String ,
139- port = raw[KEY_PORT ] as ? Int ,
140- ed25519Key = raw[KEY_ED25519 ] as ? String ,
141- x25519Key = raw[KEY_X25519 ] as ? String ,
142- version = (raw[KEY_VERSION ] as ? List <* >)
143- ?.filterIsInstance<Int >()
144- ?.let (Snode ::Version )
145- ).also {
146- if (it == null ) {
147- Log .d(
148- " SnodeDirectory" ,
149- " Failed to parse snode from: ${raw.prettifiedDescription()} ."
150- )
151- }
152- }
153- }
154- .toSet()
123+ Log .d(" SnodeDirectory" , " Persisting snode pool with ${seeded.size} snodes (seed bootstrap)." )
124+ persistSnodePool(seeded)
125+ seeded
126+ }
127+ }
155128
156- if (newPool.isEmpty()) {
157- throw IllegalStateException (" Seed node returned empty snode pool" )
158- }
129+ private suspend fun fetchSnodePoolFromSeed (): Set <Snode > {
130+ val target = seedNodePool.random()
131+ Log .d(" SnodeDirectory" , " Fetching snode pool using seed node: $target " )
132+ return fetchSnodePool(target, fromSeed = true )
133+ }
159134
160- Log .d(" SnodeDirectory" , " Persisting snode pool with ${newPool.size} snodes." )
161- updateSnodePool(newPool)
135+ private suspend fun fetchSnodePoolFromSnode (snode : Snode ): Set <Snode > {
136+ val target = " ${snode.address} :${snode.port} "
137+ Log .d(" SnodeDirectory" , " Fetching snode pool using snode: $target " )
138+ return fetchSnodePool(target, fromSeed = false )
139+ }
162140
163- newPool
141+ private suspend fun fetchSnodePool (target : String , fromSeed : Boolean ): Set <Snode > {
142+ val url = " $target /json_rpc"
143+ val responseBytes = HTTP .execute(
144+ HTTP .Verb .POST ,
145+ url = url,
146+ parameters = getRandomSnodeParams,
147+ useSeedNodeConnection = fromSeed
148+ )
149+
150+ val json = runCatching {
151+ JsonUtil .fromJson(responseBytes, Map ::class .java)
152+ }.getOrNull() ? : buildMap<String , Any > {
153+ this [" result" ] = responseBytes.toString(Charsets .UTF_8 )
164154 }
155+
156+ @Suppress(" UNCHECKED_CAST" )
157+ val intermediate = json[" result" ] as ? Map <* , * >
158+ ? : throw IllegalStateException (" Failed to update snode pool, 'result' was null." )
159+ .also { Log .d(" SnodeDirectory" , " Failed to update snode pool, intermediate was null." ) }
160+
161+ @Suppress(" UNCHECKED_CAST" )
162+ val rawSnodes = intermediate[" service_node_states" ] as ? List <* >
163+ ? : throw IllegalStateException (" Failed to update snode pool, 'service_node_states' was null." )
164+ .also { Log .d(" SnodeDirectory" , " Failed to update snode pool, rawSnodes was null." ) }
165+
166+ return rawSnodes.asSequence()
167+ .mapNotNull { it as ? Map <* , * > }
168+ .mapNotNull { raw ->
169+ createSnode(
170+ address = raw[KEY_IP ] as ? String ,
171+ port = raw[KEY_PORT ] as ? Int ,
172+ ed25519Key = raw[KEY_ED25519 ] as ? String ,
173+ x25519Key = raw[KEY_X25519 ] as ? String ,
174+ version = (raw[KEY_VERSION ] as ? List <* >)
175+ ?.filterIsInstance<Int >()
176+ ?.let (Snode ::Version )
177+ ).also {
178+ if (it == null ) {
179+ Log .d(" SnodeDirectory" , " Failed to parse snode from: ${raw.prettifiedDescription()} ." )
180+ }
181+ }
182+ }
183+ .toSet()
165184 }
166185
167186 /* *
@@ -220,7 +239,8 @@ class SnodeDirectory @Inject constructor(
220239 val current = getSnodePool()
221240 val hit = current.firstOrNull { it.publicKeySet?.ed25519Key == ed25519Key } ? : return
222241 Log .w(" SnodeDirectory" , " Dropping snode from pool (ed25519=$ed25519Key ): $hit " )
223- updateSnodePool(current - hit)
242+ storage.setSnodePool(current - hit)
243+ // NOTE: do NOT touch lastRefreshElapsedMs here; dropping isn’t a “refresh”.
224244 }
225245
226246 fun updateForkInfo (newForkInfo : ForkInfo ) {
@@ -233,8 +253,108 @@ class SnodeDirectory @Inject constructor(
233253 }
234254 }
235255
236- fun getSnodeByKey (ed25519Key : String? ): Snode ? {
237- if (ed25519Key == null ) return null
256+ fun getSnodeByKey (ed25519Key : String? ): Snode ? {
257+ if (ed25519Key == null ) return null
238258 return getSnodePool().firstOrNull { it.publicKeySet?.ed25519Key == ed25519Key }
239259 }
240- }
260+
261+ // snode pool refresh logic
262+
263+ /* *
264+ * Non-blocking trigger.
265+ *
266+ * IMPORTANT: does nothing until we have successfully seeded at least once
267+ * (lastRefreshElapsedMs != 0L).
268+ */
269+ fun refreshPoolIfStaleAsync () {
270+ // Don’t refresh until we’ve successfully seeded at least once
271+ if (lastRefreshElapsedMs == 0L ) return
272+
273+ val now = SystemClock .elapsedRealtime()
274+ if (snodePoolRefreshing) return
275+ if (now - lastRefreshElapsedMs < POOL_REFRESH_INTERVAL_MS ) return
276+
277+ scope.launch { refreshPoolFromSnodes() }
278+ }
279+
280+ private suspend fun refreshPoolFromSnodes () {
281+ poolWriteMutex.withLock {
282+ // Re-check staleness INSIDE the lock to avoid “double refresh” races
283+ if (lastRefreshElapsedMs == 0L ) return // still not seeded
284+ val now = SystemClock .elapsedRealtime()
285+ if (now - lastRefreshElapsedMs < POOL_REFRESH_INTERVAL_MS ) return
286+
287+ if (snodePoolRefreshing) return
288+ snodePoolRefreshing = true
289+
290+ try {
291+ val current = getSnodePool()
292+
293+ // If pool has less than 3 snodes, refresh from seed
294+ if (current.size < MINIMUM_SNODE_REFRESH_COUNT ) {
295+ val seeded = fetchSnodePoolFromSeed()
296+ if (seeded.isNotEmpty()) {
297+ Log .d(" SnodeDirectory" , " Refreshing pool from seed (pool too small). New size=${seeded.size} " )
298+ persistSnodePool(seeded)
299+ }
300+ return
301+ }
302+
303+ // Otherwise fetch from 3 random snodes (no special filtering requested)
304+ val results = mutableListOf<Set <Snode >>()
305+ val attempts = current.shuffled().iterator()
306+
307+ while (results.size < MINIMUM_SNODE_REFRESH_COUNT && attempts.hasNext()) {
308+ val snode = attempts.next()
309+ val fetched = runCatching { fetchSnodePoolFromSnode(snode) }.getOrNull()
310+ if (! fetched.isNullOrEmpty()) results + = fetched
311+ }
312+
313+ if (results.size < MINIMUM_SNODE_REFRESH_COUNT ) {
314+ // Could not fetch 3 pools reliably, fallback to seed
315+ val seeded = fetchSnodePoolFromSeed()
316+ if (seeded.isNotEmpty()) {
317+ Log .d(" SnodeDirectory" , " Refreshing pool from seed (3-snode fetch failed). New size=${seeded.size} " )
318+ persistSnodePool(seeded)
319+ }
320+ return
321+ }
322+
323+ val intersected = intersectByEd25519(results)
324+
325+ // If intersection is empty, fallback to seed
326+ if (intersected.isEmpty()) {
327+ val seeded = fetchSnodePoolFromSeed()
328+ if (seeded.isNotEmpty()) {
329+ Log .d(" SnodeDirectory" , " Intersection empty; refreshing pool from seed instead. New size=${seeded.size} " )
330+ persistSnodePool(seeded)
331+ }
332+ return
333+ }
334+
335+ Log .d(" SnodeDirectory" , " Refreshing pool via 3-node intersection. New size=${intersected.size} " )
336+ persistSnodePool(intersected)
337+
338+ } finally {
339+ snodePoolRefreshing = false
340+ }
341+ }
342+ }
343+
344+ /* *
345+ * Get the intersection of snodes from the various snode pool results
346+ */
347+ private fun intersectByEd25519 (pools : List <Set <Snode >>): Set <Snode > {
348+ if (pools.isEmpty()) return emptySet()
349+
350+ val candidates = pools.first()
351+ val otherPoolKeys = pools.drop(1 ).map { pool ->
352+ pool.mapNotNull { it.publicKeySet?.ed25519Key }.toSet()
353+ }
354+
355+ return candidates.filter { snode ->
356+ val key = snode.publicKeySet?.ed25519Key ? : return @filter false
357+ otherPoolKeys.all { it.contains(key) }
358+ }.toSet()
359+ }
360+ }
0 commit comments