Orphaned Indexed Sessions, and Reverse References #3453

@dreamstar-enterprises

Description

I cannot help but feel there should be a better way of doing this with Spring: when a session expires, its indexed references and reverse references (which have no TTL) remain in Redis indefinitely.

The code below felt like too many lines to write. Is there a built-in way of doing this with Spring Session and/or Spring Data Redis? The functions in question are:

  • cleanupOrphanedIndexedKeysCursor
  • findOrphanedKeysInBatch
  • prepareCleanupOperationsForBatch
  • executeIndexedCleanupOperations

Also see:
spring-projects/spring-data-redis#3195
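
To make the problem concrete, here is a minimal in-memory sketch of the key layout the code below assumes: a session key that expires, a reverse-reference set `<namespace>:sessions:<id>:idx` listing the index sets that contain the session ID, and the index sets themselves. It uses plain maps instead of Redis. `FakeRedis`, `saveSession`, `expireSession`, `findOrphanedIndexKeys`, and the index set key names used with it are hypothetical names for illustration, not Spring Session API.

```kotlin
// In-memory stand-in for the Redis keys involved (illustration only, not Spring Session API).
class FakeRedis {
    val sessions = mutableMapOf<String, Map<String, String>>() // session key -> session attributes
    val sets = mutableMapOf<String, MutableSet<String>>()      // set key -> members

    // Stores a session, its index entry, and the reverse reference pointing at the index set.
    fun saveSession(namespace: String, sessionId: String, indexSetKey: String) {
        sessions["$namespace:sessions:$sessionId"] = mapOf("attr" to "value")
        sets.getOrPut(indexSetKey) { mutableSetOf() }.add(sessionId)
        sets.getOrPut("$namespace:sessions:$sessionId:idx") { mutableSetOf() }.add(indexSetKey)
    }

    // Simulates TTL expiry: only the session key disappears, because the sets have no TTL.
    fun expireSession(namespace: String, sessionId: String) {
        sessions.remove("$namespace:sessions:$sessionId")
    }
}

// Returns every ":idx" key whose session key no longer exists, in sorted order.
fun findOrphanedIndexKeys(redis: FakeRedis, namespace: String): List<String> =
    redis.sets.keys
        .filter { it.startsWith("$namespace:sessions:") && it.endsWith(":idx") }
        .filter { idxKey -> idxKey.removeSuffix(":idx") !in redis.sessions }
        .sorted()
```

After `expireSession`, the `:idx` key and the session ID inside the index set are both still present, which is exactly the leftover data the cleanup component has to find and remove.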


/**
 * Redis session cleanup component that handles expired session removal and orphaned data cleanup.
 *
 * ## 🧹 **Core Responsibilities**
 *
 * **Expired Session Cleanup:**
 * - Removes expired session entries from Redis ZSet (`spring:session:sessions:expirations`)
 * - Processes sessions in batches (500 at a time) for memory efficiency
 * - Uses time-based scoring for oldest-first cleanup order
 *
 * **Orphaned Index Cleanup:**
 * - Finds and removes orphaned reverse index keys (`*:sessions:*:idx`)
 * - Cleans up dangling references in Redis Sets that point to deleted sessions
 * - Uses cursor-based scanning to handle large datasets without memory issues
 *
 * **Distributed Coordination:**
 * - Implements Redis-based distributed locking to ensure single-instance execution
 * - Prevents duplicate cleanup work across multiple application instances
 * - Uses atomic Lua scripts for safe lock acquisition and release
 *
 * ## ⚡ **Performance Strategy**
 *
 * **Batch Operations:**
 * - MGET for bulk session existence checking (single network round-trip)
 * - SREM for batched removal from Redis Sets (grouped by target set)
 * - UNLINK for non-blocking deletion of index keys (Lua script batching)
 *
 * **Memory Bounded:**
 * - Cursor-based SCAN with configurable limits (1000 keys max per cycle)
 * - Small batch processing (100 keys per scan, 20 keys per operation batch)
 * - No full key enumeration - always incremental processing
 *
 * **Time Controlled:**
 * - 5-minute cleanup cycles with distributed locking
 * - Early termination on errors or completion
 * - Graceful degradation with fallback strategies
 *
 * ## 🔄 **Two-Phase Cleanup Process**
 *
 * **Phase 1 - Expired Sessions:**
 * ```
 * 1. Query expired entries from ZSet (time range: 5 days back to now)
 * 2. Remove found session IDs from expiration tracking ZSet
 * 3. Log cleanup results for monitoring
 * ```
 *
 * **Phase 2 - Orphaned Indexes:**
 * ```
 * 1. Scan for index keys (*:sessions:*:idx) using Redis cursor
 * 2. Extract session IDs from index key names and check if main session keys exist (batched by scan batch size, using MGET)
 * 3. For orphaned indexes:
 *    a. Get reverse references (SMEMBERS on index key)
 *    b. Remove session ID from all referencing sets (batch by member, using SREM)
 *    c. Delete the orphaned index key (UNLINK)
 * ```
 *
 * @property redisOperations Redis operations for string-based commands and Lua scripts
 * @property redisTemplate Redis operations for generic type operations and complex queries
 * @property springSessionProperties Spring Session configuration for namespaces and settings
 *
 * @since 1.0.0
 * @see org.springframework.session.data.redis.ReactiveRedisIndexedSessionRepository
 * @see org.springframework.scheduling.annotation.Scheduled
 */
@Component
@EnableScheduling
internal class SessionEvicter(
    private val redisOperations: ReactiveRedisOperations<String, String>,
    private val redisTemplate: ReactiveRedisTemplate<String, Any>,
    private val springSessionProperties: SpringSessionProperties,
) {

    private companion object {
        private const val CLEANUP_DURATION_SECONDS = 300L
        private const val LOCK_KEY = "session-cleanup-lock"
        private val LOCK_EXPIRY: Duration = Duration.ofSeconds(CLEANUP_DURATION_SECONDS)
        private const val LOCK_RELEASE_BUFFER_SECONDS = 10L
        private const val BATCH_SIZE = 500
        private const val RETENTION_DAYS = 5L
        private val logger = LoggerFactory.getLogger(SessionEvicter::class.java)
    }

    private val redisKeyExpirations = springSessionProperties.redis?.expiredSessionsNamespace
        ?: "spring:session:sessions:expirations"
    // Base namespace only; ":sessions:" is appended wherever keys are built from it
    private val redisKeyNameSpace = springSessionProperties.redis?.sessionNamespace
        ?: "spring:session"
    private val lockNamespace = "$redisKeyNameSpace:sessions:"

    /**
     * Represents the context for a cleanup operation.
     */
    private data class CleanupContext(
        val now: Instant,
        val pastDays: Instant,
        val range: Range<Double>,
        val limit: Limit
    )

    /**
     * Represents data structure for cleanup operations
     */
    private data class CleanupOperation(
        val indexKey: String,
        val sessionId: String,
        val membersToCleanFrom: List<String>
    )

    /**
     * Scheduled entry point for session cleanup operations.
     *
     * ## What This Function Does
     * 1. **Generates Lock ID**: Creates unique UUID for this cleanup attempt
     * 2. **Acquires Lock**: Attempts distributed lock to prevent concurrent cleanup
     * 3. **Executes Cleanup**: Runs full cleanup process if lock acquired
     * 4. **Handles Errors**: Logs failures without disrupting scheduler
     *
     * ## Threading Model
     * - **Scheduler Thread**: Used only to launch coroutine (non-blocking)
     * - **Coroutine Scope**: `CoroutineScope(Dispatchers.IO)` for I/O operations
     * - **Background Execution**: Cleanup runs on IO thread pool, not scheduler thread
     * - **Non-blocking**: Spring scheduler remains available for other tasks
     *
     * ## Scheduling
     * - **Frequency**: Every 5 minutes (300 seconds)
     * - **Isolated**: Errors don't affect future scheduled executions
     *
     * ## Concurrency Control
     * - **Distributed Lock**: Only one BFF instance performs cleanup at a time
     * - **Skip on Busy**: Gracefully skips if another instance is cleaning
     * - **Lock-based Coordination**: Uses Redis for cross-instance synchronization
     *
     * ## Error Handling
     * - Logs all errors for debugging
     * - Continues scheduling despite failures
     * - Isolates cleanup errors from Spring scheduler
     *
     * @see CLEANUP_DURATION_SECONDS for scheduling interval
     * @see acquireLock for distributed locking mechanism
     * @see executeCleanupProcess for the actual cleanup implementation
     */
    @Scheduled(fixedRate = CLEANUP_DURATION_SECONDS, timeUnit = TimeUnit.SECONDS)
    fun cleanup() {

        // Launch coroutine in background scope to avoid blocking the scheduler
        CoroutineScope(Dispatchers.IO).launch {
            try {
                val lockValue = UUID.randomUUID().toString()
                logger.debug("Starting cleanup cycle with lock value: {}", lockValue)

                val acquired = acquireLock(lockValue)
                if (acquired) {
                    executeCleanupProcess(lockValue)
                } else {
                    logger.debug("Lock not acquired, skipping cleanup")
                }
            } catch (error: Exception) {
                logger.error("Error during cleanup process", error)
            }
        }
    }


    /**
     * Executes the complete cleanup process with lock management.
     *
     * The process flow:
     * 1. Performs main session cleanup
     * 2. Cleans up orphaned indexed keys
     * 3. Waits for a buffer period before lock release
     * 4. Releases the distributed lock
     *
     * Error handling:
     * - Logs any errors during the process
     * - Ensures lock release even on failure
     * - Returns normally after logging; errors are not rethrown
     *
     * @param lockValue The unique value used to identify this lock instance
     */
    private suspend fun executeCleanupProcess(lockValue: String) {
        try {
            performCleanup()
            logger.debug("Main cleanup completed successfully")

            cleanupOrphanedIndexedKeysCursor()
            logger.debug("Orphaned keys cleanup completed")

            // Hold the lock for most of the cycle so other instances skip this round
            val bufferDelaySeconds = CLEANUP_DURATION_SECONDS - LOCK_RELEASE_BUFFER_SECONDS
            delay(bufferDelaySeconds * 1000)
        } catch (error: Exception) {
            logger.error("Error while executing cleanup process", error)
        } finally {
            // Release on both the success and the failure path
            releaseLock(lockValue)
        }
    }


    /**
     * Attempts to acquire a distributed Redis lock for cleanup coordination.
     *
     * ## Why Distributed Locking?
     * In multi-instance deployments, only one instance should perform session cleanup
     * to avoid duplicate work and potential Redis contention. This lock ensures
     * mutual exclusion across all application instances.
     *
     * ## How It Works
     * Uses Redis `SET` command with conditional options to atomically:
     * 1. **Set the lock key** only if it doesn't exist (`NX` = "Not eXists")
     * 2. **Set expiration** to prevent deadlock if holder crashes (`EX` = "EXpire")
     * 3. **Store unique value** to identify the lock holder (UUID)
     *
     * ## Lua Script Breakdown
     * ```lua
     * return redis.call('set', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2])
     * ```
     *
     * **Parameters:**
     * - `KEYS[1]`: Lock key (e.g., `spring:session:sessions:session-cleanup-lock`)
     * - `ARGV[1]`: Unique lock value (UUID to identify this instance)
     * - `ARGV[2]`: Expiration in seconds (300s = 5 minutes)
     *
     * **Redis SET Options:**
     * - `NX`: Only set if key doesn't exist (atomic test-and-set)
     * - `EX`: Set expiration in seconds (automatic cleanup)
     *
     * **Return Values:**
     * - `"OK"`: Lock acquired successfully
     * - `null`: Lock already held by another instance
     *
     * ## Safety Features
     * - **Atomic Operation**: SET NX EX is atomic - no race conditions
     * - **Auto-Expiration**: Lock expires after 5 minutes even if holder crashes
     * - **Unique Values**: Each instance uses UUID to prevent accidental release
     * - **Deadlock Prevention**: Expiration ensures locks don't stick forever
     *
     * ## Performance
     * - **Single Redis Call**: One atomic operation, minimal network overhead
     * - **Fast Failure**: Returns immediately if lock unavailable
     * - **Low Contention**: Lock attempts every 5 minutes across instances
     *
     * @param lockValue Unique identifier (UUID) for this lock attempt
     * @return `true` if lock acquired, `false` if another instance holds it
     */
    private suspend fun acquireLock(lockValue: String): Boolean {
        val fullLockKey = "$lockNamespace$LOCK_KEY"
        logger.debug("Attempting to acquire lock: key={}, value={}", fullLockKey, lockValue)

        val script = """
                       return redis.call('set', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2])
                   """.trimIndent()

        return try {
            val result = redisOperations.execute(
                RedisScript.of(script, String::class.java),
                listOf(fullLockKey),
                listOf(lockValue, LOCK_EXPIRY.seconds.toString())
            )
                .next()                 // Convert Flux<String> to Mono<String> (first emission only)
                .awaitSingleOrNull()    // Convert Mono<String> to suspend function returning String?

            val acquired = result == "OK"

            logger.debug(
                if (acquired) "Successfully acquired lock: {}"
                else "Failed to acquire lock: {}",
                lockValue
            )

            acquired

        } catch (error: Exception) {
            logger.error("Error acquiring lock: {}", error.message)
            false
        }
    }

    /**
     * Releases a distributed Redis lock using atomic Lua script validation.
     *
     * ## Why Safe Lock Release?
     * In distributed systems, locks must be released safely to prevent:
     * - Accidental release by wrong instance
     * - Race conditions during release
     * - Orphaned locks from crashed instances
     * - Security vulnerabilities from malicious release attempts
     *
     * ## How It Works
     * Uses atomic Lua script to safely verify ownership and release:
     * 1. **Get current value** from Redis lock key
     * 2. **Compare values** to verify this instance owns the lock
     * 3. **Delete atomically** only if values match
     * 4. **Return status** indicating success or failure
     *
     * ## Lua Script Breakdown
     * ```lua
     * if redis.call('get', KEYS[1]) == ARGV[1] then
     *     return redis.call('del', KEYS[1])
     * else
     *     return 0
     * end
     * ```
     *
     * **Parameters:**
     * - `KEYS[1]`: Lock key (e.g., `spring:session:sessions:session-cleanup-lock`)
     * - `ARGV[1]`: Expected lock value (UUID to verify ownership)
     *
     * **Script Logic:**
     * - `GET`: Retrieves current lock value
     * - `==`: Compares with provided value (ownership verification)
     * - `DEL`: Deletes key only if ownership confirmed
     * - Returns: `1` if deleted, `0` if not owner or key missing
     *
     * **Return Values:**
     * - `1`: Lock successfully released by rightful owner
     * - `0`: Lock not released (wrong owner, expired, or missing)
     *
     * ## Safety Features
     * - **Atomic Verification**: GET + compare + DEL in single atomic operation
     * - **Ownership Validation**: Only lock holder can release using correct UUID
     * - **Race Condition Prevention**: No window between check and delete
     * - **Idempotent**: Safe to call multiple times with same value
     * - **Failure Tolerance**: Handles expired/missing locks gracefully
     *
     * ## Performance
     * - **Single Redis Call**: One atomic script execution, minimal latency
     * - **No Network Round-trips**: Script runs server-side in Redis
     * - **Low CPU Impact**: Simple conditional logic in Lua
     * - **Memory Efficient**: No intermediate data structures
     *
     * The function returns once the release attempt finishes; failures are logged, not thrown.
     *
     * @param lockValue Unique identifier (UUID) that was used to acquire the lock
     */
    private suspend fun releaseLock(lockValue: String) {
        val fullLockKey = "$lockNamespace$LOCK_KEY"
        logger.debug("Attempting to release lock: key={}, value={}", fullLockKey, lockValue)

        val script = """
                       if redis.call('get', KEYS[1]) == ARGV[1] then
                           return redis.call('del', KEYS[1])
                       else
                           return 0
                       end
                   """.trimIndent()

        try {
            val result = redisOperations.execute(
                RedisScript.of(script, Long::class.java),
                listOf(fullLockKey),
                listOf(lockValue)
            )
                .next()
                .awaitSingleOrNull()

            val released = result == 1L

            if (released) {
                logger.debug("Successfully released lock: {}", lockValue)
            } else {
                logger.warn("Failed to release lock: {}", lockValue)
            }

        } catch (error: Exception) {
            logger.error("Error releasing lock: {}", error.message, error)
        }
    }


    /**
     * Executes main session cleanup operation with context management.
     *
     * Simple orchestration function that:
     * 1. Creates cleanup context with time boundaries and limits
     * 2. Logs operation start for audit trail
     * 3. Executes expired session cleanup in Redis
     *
     * @throws io.lettuce.core.RedisConnectionException if Redis operations fail
     */
    private suspend fun performCleanup() {
        val context = createCleanupContext()
        logCleanupStart(context)
        cleanupExpiredSessions(context)
    }


    /**
     * Creates cleanup operation context with boundaries and limits.
     *
     * ## Context Components
     * - **Temporal Boundaries**: Defines time ranges for session expiration
     *   - Current timestamp for operation execution
     *   - Retention boundary (5 days back) for cleanup scope
     * - **Batch Configuration**: Sets limits for efficient processing
     *   - Configurable batch size (500) to prevent memory issues
     * - **Operation Parameters**: Configures Redis constraints
     *   - Score range for ZSet queries based on timestamps
     *   - Limit object for batch processing control
     *
     * ## Performance Configuration
     * - **Configurable Retention**: Uses RETENTION_DAYS constant
     * - **Batch Size Control**: Uses BATCH_SIZE for memory management
     * - **Time Range Limits**: Converts timestamps to Redis scores
     * - **Resource Bounds**: Controls query result size
     *
     * @return [CleanupContext] with operation parameters and boundaries
     */
    private fun createCleanupContext(): CleanupContext {
        val now = Instant.now()
        val pastDays = now.minus(Duration.ofDays(RETENTION_DAYS))
        return CleanupContext(
            now = now,
            pastDays = pastDays,
            range = Range.closed(
                pastDays.toEpochMilli().toDouble(),
                now.toEpochMilli().toDouble()
            ),
            limit = Limit.limit().count(BATCH_SIZE)
        )
    }

    /**
     * Logs cleanup operation start with operational context for debugging and monitoring.
     *
     * ## Logged Information
     * - **Current Time**: Cleanup start timestamp
     * - **Time Range**: Retention window (5 days back to now)
     * - **Batch Size**: Sessions processed per batch (500)
     * - **Redis Key**: Target expiration ZSet key
     *
     * ## Output Example
     * ```
     * Starting scheduled cleanup:
     * Current time: 2023-08-10T14:30:00Z
     * Range start: Sat Aug 05 14:30:00 UTC 2023
     * Range end: Thu Aug 10 14:30:00 UTC 2023
     * Batch size: 500
     * Redis key: spring:session:sessions:expirations
     * ```
     *
     * Essential for debugging cleanup timing, verifying parameters, and monitoring cleanup scope.
     *
     * @param context Operation parameters containing time boundaries and limits
     */
    private fun logCleanupStart(context: CleanupContext) {
        logger.debug(
            """
           Starting scheduled cleanup:
           Current time: {}
           Range start: {}
           Range end: {}
           Batch size: {}
           Redis key: {}
           """.trimIndent(),
            context.now,
            Date(context.pastDays.toEpochMilli()),
            Date(context.now.toEpochMilli()),
            context.limit.count,
            redisKeyExpirations
        )
    }

    /**
     * Executes expired session cleanup in Redis ZSet with batch processing.
     *
     * ## What is a Redis ZSet?
     * A **Sorted Set** storing session IDs with expiration timestamps (epoch milliseconds) as scores:
     * ```
     * Key: "spring:session:sessions:expirations"
     * Data: sessionId1(score: 1691234567000), sessionId2(score: 1691234890000)...
     * ```
     *
     * ## Cleanup Order
     * Uses `rangeByScore()` to process **oldest expired sessions first**:
     * - Lower scores (older timestamps) processed first
     * - Sessions that expired earliest get cleaned up first
     * - Logical FIFO cleanup order
     *
     * ## CleanupContext Usage
     * - **Range**: Time boundaries (pastDays → now) as ZSet scores
     * - **Limit**: Batch size (500) to prevent memory issues
     *
     * ## Process
     * 1. Query expired sessions within time range (batch of 500, oldest first)
     * 2. Remove found sessions from ZSet in single atomic operation
     * 3. Log cleanup results
     *
     * @param context Operation boundaries containing time range and batch limits
     * @throws io.lettuce.core.RedisConnectionException if Redis operations fail
     */
    private suspend fun cleanupExpiredSessions(context: CleanupContext) {
        val sessionIds = redisOperations.opsForZSet()
            .rangeByScore(redisKeyExpirations, context.range, context.limit)
            .collectList()
            .awaitSingleOrNull() ?: emptyList()

        if (sessionIds.isEmpty()) {
            logger.debug("No expired sessions found")
        } else {
            logger.debug("Found {} expired sessions to remove", sessionIds.size)
            try {
                redisOperations.opsForZSet()
                    .remove(redisKeyExpirations, *sessionIds.toTypedArray())
                    .awaitSingleOrNull()
                logger.debug("Successfully removed {} expired sessions", sessionIds.size)
            } catch (e: Exception) {
                logger.error("Error during removal: ${e.message}", e)
                throw e
            }
        }
    }


    /**
     * SCAN-based cleanup that processes index keys incrementally in small batches.
     *
     * ## Why SCAN-Based?
     * - **Memory Bounded**: Only holds small batches in memory (never all keys)
     * - **Time Controlled**: Caps the number of keys processed per cleanup cycle
     * - **Scalable**: SCAN iterates incrementally instead of enumerating all keys at once
     *
     * ## Process Flow
     * 1. **Scan batch**: Start a SCAN and take the first batch of matching index keys (up to 100)
     * 2. **Process immediately**: Check existence and clean up orphaned keys
     * 3. **Repeat**: Issue a new SCAN for the next batch
     * 4. **Respect limits**: Stop after processing max keys per cycle, or when a batch comes back short
     *
     * ## Limitation
     * The SCAN cursor is not stored between loop iterations or between cleanup cycles, so each
     * SCAN starts from the beginning of the keyspace. Index keys that belong to live sessions
     * can therefore be returned again and still count toward the per-cycle limit.
     *
     * @throws io.lettuce.core.RedisConnectionException if Redis scanning operations fail
     */
    private suspend fun cleanupOrphanedIndexedKeysCursor() {
        val pattern = "$redisKeyNameSpace:sessions:*:idx"
        val maxKeysPerCleanup = 1000   // Process max 1000 keys per cleanup cycle
        val scanBatchSize = 100L       // Scan 100 keys at a time

        var processedCount = 0
        var orphanedCount = 0

        logger.debug("Starting cursor-based cleanup, max keys per cycle: {}", maxKeysPerCleanup)

        do {
            val scanOptions = ScanOptions.scanOptions()
                .match(pattern)
                .count(scanBatchSize)
                .build()

            try {
                // Get next batch of index keys using proper scan API
                val indexKeys = redisTemplate.execute { connection ->
                    Flux.from(connection.keyCommands().scan(scanOptions))
                        .map(::decodeByteBuffer)
                        .take(scanBatchSize)
                        .collectList()
                }.awaitSingle()

                if (indexKeys.isNotEmpty()) {
                    // Process this batch immediately
                    val orphanedInBatch = findOrphanedKeysInBatch(indexKeys)

                    if (orphanedInBatch.isNotEmpty()) {
                        val cleanupOps = prepareCleanupOperationsForBatch(orphanedInBatch)
                        executeIndexedCleanupOperations(cleanupOps)
                        orphanedCount += orphanedInBatch.size
                    }

                    processedCount += indexKeys.size
                    logger.debug("Processed batch: {} keys, {} orphaned", indexKeys.size, orphanedInBatch.size)
                }

                if (indexKeys.size < scanBatchSize) {
                    logger.debug("Completed scan - no more keys")
                    break
                }

            } catch (error: Exception) {
                logger.error("Error during cursor-based cleanup", error)
                break
            }

        } while (processedCount < maxKeysPerCleanup)

        logger.info("Cursor cleanup cycle completed: processed {} keys, cleaned {} orphaned",
            processedCount, orphanedCount)
    }



    /**
     * Identifies orphaned index keys within a batch using efficient Redis batch operations.
     *
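     * ## Batch Processing Strategy
     * - **MGET Operation**: Single Redis call to look up all corresponding session keys
     * - **Caveat**: MGET returns nil for keys that do not hold a string value, so this check
     *   only works if session keys are stored as plain strings. If sessions are stored as
     *   Redis hashes, every index key would look orphaned and an EXISTS-based check (as in
     *   the fallback below) is needed instead
     * - **Memory Efficient**: Processes small batches from cursor scanning (not all keys at once)
     * - **Fallback Safety**: Individual key checks if batch operation fails
     * - **Fast Detection**: Bulk existence checking vs individual key lookups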
     *
     * ## How Orphan Detection Works
     * 1. **Extract Session IDs**: Parse session IDs from index key names
     * 2. **Build Session Keys**: Create corresponding session key names for existence check
     * 3. **Batch Existence Check**: Use MGET to check all session keys simultaneously
     * 4. **Identify Orphans**: Index keys whose sessions no longer exist are orphaned
     *
     * ## Performance Benefits
     * - **Single Network Round-trip**: MGET checks all keys in one Redis call
     * - **Reduced Latency**: Batch operations vs N individual calls
     * - **Resource Efficient**: Lower CPU and memory usage than sequential checks
     *
     * @param indexKeys Small batch of index keys from cursor scan (typically ~100 keys)
     * @return List of orphaned index keys whose corresponding sessions no longer exist
     */
    private suspend fun findOrphanedKeysInBatch(indexKeys: List<String>): List<String> {
        if (indexKeys.isEmpty()) return emptyList()

        try {
            // Build session keys for this batch
            val sessionKeys = indexKeys.map { indexedKey ->
                val sessionId = extractSessionIdFromIndexedKey(indexedKey)
                // Use the resolved namespace so a missing property cannot yield "null:sessions:..."
                "$redisKeyNameSpace:sessions:$sessionId"
            }

            // Single Redis MGET call to check all keys at once
            val existenceResults = redisTemplate.opsForValue()
                .multiGet(sessionKeys)
                .awaitSingleOrNull() ?: emptyList()

            // Find orphaned keys in this batch
            val orphanedKeys = mutableListOf<String>()
            indexKeys.forEachIndexed { index, indexedKey ->
                val sessionValue = existenceResults.getOrNull(index)
                if (sessionValue == null) {
                    orphanedKeys.add(indexedKey)
                }
            }

            logger.debug("Batch check: {} total, {} orphaned", indexKeys.size, orphanedKeys.size)
            return orphanedKeys

        } catch (error: Exception) {
            logger.error("Error in batch existence check, falling back to individual checks", error)

            // Fallback to individual checks
            val orphanedKeys = mutableListOf<String>()
            indexKeys.forEach { indexedKey ->
                try {
                    val sessionId = extractSessionIdFromIndexedKey(indexedKey)
                    val sessionKey = "$redisKeyNameSpace:sessions:$sessionId"
                    val sessionExists = redisTemplate.hasKey(sessionKey).awaitSingleOrNull() ?: false
                    if (!sessionExists) orphanedKeys.add(indexedKey)
                } catch (e: Exception) {
                    logger.error("Error checking individual key: {}", indexedKey, e)
                }
            }
            return orphanedKeys
        }
    }


    /**
     * Prepares cleanup operations for orphaned keys by gathering their Redis set memberships.
     *
     * ## Purpose
     * Before deleting orphaned index keys, we must remove session IDs from all Redis sets
     * that reference them. Index keys track which sets contain the session ID.
     *
     * ## Batch Processing
     * - **Chunked Operations**: Processes keys in chunks of 20 to manage Redis load
     * - **Parallel Set Queries**: Uses Flux to query multiple set memberships concurrently
     * - **Error Resilience**: Creates empty operations for failed batches to ensure key deletion
     *
     * ## Operation Structure
     * Each [CleanupOperation] contains:
     * - **Index Key**: The orphaned key to delete
     * - **Session ID**: Extracted session identifier
     * - **Members List**: Redis sets that need the session ID removed
     *
     * ## Why Two-Phase Cleanup?
     * 1. **Data Integrity**: Remove references before deleting index keys
     * 2. **Consistency**: Prevents dangling references in Redis sets
     * 3. **Atomic Batching**: Groups operations for efficient Redis execution
     *
     * @param orphanedKeys List of confirmed orphaned index keys needing cleanup
     * @return List of [CleanupOperation] objects ready for execution
     */
    private suspend fun prepareCleanupOperationsForBatch(
        orphanedKeys: List<String>
    ): List<CleanupOperation> {

        if (orphanedKeys.isEmpty()) return emptyList()

        val batchSize = 20
        val allCleanupOps = mutableListOf<CleanupOperation>()

        orphanedKeys.chunked(batchSize).forEach { batch ->
            try {
                // Batch get members for all keys in this chunk
                val membersResults = redisTemplate.execute { connection ->
                    Flux.fromIterable(batch)
                        .flatMap { indexedKey ->
                            connection.setCommands()
                                .sMembers(ByteBuffer.wrap(indexedKey.toByteArray()))
                                .collectList()
                                .map { members -> indexedKey to members.map { decodeByteBuffer(it) } }
                        }
                        .collectList()
                }.collectList()
                    .awaitSingleOrNull()?.flatten() ?: emptyList()

                // Process results and create cleanup operations
                membersResults.forEach { (indexedKey, members) ->
                    val sessionId = extractSessionIdFromIndexedKey(indexedKey)

                    val cleanupOp = CleanupOperation(
                        indexKey = indexedKey,
                        sessionId = sessionId,
                        membersToCleanFrom = members
                    )
                    allCleanupOps.add(cleanupOp)
                }


            } catch (error: Exception) {
                logger.error("Error preparing cleanup operations for batch", error)
                // Add empty operations for failed batch to avoid skipping deletion
                batch.forEach { indexedKey ->
                    val sessionId = extractSessionIdFromIndexedKey(indexedKey)
                    allCleanupOps.add(
                        CleanupOperation(
                            indexKey = indexedKey,
                            sessionId = sessionId,
                            membersToCleanFrom = emptyList()
                        )
                    )
                }

            }
        }

        return allCleanupOps
    }


    /**
     * Executes cleanup operations in two optimized phases with batched Redis commands.
     *
     * ## Two-Phase Cleanup Strategy
     *
     * ### Phase 1: Batched Set Removals
     * - **Groups by Target**: Collects all session IDs to remove from each Redis set
     * - **Batch Operations**: Single SREM call per set (vs individual removals)
     * - **Data Integrity**: Removes references before deleting index keys
     * - **Error Isolation**: Failed set operations don't block index key deletion
     *
     * ### Phase 2: Batch Index Key Deletion
     * - **Bulk Delete**: Single Lua script call that runs UNLINK on every index key
     * - **Atomic Operation**: The script unlinks all index keys in one server-side step
     * - **Performance**: One Redis call vs N individual deletions
     * - **Cleanup Completion**: Ensures orphaned keys are fully removed
     *
     * ## Performance Benefits
     * - **Reduced Network Calls**: Batches minimize Redis round-trips
     * - **Lower Redis Load**: Bulk operations vs individual commands
     * - **Memory Efficient**: Processes operations without storing large intermediate collections
     * - **Time Bounded**: Fast execution suitable for scheduled cleanup cycles
     *
     * ## Error Handling
     * - Set removal errors are logged but don't prevent index key deletion
     * - Ensures cleanup progress even with partial failures
     * - Individual operation failures don't cascade to entire batch
     *
     * @param cleanupOps List of prepared cleanup operations to execute
     */
    private suspend fun executeIndexedCleanupOperations(
        cleanupOps: List<CleanupOperation>
    ) {
        if (cleanupOps.isEmpty()) {
            logger.debug("No cleanup operations to execute")
            return
        }

        // Phase 1: Group all removals by target set for batching
        val removalsBySet = mutableMapOf<String, MutableList<String>>()

        cleanupOps.forEach { op ->
            op.membersToCleanFrom.forEach { member ->
                removalsBySet.getOrPut(member) { mutableListOf() }.add(op.sessionId)
            }
        }

        // Execute batched set removals
        removalsBySet.forEach { (setKey, sessionIds) ->
            try {
                val removedCount = redisTemplate.opsForSet()
                    .remove(setKey, *sessionIds.toTypedArray())
                    .awaitSingleOrNull() ?: 0

                logger.debug("Removed {} session IDs from set {}", removedCount, setKey)
            } catch (error: Exception) {
                logger.error("Error removing from set {}: {}", setKey, error.message)
            }
        }

        // Phase 2: Batch delete all index keys using UNLINK instead of DEL
        val indexKeys = cleanupOps.map { it.indexKey }
        val deletedCount = if (indexKeys.isNotEmpty()) {
            val script = """
                            local count = 0
                            for i = 1, #KEYS do
                                count = count + redis.call('unlink', KEYS[i])
                            end
                            return count
                        """.trimIndent()

            redisOperations.execute(
                RedisScript.of(script, Long::class.java),
                indexKeys,
                emptyList<String>()
            ).next().awaitSingleOrNull() ?: 0
        } else {
            0
        }

        logger.debug("Unlinked {} index keys", deletedCount)

    }


    /**
     * Extracts session ID from Spring Session Redis index key.
     *
     * Index keys follow format: `namespace:sessions:sessionId:idx`
     * Takes the segment immediately before the trailing `:idx`, so the result does not
     * depend on how many `:`-separated segments the namespace contains.
     *
     * Examples:
     * ```
     * "spring:session:sessions:abc123:idx" -> "abc123"
     * "myapp:session:sessions:user789:idx" -> "user789"
     * ```
     *
     * @param indexedKey Redis index key in Spring Session format
     * @return Session ID component
     */
    private fun extractSessionIdFromIndexedKey(indexedKey: String): String =
        indexedKey.removeSuffix(":idx").substringAfterLast(':')

    /**
     * Converts Redis ByteBuffer to UTF-8 string.
     *
     * Redis operations return ByteBuffer objects that need string conversion.
     * Uses standard UTF-8 decoding for text processing.
     *
     * Examples:
     * ```
     * // SCAN results
     * val keyName = decodeByteBuffer(scanResult)
     *
     * // Set members
     * val memberValue = decodeByteBuffer(setMember)
     * ```
     *
     * @param byteBuffer ByteBuffer from Redis operation
     * @return Decoded UTF-8 string
     */
    private fun decodeByteBuffer(byteBuffer: ByteBuffer): String =
        StandardCharsets.UTF_8.decode(byteBuffer).toString()

}
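
The Redis-independent parts of the cleanup above (parsing the session ID, deciding which index keys are orphaned, and grouping removals per target set) can be pulled out into pure functions, which makes them testable without a Redis instance. The following is only a sketch under that assumption. `PlannedCleanup`, `sessionIdOf`, `orphanedKeys`, and `removalsBySet` are hypothetical names, and the ID extraction takes the segment before `:idx` rather than a fixed position so that it works for any namespace depth.

```kotlin
// One planned cleanup: the orphaned ":idx" key, its session ID, and the sets still referencing that ID.
data class PlannedCleanup(
    val indexKey: String,
    val sessionId: String,
    val referencingSets: List<String>
)

// Takes the segment just before the trailing ":idx", whatever the number of namespace segments.
fun sessionIdOf(indexKey: String): String {
    require(indexKey.endsWith(":idx")) { "Not an index key: $indexKey" }
    return indexKey.removeSuffix(":idx").substringAfterLast(':')
}

// Pairs each index key with the existence flag of its session and keeps those whose session is gone.
fun orphanedKeys(indexKeys: List<String>, sessionExists: List<Boolean>): List<String> {
    require(indexKeys.size == sessionExists.size) { "Expected one existence flag per index key" }
    return indexKeys.zip(sessionExists)
        .filterNot { (_, exists) -> exists }
        .map { (key, _) -> key }
}

// Groups session IDs by the set they must be removed from, so each set needs only one SREM.
fun removalsBySet(ops: List<PlannedCleanup>): Map<String, List<String>> =
    ops.flatMap { op -> op.referencingSets.map { setKey -> setKey to op.sessionId } }
        .groupBy({ it.first }, { it.second })
```

With this split, the reactive code only has to supply the existence flags and the set members, then issue one SREM per entry of the grouped map.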
