diff --git a/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/ReactiveRedisIndexedSessionRepository.java b/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/ReactiveRedisIndexedSessionRepository.java index d6ba106d7..79f0786ed 100644 --- a/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/ReactiveRedisIndexedSessionRepository.java +++ b/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/ReactiveRedisIndexedSessionRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.SmartLifecycle; import org.springframework.core.NestedExceptionUtils; import org.springframework.data.redis.connection.ReactiveSubscription; import org.springframework.data.redis.core.ReactiveRedisOperations; @@ -230,12 +231,13 @@ * {@code "spring:session:sessions:648377f7-c76f-4f45-b847-c0268bb48381:idx"} Redis set. * * @author Marcus da Coregio + * @author Ham Seung Hun * @since 3.3 */ public class ReactiveRedisIndexedSessionRepository implements ReactiveSessionRepository, ReactiveFindByIndexNameSessionRepository, DisposableBean, - InitializingBean { + InitializingBean, SmartLifecycle { private static final Log logger = LogFactory.getLog(ReactiveRedisIndexedSessionRepository.class); @@ -249,6 +251,28 @@ public class ReactiveRedisIndexedSessionRepository */ public static final int DEFAULT_DATABASE = 0; + /** + * The default SmartLifecycle phase. + * + *

+ * Set to {@code Integer.MAX_VALUE / 2} to position this repository between the Redis + * {@link org.springframework.data.redis.connection.RedisConnectionFactory} (typically + * small, e.g. {@code 0}) and web server / messaging listener containers (very large + * values, e.g. {@code Integer.MAX_VALUE - 1024}, {@code Integer.MAX_VALUE - 100}, + * {@code Integer.MAX_VALUE}), preventing shutdown races. + *

+ * + *

+ * NOTE: if the ConnectionFactory’s phase is >= this value, raise it via + * {@link #setPhase(int)} to keep “SessionRepository phase > ConnectionFactory phase”. + *

+ * + * @see org.springframework.context.SmartLifecycle + * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory + * @see org.springframework.data.redis.connection.jedis.JedisConnectionFactory + */ + public static final int DEFAULT_SMART_LIFECYCLE_PHASE = Integer.MAX_VALUE / 2; + private final ReactiveRedisOperations sessionRedisOperations; private final ReactiveRedisTemplate keyEventsOperations; @@ -281,6 +305,8 @@ public class ReactiveRedisIndexedSessionRepository private int database = DEFAULT_DATABASE; + private int phase = DEFAULT_SMART_LIFECYCLE_PHASE; + private ReactiveRedisSessionIndexer indexer; private SortedSetReactiveRedisSessionExpirationStore expirationStore; @@ -289,6 +315,8 @@ public class ReactiveRedisIndexedSessionRepository private Clock clock = Clock.systemUTC(); + private volatile boolean running = false; + /** * Creates a new instance with the provided {@link ReactiveRedisOperations}. * @param sessionRedisOperations the {@link ReactiveRedisOperations} to use for @@ -308,9 +336,20 @@ public ReactiveRedisIndexedSessionRepository(ReactiveRedisOperations cleanUpExpiredSessions() { - return this.expirationStore.retrieveExpiredSessions(this.clock.instant()).flatMap(this::touch); + return this.expirationStore.retrieveExpiredSessions(this.clock.instant()) + .filter((ignored) -> isRunning()) + .flatMap(this::touch); } private Mono touch(String sessionId) { @@ -333,11 +374,36 @@ private Mono touch(String sessionId) { } @Override - public void destroy() { + public void stop() { + if (!this.running) { + return; + } + for (Disposable subscription : this.subscriptions) { subscription.dispose(); } this.subscriptions.clear(); + + this.running = false; + } + + @Override + public void destroy() { + stop(); + } + + @Override + public boolean isRunning() { + return this.running; + } + + @Override + public int getPhase() { + return this.phase; + } + + public void setPhase(int phase) { + this.phase = phase; } @Override diff --git a/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/RedisIndexedSessionRepository.java b/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/RedisIndexedSessionRepository.java index 9a311b027..30ac561d7 100644 --- a/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/RedisIndexedSessionRepository.java +++ b/spring-session-data-redis/src/main/java/org/springframework/session/data/redis/RedisIndexedSessionRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.SmartLifecycle; import org.springframework.core.NestedExceptionUtils; import org.springframework.dao.NonTransientDataAccessException; import org.springframework.data.redis.connection.Message; @@ -256,11 +257,12 @@ * * @author Rob Winch * @author Vedran Pavic + * @author Ham Seung Hun * @since 2.2.0 */ public class RedisIndexedSessionRepository implements FindByIndexNameSessionRepository, MessageListener, - InitializingBean, DisposableBean { + InitializingBean, DisposableBean, SmartLifecycle { private static final Log logger = LogFactory.getLog(RedisIndexedSessionRepository.class); @@ -281,8 +283,32 @@ public class RedisIndexedSessionRepository */ public static final String DEFAULT_NAMESPACE = "spring:session"; + /** + * The default SmartLifecycle phase. + * + *

+ * Set to {@code Integer.MAX_VALUE / 2} to position this repository between the Redis + * {@link org.springframework.data.redis.connection.RedisConnectionFactory} (typically + * small, e.g. {@code 0}) and web server / messaging listener containers (very large + * values, e.g. {@code Integer.MAX_VALUE - 1024}, {@code Integer.MAX_VALUE - 100}, + * {@code Integer.MAX_VALUE}), preventing shutdown races. + *

+ * + *

+ * NOTE: if the ConnectionFactory’s phase is >= this value, raise it via + * {@link #setPhase(int)} to keep “SessionRepository phase > ConnectionFactory phase”. + *

+ * + * @see org.springframework.context.SmartLifecycle + * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory + * @see org.springframework.data.redis.connection.jedis.JedisConnectionFactory + */ + public static final int DEFAULT_SMART_LIFECYCLE_PHASE = Integer.MAX_VALUE / 2; + private int database = DEFAULT_DATABASE; + private int phase = DEFAULT_SMART_LIFECYCLE_PHASE; + /** * The namespace for every key used by Spring Session in Redis. */ @@ -329,6 +355,8 @@ public class RedisIndexedSessionRepository private BiFunction, MapSession> redisSessionMapper = new RedisSessionMapper(); + private volatile boolean running = false; + /** * Creates a new instance. For an example, refer to the class level javadoc. * @param sessionRedisOperations the {@link RedisOperations} to use for managing the @@ -343,12 +371,23 @@ public RedisIndexedSessionRepository(RedisOperations sessionRedi } @Override - public void afterPropertiesSet() { + public void start() { + if (this.running) { + return; + } + if (!Scheduled.CRON_DISABLED.equals(this.cleanupCron)) { this.taskScheduler = createTaskScheduler(); this.taskScheduler.initialize(); this.taskScheduler.schedule(this::cleanUpExpiredSessions, new CronTrigger(this.cleanupCron)); } + + this.running = true; + } + + @Override + public void afterPropertiesSet() { + start(); } private static ThreadPoolTaskScheduler createTaskScheduler() { @@ -358,10 +397,35 @@ private static ThreadPoolTaskScheduler createTaskScheduler() { } @Override - public void destroy() { + public void stop() { + if (!this.running) { + return; + } + if (this.taskScheduler != null) { this.taskScheduler.destroy(); } + + this.running = false; + } + + @Override + public void destroy() { + stop(); + } + + @Override + public boolean isRunning() { + return this.running; + } + + @Override + public int getPhase() { + return this.phase; + } + + public void setPhase(int phase) { + this.phase = phase; } /** @@ -486,6 +550,10 @@ public void save(RedisSession session) { } public void cleanUpExpiredSessions() { + if (!isRunning()) { + return; + } + this.expirationPolicy.cleanExpiredSessions(); } diff --git a/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/ReactiveRedisIndexedSessionRepositoryTests.java b/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/ReactiveRedisIndexedSessionRepositoryTests.java new file mode 100644 index 000000000..4a2696055 --- /dev/null +++ b/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/ReactiveRedisIndexedSessionRepositoryTests.java @@ -0,0 +1,256 @@ +/* + * Copyright 2014-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.session.data.redis; + +import java.time.Duration; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.test.util.ReflectionTestUtils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class ReactiveRedisIndexedSessionRepositoryTests { + + @Mock + private ReactiveRedisOperations sessionRedisOperations; + + @Mock + private ReactiveRedisTemplate keyEventsOperations; + + private ReactiveRedisIndexedSessionRepository repository; + + @BeforeEach + void setup() { + this.repository = new ReactiveRedisIndexedSessionRepository(this.sessionRedisOperations, + this.keyEventsOperations); + } + + @Test + void startShouldSetRunningTrue() { + givenSubscriptionsEmpty(); + + this.repository.start(); + + assertThat(this.repository.isRunning()).isTrue(); + } + + @Test + void startShouldBeIdempotent() { + givenSubscriptionsEmpty(); + + this.repository.start(); + this.repository.start(); + + assertThat(this.repository.isRunning()).isTrue(); + + verify(this.sessionRedisOperations, times(1)).listenToPattern(anyString()); + verify(this.keyEventsOperations, times(1)).listenToChannel(anyString(), anyString()); + } + + @Test + void stopShouldSetRunningFalse() { + givenSubscriptionsEmpty(); + + this.repository.start(); + this.repository.stop(); + + assertThat(this.repository.isRunning()).isFalse(); + } + + @Test + void stopShouldBeIdempotent() { + givenSubscriptionsEmpty(); + + this.repository.start(); + this.repository.stop(); + this.repository.stop(); + assertThat(this.repository.isRunning()).isFalse(); + } + + @Test + void isAutoStartupShouldReturnTrue() { + assertThat(this.repository.isAutoStartup()).isTrue(); + } + + @Test + void getPhaseShouldReturnPhase() { + this.repository.setPhase(100); + + assertThat(this.repository.getPhase()).isEqualTo(100); + } + + @Test + void startShouldCreateSubscriptions() { + givenSubscriptionsNever(); + + this.repository.start(); + + List subscriptions = getSubscriptions(); + assertThat(subscriptions).isNotEmpty(); + + subscriptions.forEach((sub) -> assertThat(sub.isDisposed()).isFalse()); + } + + @Test + void stopShouldDisposeAllSubscriptions() { + givenSubscriptionsEmpty(); + + this.repository.start(); + + List subscriptionsBeforeStop = getSubscriptions(); + assertThat(subscriptionsBeforeStop).isNotEmpty(); + + this.repository.stop(); + + subscriptionsBeforeStop.forEach((sub) -> assertThat(sub.isDisposed()).isTrue()); + + List subscriptionsAfterStop = getSubscriptions(); + assertThat(subscriptionsAfterStop).isEmpty(); + } + + @Test + void cleanUpExpiredSessionsShouldRespectRunningState() { + this.repository.stop(); + + Flux result = invokeCleanUpExpiredSessions(); + + StepVerifier.create(result).verifyComplete(); + } + + @Test + void cleanUpExpiredSessionsWhenRunning() { + givenSubscriptionsEmpty(); + + this.repository.start(); + + assertThat(this.repository.isRunning()).isTrue(); + + Flux result = invokeCleanUpExpiredSessions(); + + StepVerifier.create(result).verifyComplete(); + } + + @Test + void destroyShouldCallStop() { + givenSubscriptionsEmpty(); + + this.repository.start(); + this.repository.destroy(); + assertThat(this.repository.isRunning()).isFalse(); + } + + @Test + void afterPropertiesSetShouldCallStart() throws Exception { + givenSubscriptionsEmpty(); + + this.repository.afterPropertiesSet(); + assertThat(this.repository.isRunning()).isTrue(); + } + + @Test + void cleanupIntervalZeroShouldNotCreateCleanupTask() { + givenSubscriptionsEmpty(); + + this.repository.setCleanupInterval(Duration.ZERO); + this.repository.start(); + + List subscriptions = getSubscriptions(); + + assertThat(subscriptions).hasSize(2); + } + + @Test + void cleanupIntervalNonZeroShouldCreateCleanupTask() { + givenSubscriptionsEmpty(); + + this.repository.setCleanupInterval(Duration.ofSeconds(10)); + this.repository.start(); + + List subscriptions = getSubscriptions(); + + assertThat(subscriptions).hasSize(3); + } + + @Test + void stopWhenNotRunningNotDisposeSubscriptions() { + this.repository.stop(); + + assertThat(this.repository.isRunning()).isFalse(); + + List subscriptions = getSubscriptions(); + assertThat(subscriptions).isEmpty(); + } + + @Test + void multipleStartStopCyclesShouldWorkCorrectly() { + givenSubscriptionsNever(); + + this.repository.start(); + assertThat(this.repository.isRunning()).isTrue(); + + this.repository.stop(); + assertThat(this.repository.isRunning()).isFalse(); + + this.repository.start(); + assertThat(this.repository.isRunning()).isTrue(); + + List subscriptions = getSubscriptions(); + assertThat(subscriptions).isNotEmpty(); + subscriptions.forEach((sub) -> assertThat(sub.isDisposed()).isFalse()); + } + + private void givenSubscriptionsNever() { + given(this.sessionRedisOperations.listenToPattern(anyString())).willReturn(Flux.never()); + given(this.keyEventsOperations.listenToChannel(anyString(), anyString())).willReturn(Flux.never()); + } + + private void givenSubscriptionsEmpty() { + given(this.sessionRedisOperations.listenToPattern(anyString())).willReturn(Flux.empty()); + given(this.keyEventsOperations.listenToChannel(anyString(), anyString())).willReturn(Flux.empty()); + } + + @SuppressWarnings("unchecked") + private List getSubscriptions() { + return (List) ReflectionTestUtils.getField(this.repository, "subscriptions"); + } + + private Flux invokeCleanUpExpiredSessions() { + try { + return ReflectionTestUtils.invokeMethod(this.repository, "cleanUpExpiredSessions"); + } + catch (Exception ex) { + return Flux.empty(); + } + } + +} diff --git a/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/RedisIndexedSessionRepositoryTests.java b/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/RedisIndexedSessionRepositoryTests.java index 3303eebbf..2c40e1c91 100644 --- a/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/RedisIndexedSessionRepositoryTests.java +++ b/spring-session-data-redis/src/test/java/org/springframework/session/data/redis/RedisIndexedSessionRepositoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -449,6 +449,8 @@ void cleanupExpiredSessions() { Set expiredIds = new HashSet<>(Arrays.asList("expired-key1", "expired-key2")); given(this.boundSetOperations.members()).willReturn(expiredIds); + this.redisRepository.start(); + this.redisRepository.cleanUpExpiredSessions(); for (Object id : expiredIds) { @@ -765,6 +767,60 @@ void setCleanupCronDisabled() { assertThat(this.redisRepository).extracting("taskScheduler").isNull(); } + @Test + void startShouldSetRunningTrue() { + this.redisRepository.start(); + assertThat(this.redisRepository.isRunning()).isTrue(); + } + + @Test + void startShouldBeIdempotent() { + this.redisRepository.start(); + this.redisRepository.start(); + assertThat(this.redisRepository.isRunning()).isTrue(); + } + + @Test + void stopShouldSetRunningFalse() { + this.redisRepository.start(); + this.redisRepository.stop(); + assertThat(this.redisRepository.isRunning()).isFalse(); + } + + @Test + void stopShouldBeIdempotent() { + this.redisRepository.start(); + this.redisRepository.stop(); + this.redisRepository.stop(); + assertThat(this.redisRepository.isRunning()).isFalse(); + } + + @Test + void isAutoStartupShouldReturnTrue() { + assertThat(this.redisRepository.isAutoStartup()).isTrue(); + } + + @Test + void getPhaseShouldReturnPhase() { + this.redisRepository.setPhase(100); + + assertThat(this.redisRepository.getPhase()).isEqualTo(100); + } + + @Test + void cleanUpExpiredSessionsShouldRespectRunningState() { + this.redisRepository.stop(); + this.redisRepository.cleanUpExpiredSessions(); + verifyNoMoreInteractions(this.redisOperations); + } + + @Test + void destroyShouldCallStop() { + this.redisRepository.start(); + this.redisRepository.destroy(); + assertThat(this.redisRepository.isRunning()).isFalse(); + } + @Test void changeRedisNamespace() { String namespace = "foo:bar";