Skip to content

Commit 1de0990

Browse files
committed
Handle session purging on the SessionLoop
1 parent 33b7781 commit 1de0990

File tree

1 file changed

+12
-4
lines changed

1 file changed

+12
-4
lines changed

broker/src/main/java/io/moquette/broker/SessionRegistry.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,18 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr
242242

243243
private void removeExpiredSession(ISessionsRepository.SessionData expiredSession) {
244244
final String expiredAt = expiredSession.expireAt().map(Instant::toString).orElse("UNDEFINED");
245-
LOG.debug("Removing session {}, expired on {}", expiredSession.clientId(), expiredAt);
246-
remove(pool.get(expiredSession.clientId()));
247-
sessionsRepository.delete(expiredSession);
248-
subscriptionsDirectory.removeSharedSubscriptionsForClient(expiredSession.clientId());
245+
final String clientId = expiredSession.clientId();
246+
loopsGroup.routeCommand(clientId, "PurgeSession", () -> {
247+
LOG.debug("Removing session {}, expired on {}", clientId, expiredAt);
248+
final Session session = pool.get(clientId);
249+
boolean success = session.assignState(SessionStatus.DISCONNECTED, SessionStatus.DESTROYED);
250+
if (!success) {
251+
remove(session);
252+
sessionsRepository.delete(expiredSession);
253+
subscriptionsDirectory.removeSharedSubscriptionsForClient(clientId);
254+
}
255+
return null;
256+
});
249257
}
250258

251259
private void trackForRemovalOnExpiration(ISessionsRepository.SessionData session) {

0 commit comments

Comments
 (0)