Skip to content

Commit 7285ca3

Browse files
committed
fix: reworked on OnAfterRoomEnded to handle session termination more efficiently
1 parent d47b550 commit 7285ca3

File tree

3 files changed

+75
-34
lines changed

3 files changed

+75
-34
lines changed

pkg/models/room_end.go

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,21 @@ func (m *RoomModel) EndRoom(ctx context.Context, r *plugnmeet.RoomEndReq) (bool,
2727
}
2828
log.Info("Proceeding to end room")
2929

30-
// Fetch room information from the database.
31-
roomDbInfo, err := m.ds.GetRoomInfoByRoomId(roomID, 1)
32-
if err != nil {
33-
return false, err.Error()
34-
}
35-
if roomDbInfo == nil || roomDbInfo.ID == 0 {
36-
return false, "room not found in DB or not active"
37-
}
38-
39-
// Fetch the live room state from the NATS key-value store.
30+
// Fetch the live room state from the NATS key-value store first.
4031
info, err := m.natsService.GetRoomInfo(roomID)
4132
if err != nil {
42-
log.WithError(err).Warn("NATS GetRoomInfo failed during EndRoom. Proceeding with DB cleanup.")
33+
log.WithError(err).Warn("NATS GetRoomInfo failed during EndRoom. Falling back to DB check.")
4334
}
44-
// Handle cases where the room exists in the DB but not in NATS.
35+
4536
if info == nil {
37+
// If NATS fails or room not in NATS, check the database.
38+
roomDbInfo, dbErr := m.ds.GetRoomInfoByRoomId(roomID, 1) // Using 1 for active
39+
if dbErr != nil {
40+
return false, dbErr.Error()
41+
}
42+
if roomDbInfo == nil || roomDbInfo.ID == 0 {
43+
return false, "room not found in DB or not active"
44+
}
4645
if roomDbInfo.IsRunning == 1 {
4746
log.Warn("Room active in DB but not in NATS during EndRoom. Marking as ended and cleaning up.")
4847
go m.OnAfterRoomEnded(roomDbInfo.ID, roomDbInfo.RoomId, roomDbInfo.Sid, "", "") // Metadata might be empty
@@ -51,16 +50,14 @@ func (m *RoomModel) EndRoom(ctx context.Context, r *plugnmeet.RoomEndReq) (bool,
5150
}
5251

5352
// Temporarily cache the live room data in Redis.
54-
// This serves as a fallback in case the 'room_finished' webhook from LiveKit is delayed.
5553
m.rs.HoldTemporaryRoomData(info)
5654

57-
// Broadcast a 'SESSION_ENDED' event to all clients in the room to notify them.
58-
err = m.natsService.BroadcastSystemEventToRoom(plugnmeet.NatsMsgServerToClientEvents_SESSION_ENDED, roomID, "notifications.room-disconnected-room-ended", nil)
59-
if err != nil {
55+
// Broadcast a 'SESSION_ENDED' event to all clients in the room.
56+
if err = m.natsService.BroadcastSystemEventToRoom(plugnmeet.NatsMsgServerToClientEvents_SESSION_ENDED, roomID, "notifications.room-disconnected-room-ended", nil); err != nil {
6057
log.WithError(err).Error("error sending session ended notification message")
6158
}
6259

63-
// Trigger the main asynchronous cleanup process in a separate goroutine.
60+
// Trigger the main asynchronous cleanup process.
6461
go m.OnAfterRoomEnded(info.DbTableId, info.RoomId, info.RoomSid, info.Metadata, info.Status)
6562
return true, "success"
6663
}
@@ -76,8 +73,7 @@ func (m *RoomModel) OnAfterRoomEnded(dbTableId uint64, roomID, roomSID, metadata
7673

7774
if roomStatus != natsservice.RoomStatusEnded {
7875
// update status immediately to prevent user to join
79-
err := m.natsService.UpdateRoomStatus(roomID, natsservice.RoomStatusTriggeredEnd)
80-
if err != nil {
76+
if err := m.natsService.UpdateRoomStatus(roomID, natsservice.RoomStatusTriggeredEnd); err != nil {
8177
log.WithError(err).Error("error updating room status")
8278
}
8379
}
@@ -106,9 +102,11 @@ func (m *RoomModel) OnAfterRoomEnded(dbTableId uint64, roomID, roomSID, metadata
106102
}
107103
}()
108104

109-
// to avoid race condition better wait few seconds
110-
// so that all the users got disconnect properly
111-
time.Sleep(config.WaitBeforeTriggerOnAfterRoomEnded)
105+
// Wait for all users to disconnect before proceeding.
106+
m.waitForAllUsersToDisconnect(roomID)
107+
108+
// send session_ended webhook before ending room in livekit
109+
m.sendSessionEndedWebhook(roomID, roomSID)
112110

113111
if roomStatus != natsservice.RoomStatusEnded {
114112
err := m.natsService.UpdateRoomStatus(roomID, natsservice.RoomStatusEnded)
@@ -177,3 +175,55 @@ func (m *RoomModel) OnAfterRoomEnded(dbTableId uint64, roomID, roomSID, metadata
177175
m.analyticsModel.PrepareToExportAnalytics(roomID, roomSID, metadata)
178176
})
179177
}
178+
179+
// waitForAllUsersToDisconnect waits for all users in a room to disconnect.
180+
// It checks periodically and times out after a configured duration.
181+
func (m *RoomModel) waitForAllUsersToDisconnect(roomID string) {
182+
log := m.logger.WithField("room_id", roomID)
183+
totalWait := config.WaitBeforeTriggerOnAfterRoomEnded
184+
interval := 1 * time.Second // Check every second
185+
186+
timeout := time.After(totalWait)
187+
ticker := time.NewTicker(interval)
188+
defer ticker.Stop()
189+
190+
log.Infof("Waiting up to %s for all users to disconnect.", totalWait)
191+
192+
for {
193+
select {
194+
case <-timeout:
195+
log.Warn("Timed out waiting for all users to disconnect. Proceeding with cleanup.")
196+
return
197+
case <-ticker.C:
198+
onlineUsers, err := m.natsService.GetOnlineUsersId(roomID)
199+
if err != nil {
200+
log.WithError(err).Warn("Failed to get online user list while waiting for disconnect. Proceeding with cleanup.")
201+
return // Exit if we can't check users
202+
}
203+
204+
if onlineUsers == nil || len(onlineUsers) == 0 {
205+
log.Info("All users have disconnected. Proceeding with cleanup.")
206+
return // All users are gone, exit loop
207+
}
208+
log.Infof("Waiting for %d user(s) to disconnect...", len(onlineUsers))
209+
}
210+
}
211+
}
212+
213+
// sendSessionEndedWebhook to send webhook
214+
func (m *RoomModel) sendSessionEndedWebhook(roomId, roomSid string) {
215+
if m.webhookNotifier != nil {
216+
e := "session_ended"
217+
msg := &plugnmeet.CommonNotifyEvent{
218+
Event: &e,
219+
Room: &plugnmeet.NotifyEventRoom{
220+
RoomId: &roomId,
221+
Sid: &roomSid,
222+
},
223+
}
224+
225+
if err := m.webhookNotifier.SendWebhookEvent(msg); err != nil {
226+
m.logger.WithError(err).Errorln("error sending session ended webhook")
227+
}
228+
}
229+
}

pkg/models/webhook_room.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,27 +109,19 @@ func (m *WebhookModel) roomFinished(event *livekit.WebhookEvent) {
109109
event.Room.MaxParticipants = uint32(rInfo.MaxParticipants)
110110
event.Room.EmptyTimeout = uint32(rInfo.EmptyTimeout)
111111

112-
// we are introducing a new event name here
113-
// because for our case we still have remaining tasks
114-
m.sendCustomTypeWebhook(event, "session_ended")
115-
116112
if rInfo.Status != natsservice.RoomStatusEnded {
117113
// This means the room was ended directly by LiveKit (e.g., empty timeout),
118114
// not through the plugNmeet API. We need to trigger our cleanup flow.
119115
log.Warnln("room was not ended via API, triggering plugNmeet EndRoom flow")
120116

121117
// change status to ended
122-
err = m.natsService.UpdateRoomStatus(rInfo.RoomId, natsservice.RoomStatusEnded)
123-
if err != nil {
118+
if err = m.natsService.UpdateRoomStatus(rInfo.RoomId, natsservice.RoomStatusEnded); err != nil {
124119
log.WithError(err).Errorln("failed to update room status")
125120
}
126121
// end the room in the proper plugNmeet way
127122
m.rm.EndRoom(m.ctx, &plugnmeet.RoomEndReq{RoomId: rInfo.RoomId})
128123
}
129124

130-
// now we'll perform a few service related tasks
131-
// time.Sleep(time.Second)
132-
133125
// at the end we'll handle event notification
134126
m.sendToWebhookNotifier(event)
135127

pkg/services/redis/room.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ func (s *RedisService) HoldTemporaryRoomData(info *plugnmeet.NatsKvRoomInfo) {
2424
log.WithError(err).Errorln("marshalling failed")
2525
return
2626
}
27-
key := fmt.Sprintf(temporaryRoomData, info.RoomId)
2827

29-
err = s.rc.SetNX(s.ctx, key, marshal, time.Minute*1).Err()
30-
if err != nil {
28+
key := fmt.Sprintf(temporaryRoomData, info.RoomId)
29+
if err = s.rc.SetNX(s.ctx, key, marshal, time.Minute*1).Err(); err != nil {
3130
log.WithError(err).Errorln("SetNX failed")
3231
}
3332
}

0 commit comments

Comments
 (0)