88import java .util .ArrayList ;
99import java .util .Collection ;
1010import java .util .Collections ;
11+ import java .util .HashMap ;
1112import java .util .HashSet ;
1213import java .util .List ;
14+ import java .util .Map ;
1315import java .util .Set ;
1416import java .util .concurrent .ConcurrentHashMap ;
1517import java .util .concurrent .ConcurrentMap ;
18+ import java .util .concurrent .atomic .AtomicInteger ;
1619import java .util .concurrent .locks .Lock ;
1720import java .util .concurrent .locks .ReadWriteLock ;
1821import java .util .concurrent .locks .ReentrantReadWriteLock ;
22+ import org .redisson .api .RSet ;
1923import org .redisson .api .RSetMultimap ;
2024import org .redisson .api .RTopic ;
2125import org .redisson .api .RedissonClient ;
@@ -37,10 +41,9 @@ public class SubscriptionCatalog {
3741 private final NodeSelector nodeSelector ;
3842 private final int listenerId ;
3943 private final RTopic topic ;
40-
4144 private final ConcurrentMap <String , Set <RegistrationInfo >> localSubs = new ConcurrentHashMap <>();
4245 private final ConcurrentMap <String , Set <RegistrationInfo >> ownSubs = new ConcurrentHashMap <>();
43- private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock ();
46+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock (true );
4447 private final Throttling throttling ;
4548
4649 /**
@@ -219,31 +222,103 @@ public void removeAllForNodes(Set<String> nodeIds) {
219222 * Remove subscriptions for nodes that are not part of the <code>availableNodeIds</code>
220223 * collection. Own subscriptions are never removed.
221224 *
222- * <p>Unknown nodes with lingering state can observed in clusters that has scaled down to one and
223- * then crashes. If a new node is not available to receive events when node entries expire in
225+ * <p>Unknown nodes with lingering state can be observed in a cluster that has scaled down to one
226+ * node and then crashes. If a new node is not available to receive events when node entries expire in
224227 * Redis, the subscriptions will remain registered in Redis.
225228 *
226229 * @param self the node ID of this process
227230 * @param availableNodeIds a set of available nodes
228231 */
229232 public void removeUnknownSubs (String self , Collection <String > availableNodeIds ) {
230- Set <String > known = new HashSet <>(availableNodeIds );
231- known .add (self );
232233
233- Set <String > updated = new HashSet <>();
234- subsMap
235- .entries ()
236- .forEach (
237- entry -> {
238- if (!known .contains (entry .getValue ().nodeId ())) {
239- log .warn (
240- "Remove lingering subscriptions from unknown node [{}]" ,
241- entry .getValue ().nodeId ());
242- subsMap .remove (entry .getKey (), entry .getValue ());
243- updated .add (entry .getKey ());
244- }
245- });
246- updated .forEach (topic ::publish );
234+ // Build set of known nodes (cluster members + self)
235+ Set <String > knownNodes = new HashSet <>(availableNodeIds );
236+ knownNodes .add (self );
237+
238+ // Group stale subs by subscription key (address)
239+ Map <String , List <RegistrationInfo >> cleanupByKey = new HashMap <>();
240+
241+ for (Map .Entry <String , RegistrationInfo > entry : subsMap .entries ()) {
242+ RegistrationInfo info = entry .getValue ();
243+ if (!knownNodes .contains (info .nodeId ())) {
244+ cleanupByKey .computeIfAbsent (entry .getKey (), k -> new ArrayList <>()).add (info );
245+ }
246+ }
247+
248+ if (cleanupByKey .isEmpty ()) {
249+ return ;
250+ }
251+
252+ // Bulk remove all unknown subscriptions
253+ List <String > updatedKeys = bulkRemoveUnknownSubsByKey (cleanupByKey , subsMap );
254+
255+ // Notify all updated keys across the cluster nodes
256+ updatedKeys .forEach (topic ::publish );
257+ }
258+
259+ /**
260+ * Bulk remove unknown subscriptions grouped by key (address) to reduce round trips to Redis. If a
261+ * bulk deletion fails, this method falls back to individual deletion per entry.
262+ *
263+ * @param cleanupByKey the stale subscriptions to remove, grouped by key (address)
264+ * @param subscriptions the subscriptions map in Redis
265+ * @return the updated keys (addresses)
266+ */
267+ static List <String > bulkRemoveUnknownSubsByKey (
268+ Map <String , List <RegistrationInfo >> cleanupByKey ,
269+ RSetMultimap <String , RegistrationInfo > subscriptions ) {
270+ // Stats
271+ AtomicInteger totalRemoved = new AtomicInteger ();
272+ AtomicInteger totalFailed = new AtomicInteger ();
273+ Map <String , Integer > removedPerNode = new HashMap <>();
274+
275+ List <String > updatedKeys = new ArrayList <>();
276+ for (Map .Entry <String , List <RegistrationInfo >> entry : cleanupByKey .entrySet ()) {
277+ String key = entry .getKey ();
278+ List <RegistrationInfo > staleValues = entry .getValue ();
279+ try {
280+ RSet <RegistrationInfo > set = subscriptions .get (key );
281+
282+ // Bulk remove all (single round trip to Redis)
283+ set .removeAll (staleValues );
284+
285+ // Mark as updated because stale entries existed
286+ updatedKeys .add (key );
287+
288+ // Stats
289+ for (RegistrationInfo info : staleValues ) {
290+ removedPerNode .merge (info .nodeId (), 1 , Integer ::sum );
291+ totalRemoved .incrementAndGet ();
292+ }
293+ } catch (Exception e ) {
294+ // Fallback to safe per-value removal
295+ log .warn ("Bulk removal failed for key [{}], retrying individually" , key , e );
296+
297+ for (RegistrationInfo info : staleValues ) {
298+ boolean ok = subscriptions .remove (key , info );
299+ if (ok ) {
300+ removedPerNode .merge (info .nodeId (), 1 , Integer ::sum );
301+ totalRemoved .incrementAndGet ();
302+ } else {
303+ totalFailed .incrementAndGet ();
304+ }
305+ }
306+
307+ updatedKeys .add (key );
308+ }
309+ }
310+
311+ // Logging summary
312+ log .warn (
313+ "Removed {} lingering subscriptions from {} unknown node(s). Breakdown: {}" ,
314+ totalRemoved .get (),
315+ removedPerNode .size (),
316+ removedPerNode );
317+
318+ if (totalFailed .get () > 0 ) {
319+ log .warn ("Failed to remove {} subscriptions" , totalFailed .get ());
320+ }
321+ return updatedKeys ;
247322 }
248323
249324 /** Republish subscriptions that belongs to the current node (in which this is executed). */
0 commit comments