Skip to content

Commit 83c0535

Browse files
committed
fix(critical): duplicate event was sending because of using same subject for both JS and Core pub/sub
1 parent a540966 commit 83c0535

File tree

11 files changed

+125
-98
lines changed

11 files changed

+125
-98
lines changed

config_sample.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ nats_info:
152152
room_stream_name: "pnm-room-stream"
153153
subjects:
154154
system_api_worker: "sysApiWorker"
155-
system_js_worker: "sysJsWorker"
155+
system_js_worker: "sysJsWorker" # for JetStream
156+
system_core_worker: "sysCoreWorker" # for Core pub/sub
156157
system_public: "sysPublic"
157158
system_private: "sysPrivate"
158159
chat: "chat"

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@ require (
2020
github.com/livekit/media-sdk v0.0.0-20251106223430-dd8f5e0de2cf
2121
github.com/livekit/protocol v1.45.1
2222
github.com/livekit/server-sdk-go/v2 v2.16.1
23-
github.com/mynaparrot/plugnmeet-protocol v1.2.2-0.20260406140131-e4cdbb15b0fc
23+
github.com/mynaparrot/plugnmeet-protocol v1.2.2-0.20260409110802-d201b3fecf6e
2424
github.com/nats-io/jwt/v2 v2.8.1
2525
github.com/nats-io/nats.go v1.50.0
2626
github.com/nats-io/nkeys v0.4.15
2727
github.com/pion/webrtc/v4 v4.2.11
2828
github.com/redis/go-redis/v9 v9.18.0
2929
github.com/sirupsen/logrus v1.9.4
3030
golang.org/x/sync v0.20.0
31-
google.golang.org/genai v1.52.0
31+
google.golang.org/genai v1.52.1
3232
google.golang.org/protobuf v1.36.11
3333
gopkg.in/yaml.v3 v3.0.1
3434
gorm.io/driver/mysql v1.6.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ=
197197
github.com/moby/term v0.5.2/go.mod h1:d3djjFCrjnB+fl8NJux+EJzu0msscUP+f8it8hPkFLc=
198198
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
199199
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
200-
github.com/mynaparrot/plugnmeet-protocol v1.2.2-0.20260406140131-e4cdbb15b0fc h1:bKpNiWlAYuBlhDMDuzUw9pRgL0au2dLfqGN+42wWrY8=
201-
github.com/mynaparrot/plugnmeet-protocol v1.2.2-0.20260406140131-e4cdbb15b0fc/go.mod h1:ez9i2JGlIHWTB58Cn/IZAW0alCw7VPGFWwrCpKnySo4=
200+
github.com/mynaparrot/plugnmeet-protocol v1.2.2-0.20260409110802-d201b3fecf6e h1:d6VIobdvTjH/JGq0F3P/on+ngEe0cFLx5IOIiM4sThE=
201+
github.com/mynaparrot/plugnmeet-protocol v1.2.2-0.20260409110802-d201b3fecf6e/go.mod h1:ez9i2JGlIHWTB58Cn/IZAW0alCw7VPGFWwrCpKnySo4=
202202
github.com/nats-io/jwt/v2 v2.8.1 h1:V0xpGuD/N8Mi+fQNDynXohVvp7ZztevW5io8CUWlPmU=
203203
github.com/nats-io/jwt/v2 v2.8.1/go.mod h1:nWnOEEiVMiKHQpnAy4eXlizVEtSfzacZ1Q43LIRavZg=
204204
github.com/nats-io/nats.go v1.50.0 h1:5zAeQrTvyrKrWLJ0fu02W3br8ym57qf7csDzgLOpcds=
@@ -385,8 +385,8 @@ golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
385385
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
386386
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
387387
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
388-
google.golang.org/genai v1.52.0 h1:ekVIxWHtLUNbt+v0WWi4j3JT4yrHDEbysMcHQcaCQoI=
389-
google.golang.org/genai v1.52.0/go.mod h1:A3kkl0nyBjyFlNjgxIwKq70julKbIxpSxqKO5gw/gmk=
388+
google.golang.org/genai v1.52.1 h1:dYoljKtLDXMiBdVaClSJ/ZPwZ7j1N0lGjMhwOKOQUlk=
389+
google.golang.org/genai v1.52.1/go.mod h1:A3kkl0nyBjyFlNjgxIwKq70julKbIxpSxqKO5gw/gmk=
390390
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 h1:41r6JMbpzBMen0R/4TZeeAmGXSJC7DftGINUodzTkPI=
391391
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7/go.mod h1:EIQZ5bFCfRQDV4MhRle7+OgjNtZ6P1PiZBgAKuxXu/Y=
392392
google.golang.org/genproto/googleapis/rpc v0.0.0-20260319201613-d00831a3d3e7 h1:ndE4FoJqsIceKP2oYSnUZqhTdYufCYYkqwtFzfrhI7w=

pkg/config/config.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -218,13 +218,14 @@ type NatsInfo struct {
218218
}
219219

220220
type NatsSubjects struct {
221-
SystemApiWorker string `yaml:"system_api_worker"`
222-
SystemJsWorker string `yaml:"system_js_worker"`
223-
SystemPublic string `yaml:"system_public"`
224-
SystemPrivate string `yaml:"system_private"`
225-
Chat string `yaml:"chat"`
226-
Whiteboard string `yaml:"whiteboard"`
227-
DataChannel string `yaml:"data_channel"`
221+
SystemApiWorker string `yaml:"system_api_worker"`
222+
SystemJsWorker string `yaml:"system_js_worker"` // jetstream worker
223+
SystemCoreWorker string `yaml:"system_core_worker"` // core pub/sub worker
224+
SystemPublic string `yaml:"system_public"`
225+
SystemPrivate string `yaml:"system_private"`
226+
Chat string `yaml:"chat"`
227+
Whiteboard string `yaml:"whiteboard"`
228+
DataChannel string `yaml:"data_channel"`
228229
}
229230

230231
type NatsInfoRecorder struct {
@@ -243,6 +244,9 @@ func New(ctx context.Context, appCnf *AppConfig) (*AppConfig, error) {
243244
if appCnf.NatsInfo.RoomStreamName == "" {
244245
appCnf.NatsInfo.RoomStreamName = "pnm-room-stream"
245246
}
247+
if appCnf.NatsInfo.Subjects.SystemCoreWorker == "" {
248+
appCnf.NatsInfo.Subjects.SystemCoreWorker = "sysCoreWorker"
249+
}
246250

247251
// set default values
248252
if appCnf.AnalyticsSettings != nil {

pkg/controllers/auth.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,14 @@ func (ac *AuthController) HandleVerifyToken(c *fiber.Ctx) error {
162162
UserId: &requestedUserId,
163163
RoomStreamName: &ac.AppConfig.NatsInfo.RoomStreamName,
164164
NatsSubjects: &plugnmeet.NatsSubjects{
165-
SystemApiWorker: natsSubjs.SystemApiWorker,
166-
SystemJsWorker: natsSubjs.SystemJsWorker,
167-
SystemPublic: natsSubjs.SystemPublic,
168-
SystemPrivate: natsSubjs.SystemPrivate,
169-
Chat: natsSubjs.Chat,
170-
Whiteboard: natsSubjs.Whiteboard,
171-
DataChannel: natsSubjs.DataChannel,
165+
SystemApiWorker: natsSubjs.SystemApiWorker,
166+
SystemJsWorker: natsSubjs.SystemJsWorker,
167+
SystemCoreWorker: natsSubjs.SystemCoreWorker,
168+
SystemPublic: natsSubjs.SystemPublic,
169+
SystemPrivate: natsSubjs.SystemPrivate,
170+
Chat: natsSubjs.Chat,
171+
Whiteboard: natsSubjs.Whiteboard,
172+
DataChannel: natsSubjs.DataChannel,
172173
},
173174
EnabledSelfInsertEncryptionKey: &enabledSelfInsertEncryptionKey,
174175
}

pkg/controllers/nats.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -209,22 +209,20 @@ func (c *NatsController) handleUserConnectionEvent(data []byte, isConnect bool)
209209
}
210210
if claims.GetName() != config.RecorderUserAuthName {
211211
if isConnect {
212-
c.natsModel.OnAfterUserJoined(claims.GetRoomId(), claims.GetUserId())
212+
c.natsModel.OnAfterUserJoined(claims.GetRoomId(), claims.GetUserId(), "handleUserConnectionEvent")
213213
} else {
214-
c.natsModel.OnAfterUserDisconnected(claims.GetRoomId(), claims.GetUserId())
214+
c.natsModel.OnAfterUserDisconnected(claims.GetRoomId(), claims.GetUserId(), "handleUserConnectionEvent")
215215
}
216216
}
217217
}
218218
}
219219

220-
// subscribeToSystemWorkerCore subscribes to the system worker subject via core NATS pub/sub.
220+
// subscribeToSystemWorkerCore subscribes to the SystemCoreWorker subject via core NATS pub/sub.
221221
// This is intended for lightweight, fire-and-forget messages (e.g., analytics) that don't require JetStream's guarantees.
222-
// It runs in parallel with the JetStream consumer.
223222
func (c *NatsController) subscribeToSystemWorkerCore() (*nats.Subscription, error) {
224-
subject := fmt.Sprintf("%s.*.*", c.app.NatsInfo.Subjects.SystemJsWorker)
223+
subject := fmt.Sprintf("%s.*.*", c.app.NatsInfo.Subjects.SystemCoreWorker)
225224
// Use a queue group to load-balance across multiple server instances.
226-
// The name is derived from the JetStream consumer for consistency.
227-
queue := fmt.Sprintf("%s%s-core", prefix, c.app.NatsInfo.Subjects.SystemJsWorker)
225+
queue := fmt.Sprintf("%s%s", prefix, c.app.NatsInfo.Subjects.SystemCoreWorker)
228226

229227
return c.app.NatsConn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
230228
// Copy data to avoid race conditions as the message buffer is reused.

pkg/controllers/nats_auth_controller.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ func (s *NatsAuthController) setPermissionForClient(data *plugnmeet.PlugNmeetTok
163163

164164
// permission to publish messages to the system (JetStream)
165165
fmt.Sprintf("%s.%s.%s", s.app.NatsInfo.Subjects.SystemJsWorker, roomId, userId),
166+
// permission to publish messages to the system (core pub/sub)
167+
fmt.Sprintf("%s.%s.%s", s.app.NatsInfo.Subjects.SystemCoreWorker, roomId, userId),
166168
// permission to publish in core pub/sub
167169
fmt.Sprintf("%s.%s", s.app.NatsInfo.Subjects.Chat, roomId),
168170
fmt.Sprintf("%s.%s", s.app.NatsInfo.Subjects.Whiteboard, roomId),

pkg/models/nats.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/mynaparrot/plugnmeet-server/pkg/services/redis"
1010
turnservice "github.com/mynaparrot/plugnmeet-server/pkg/services/turn"
1111
"github.com/sirupsen/logrus"
12+
"golang.org/x/sync/singleflight"
1213
"google.golang.org/protobuf/encoding/protojson"
1314
)
1415

@@ -23,6 +24,7 @@ type NatsModel struct {
2324
userModel *UserModel
2425
analyticsModel *AnalyticsModel
2526
logger *logrus.Entry
27+
sfGroup *singleflight.Group
2628
}
2729

2830
func NewNatsModel(app *config.AppConfig, ds *dbservice.DatabaseService, rs *redisservice.RedisService, natsService *natsservice.NatsService, lk *livekitservice.LivekitService, turn *turnservice.TurnService, analyticsModel *AnalyticsModel, authModel *AuthModel, userModel *UserModel, logger *logrus.Logger) *NatsModel {
@@ -37,6 +39,7 @@ func NewNatsModel(app *config.AppConfig, ds *dbservice.DatabaseService, rs *redi
3739
userModel: userModel,
3840
analyticsModel: analyticsModel,
3941
logger: logger.WithField("model", "nats"),
42+
sfGroup: new(singleflight.Group),
4043
}
4144
}
4245

pkg/models/nats_user_connection.go

Lines changed: 84 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -13,94 +13,112 @@ import (
1313
"github.com/sirupsen/logrus"
1414
)
1515

16-
func (m *NatsModel) OnAfterUserJoined(roomId, userId string) {
16+
func (m *NatsModel) OnAfterUserJoined(roomId, userId, calledFrom string) {
1717
log := m.logger.WithFields(logrus.Fields{
18-
"roomId": roomId,
19-
"userId": userId,
20-
"method": "OnAfterUserJoined",
18+
"roomId": roomId,
19+
"userId": userId,
20+
"called_from": calledFrom,
21+
"method": "OnAfterUserJoined",
2122
})
2223

23-
status, err := m.natsService.GetRoomUserStatus(roomId, userId)
24-
// If there's an error or the user is already online, we don't need to proceed.
25-
if err != nil {
26-
log.WithError(err).Error("Failed to get room user status")
27-
return
28-
}
29-
if status == natsservice.UserStatusOnline {
30-
// This is a frequent case due to pings, so we don't log it to avoid noise.
31-
return
32-
}
33-
34-
log.Info("Handling user joined event")
35-
if err = m.natsService.UpdateUserStatus(roomId, userId, natsservice.UserStatusOnline); err != nil {
36-
log.WithError(err).WithField("status", natsservice.UserStatusOnline).Warn("failed to update user status")
37-
}
38-
39-
userInfo, err := m.natsService.GetUserInfo(roomId, userId)
40-
if err == nil && userInfo != nil {
41-
// broadcast this user to everyone
42-
if err = m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_JOINED, roomId, userInfo, userId); err != nil {
43-
log.WithError(err).Error("Failed to broadcast USER_JOINED event")
24+
key := fmt.Sprintf("user-joined-%s-%s", roomId, userId)
25+
_, _, shared := m.sfGroup.Do(key, func() (interface{}, error) {
26+
status, err := m.natsService.GetRoomUserStatus(roomId, userId)
27+
// If there's an error or the user is already online, we don't need to proceed.
28+
if err != nil {
29+
log.WithError(err).Error("Failed to get room user status")
30+
return nil, err
31+
}
32+
if status == natsservice.UserStatusOnline {
33+
// This is a frequent case due to pings, so we don't log it to avoid noise.
34+
return nil, nil
4435
}
4536

46-
now := fmt.Sprintf("%d", time.Now().UnixMilli())
47-
m.analyticsModel.HandleEvent(&plugnmeet.AnalyticsDataMsg{
48-
EventType: plugnmeet.AnalyticsEventType_ANALYTICS_EVENT_TYPE_ROOM,
49-
EventName: plugnmeet.AnalyticsEvents_ANALYTICS_EVENT_USER_JOINED,
50-
RoomId: roomId,
51-
UserId: &userId,
52-
UserName: &userInfo.Name,
53-
ExtraData: &userInfo.Metadata,
54-
HsetValue: &now,
55-
})
56-
log.Info("Successfully processed user joined event")
57-
58-
roomInfo, err := m.natsService.GetRoomInfo(roomId)
59-
if err != nil {
60-
log.WithError(err).Error("Failed to get room info")
37+
log.Info("Handling user joined event")
38+
if err = m.natsService.UpdateUserStatus(roomId, userId, natsservice.UserStatusOnline); err != nil {
39+
log.WithError(err).WithField("status", natsservice.UserStatusOnline).Warn("failed to update user status")
6140
}
62-
if roomInfo != nil {
63-
if _, err := m.ds.IncrementOrDecrementNumParticipants(roomInfo.GetRoomSid(), "+"); err != nil {
64-
log.WithError(err).Error("Failed to increment num participants")
41+
42+
userInfo, err := m.natsService.GetUserInfo(roomId, userId)
43+
if err == nil && userInfo != nil {
44+
// broadcast this user to everyone
45+
if err = m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_JOINED, roomId, userInfo, userId); err != nil {
46+
log.WithError(err).Error("Failed to broadcast USER_JOINED event")
6547
}
48+
49+
now := fmt.Sprintf("%d", time.Now().UnixMilli())
50+
m.analyticsModel.HandleEvent(&plugnmeet.AnalyticsDataMsg{
51+
EventType: plugnmeet.AnalyticsEventType_ANALYTICS_EVENT_TYPE_ROOM,
52+
EventName: plugnmeet.AnalyticsEvents_ANALYTICS_EVENT_USER_JOINED,
53+
RoomId: roomId,
54+
UserId: &userId,
55+
UserName: &userInfo.Name,
56+
ExtraData: &userInfo.Metadata,
57+
HsetValue: &now,
58+
})
59+
log.Info("Successfully processed user joined event")
60+
61+
roomInfo, err := m.natsService.GetRoomInfo(roomId)
62+
if err != nil {
63+
log.WithError(err).Error("Failed to get room info")
64+
}
65+
if roomInfo != nil {
66+
if _, err := m.ds.IncrementOrDecrementNumParticipants(roomInfo.GetRoomSid(), "+"); err != nil {
67+
log.WithError(err).Error("Failed to increment num participants")
68+
}
69+
}
70+
} else if err != nil {
71+
log.WithError(err).Warn("Could not get user info after join")
6672
}
67-
} else if err != nil {
68-
log.WithError(err).Warn("Could not get user info after join")
73+
return nil, nil
74+
})
75+
76+
if shared {
77+
log.Debug("OnAfterUserJoined call already in progress for this user, skipping.")
6978
}
7079
}
7180

7281
// OnAfterUserDisconnected should be run in separate goroutine
7382
// we'll wait for 5 seconds before declare user as offline
7483
// but will broadcast as disconnected
75-
func (m *NatsModel) OnAfterUserDisconnected(roomId, userId string) {
84+
func (m *NatsModel) OnAfterUserDisconnected(roomId, userId, calledFrom string) {
7685
log := m.logger.WithFields(logrus.Fields{
77-
"roomId": roomId,
78-
"userId": userId,
79-
"method": "OnAfterUserDisconnected",
86+
"roomId": roomId,
87+
"userId": userId,
88+
"called_from": calledFrom,
89+
"method": "OnAfterUserDisconnected",
8090
})
8191
log.Info("Handling user disconnected event")
8292

83-
// Immediately set status to disconnected and notify clients.
84-
if err := m.natsService.UpdateUserStatus(roomId, userId, natsservice.UserStatusDisconnected); err != nil {
85-
log.WithError(err).WithField("status", natsservice.UserStatusDisconnected).Warn("Failed to update user status")
86-
}
93+
key := fmt.Sprintf("user-disconnected-%s-%s", roomId, userId)
94+
_, _, shared := m.sfGroup.Do(key, func() (interface{}, error) {
95+
// Immediately set status to disconnected and notify clients.
96+
if err := m.natsService.UpdateUserStatus(roomId, userId, natsservice.UserStatusDisconnected); err != nil {
97+
log.WithError(err).WithField("status", natsservice.UserStatusDisconnected).Warn("Failed to update user status")
98+
}
8799

88-
// update analytics & db for the user leaving.
89-
m.updateUserLeftAnalytics(roomId, userId, log)
100+
// update analytics & db for the user leaving.
101+
m.updateUserLeftAnalytics(roomId, userId, log)
90102

91-
// Try to get user info for a richer disconnect message.
92-
userInfo, err := m.natsService.GetUserInfo(roomId, userId)
93-
if err != nil || userInfo == nil {
94-
// If we can't get user info, send a basic event and update analytics.
95-
if err = m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_DISCONNECTED, roomId, &plugnmeet.NatsKvUserInfo{UserId: userId, RoomId: roomId}, userId); err != nil {
96-
log.WithError(err).Error("Failed to broadcast basic USER_DISCONNECTED event")
103+
// Try to get user info for a richer disconnect message.
104+
userInfo, err := m.natsService.GetUserInfo(roomId, userId)
105+
if err != nil || userInfo == nil {
106+
// If we can't get user info, send a basic event and update analytics.
107+
if err = m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_DISCONNECTED, roomId, &plugnmeet.NatsKvUserInfo{UserId: userId, RoomId: roomId}, userId); err != nil {
108+
log.WithError(err).Error("Failed to broadcast basic USER_DISCONNECTED event")
109+
}
110+
} else {
111+
_ = m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_DISCONNECTED, roomId, userInfo, userId)
97112
}
98-
} else {
99-
_ = m.natsService.BroadcastSystemEventToEveryoneExceptUserId(plugnmeet.NatsMsgServerToClientEvents_USER_DISCONNECTED, roomId, userInfo, userId)
100-
}
101113

102-
// Start a non-blocking background task to handle the full offline/cleanup lifecycle.
103-
go m.handleDelayedOfflineTasks(roomId, userId, userInfo, log)
114+
// Start a non-blocking background task to handle the full offline/cleanup lifecycle.
115+
go m.handleDelayedOfflineTasks(roomId, userId, userInfo, log)
116+
return nil, nil
117+
})
118+
119+
if shared {
120+
log.Debug("OnAfterUserDisconnected call already in progress for this user, skipping.")
121+
}
104122
}
105123

106124
// handleDelayedOfflineTasks manages the grace period for user reconnection and subsequent cleanup using periodic checks.

pkg/models/nats_user_event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (m *NatsModel) HandleClientPing(roomId, userId string) {
182182
// because the server may receive this join status a bit lately
183183
// as user has sent ping request, this indicates the user is online
184184
// OnAfterUserJoined will check the current status and act if the user was not online.
185-
m.OnAfterUserJoined(roomId, userId)
185+
m.OnAfterUserJoined(roomId, userId, "HandleClientPing")
186186

187187
lastPing := fmt.Sprintf("%d", time.Now().UnixMilli())
188188
err := m.natsService.UpdateUserKeyValue(roomId, userId, natsservice.UserLastPingAt, lastPing)

0 commit comments

Comments
 (0)