Skip to content

Commit 87f8a27

Browse files
committed
fix: handle user disconnection more efficiently
1 parent 7285ca3 commit 87f8a27

File tree

1 file changed

+49
-17
lines changed

1 file changed

+49
-17
lines changed

pkg/models/nats_user_connection.go

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,20 +103,17 @@ func (m *NatsModel) OnAfterUserDisconnected(roomId, userId string) {
103103
go m.handleDelayedOfflineTasks(roomId, userId, userInfo, log)
104104
}
105105

106-
// handleDelayedOfflineTasks manages the grace period for user reconnection and subsequent cleanup.
106+
// handleDelayedOfflineTasks manages the grace period for user reconnection and subsequent cleanup using periodic checks.
107107
func (m *NatsModel) handleDelayedOfflineTasks(roomId, userId string, userInfo *plugnmeet.NatsKvUserInfo, log *logrus.Entry) {
108108
log = log.WithField("subMethod", "handleDelayedOfflineTasks")
109-
log.Info("starting delayed offline tasks")
109+
log.Info("starting delayed offline tasks with periodic checks")
110110

111111
// get TurnCredentials to use it later otherwise it may cleaned up if room ended
112112
turnCreds, _ := m.natsService.GetUserTurnCredentials(roomId, userId)
113113

114-
// Stage 1: Wait for the reconnection grace period.
115-
time.Sleep(5 * time.Second)
116-
117-
status, err := m.natsService.GetRoomUserStatus(roomId, userId)
118-
if err == nil && status == natsservice.UserStatusOnline {
119-
// User reconnected, do nothing.
114+
// Stage 1: Wait for the reconnection grace period (5s), checking every second.
115+
reconnected, roomEnded := m.waitForReconnect(roomId, userId, 5*time.Second, 1*time.Second, log)
116+
if reconnected {
120117
log.Info("user reconnected within grace period, aborting offline tasks")
121118
return
122119
}
@@ -126,7 +123,7 @@ func (m *NatsModel) handleDelayedOfflineTasks(roomId, userId string, userInfo *p
126123

127124
// Broadcast the final offline status.
128125
if userInfo != nil {
129-
if err = m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_OFFLINE, roomId, userInfo, userId); err != nil {
126+
if err := m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_OFFLINE, roomId, userInfo, userId); err != nil {
130127
if !errors.Is(err, config.NoOnlineUserFound) {
131128
log.WithError(err).Warn("failed to broadcast USER_OFFLINE event")
132129
}
@@ -136,15 +133,17 @@ func (m *NatsModel) handleDelayedOfflineTasks(roomId, userId string, userInfo *p
136133
_ = m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_OFFLINE, roomId, &plugnmeet.NatsKvUserInfo{UserId: userId}, userId)
137134
}
138135

139-
// Stage 2: Wait a bit longer before cleaning up resources.
140-
time.Sleep(30 * time.Second)
141-
142-
status, err = m.natsService.GetRoomUserStatus(roomId, userId)
143-
if err == nil && status == natsservice.UserStatusOnline {
144-
// User reconnected, do not delete consumer.
145-
log.Info("user reconnected before final cleanup, consumer will not be deleted")
146-
return
136+
// If the room ended during Stage 1, skip Stage 2 and go straight to cleanup.
137+
if roomEnded {
138+
log.Info("room ended during grace period, skipping second wait and proceeding to final cleanup")
139+
} else {
140+
// Stage 2: Wait a bit longer (30s) before cleaning up, checking for changes every 5 seconds.
141+
if reconnected, _ = m.waitForReconnect(roomId, userId, 30*time.Second, 5*time.Second, log); reconnected {
142+
log.Info("user reconnected before final cleanup, consumer will not be deleted")
143+
return
144+
}
147145
}
146+
148147
// also try to silently remove this user from livekit as well
149148
_, _ = m.lk.RemoveParticipant(roomId, userId)
150149

@@ -159,6 +158,39 @@ func (m *NatsModel) handleDelayedOfflineTasks(roomId, userId string, userInfo *p
159158
log.Info("user offline tasks completed")
160159
}
161160

161+
// waitForReconnect periodically checks for user reconnection or if the room has ended.
162+
// Returns (reconnected bool, roomEnded bool).
163+
func (m *NatsModel) waitForReconnect(roomId, userId string, totalWait, interval time.Duration, log *logrus.Entry) (reconnected bool, roomEnded bool) {
164+
ticker := time.NewTicker(interval)
165+
defer ticker.Stop()
166+
timeout := time.After(totalWait)
167+
168+
for {
169+
select {
170+
case <-timeout:
171+
// Wait period is over. User did not reconnect, and we didn't detect the room ending.
172+
return false, false
173+
case <-ticker.C:
174+
// 1. Check if user reconnected (highest priority)
175+
status, err := m.natsService.GetRoomUserStatus(roomId, userId)
176+
if err == nil && status == natsservice.UserStatusOnline {
177+
return true, false // User reconnected.
178+
}
179+
180+
// 2. Check if room has ended.
181+
roomInfo, _ := m.natsService.GetRoomInfo(roomId) // Ignore error, nil info is a valid signal.
182+
if roomInfo == nil {
183+
log.Info("room info not found, assuming it has ended.")
184+
return false, true // Room has ended.
185+
}
186+
if roomInfo.Status == natsservice.RoomStatusEnded || roomInfo.Status == natsservice.RoomStatusTriggeredEnd {
187+
log.Info("room has ended, proceeding to next cleanup step")
188+
return false, true // Room has ended.
189+
}
190+
}
191+
}
192+
}
193+
162194
func (m *NatsModel) revokeTurnCredentials(creds *turn.Credentials, log *logrus.Entry) {
163195
ctx, cancel := context.WithTimeout(m.app.GetApplicationCtx(), 5*time.Second)
164196
defer cancel()

0 commit comments

Comments
 (0)