Skip to content

Commit 5501699

Browse files
[Fix] Conflict notifications map (#147)
# Problem [Issue](#84) The system database was using a single shared notificationsMap to handle both workflow notifications (`_DBOS_NOTIFICATIONS_CHANNEL`) and workflow events (`_DBOS_WORKFLOW_EVENTS_CHANNEL`). This design could lead to potential conflicts when both channels used the same payload format (`workflowID::topic`), causing notifications intended for one channel to incorrectly trigger listeners on the other channel. # Solution Split the single notificationsMap into two separate maps: `workflowNotificationsMap` - dedicated to handling workflow notifications `workflowEventsMap` - dedicated to handling workflow events
1 parent 9aad7b2 commit 5501699

File tree

1 file changed

+28
-20
lines changed

1 file changed

+28
-20
lines changed

dbos/system_database.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,13 @@ type systemDatabase interface {
6767
}
6868

6969
type sysDB struct {
70-
pool *pgxpool.Pool
71-
notificationLoopDone chan struct{}
72-
notificationsMap *sync.Map
73-
logger *slog.Logger
74-
schema string
75-
launched bool
70+
pool *pgxpool.Pool
71+
notificationLoopDone chan struct{}
72+
workflowNotificationsMap *sync.Map
73+
workflowEventsMap *sync.Map
74+
logger *slog.Logger
75+
schema string
76+
launched bool
7677
}
7778

7879
/*******************************/
@@ -330,14 +331,16 @@ func newSystemDatabase(ctx context.Context, inputs newSystemDatabaseInput) (syst
330331
}
331332

332333
// Create a map of notification payloads to channels
333-
notificationsMap := &sync.Map{}
334+
workflowNotificationsMap := &sync.Map{}
335+
workflowEventsMap := &sync.Map{}
334336

335337
return &sysDB{
336-
pool: pool,
337-
notificationsMap: notificationsMap,
338-
notificationLoopDone: make(chan struct{}),
339-
logger: logger.With("service", "system_database"),
340-
schema: databaseSchema,
338+
pool: pool,
339+
workflowNotificationsMap: workflowNotificationsMap,
340+
workflowEventsMap: workflowEventsMap,
341+
notificationLoopDone: make(chan struct{}),
342+
logger: logger.With("service", "system_database"),
343+
schema: databaseSchema,
341344
}, nil
342345
}
343346

@@ -374,7 +377,8 @@ func (s *sysDB) shutdown(ctx context.Context, timeout time.Duration) {
374377
}
375378
}
376379

377-
s.notificationsMap.Clear()
380+
s.workflowNotificationsMap.Clear()
381+
s.workflowEventsMap.Clear()
378382

379383
s.launched = false
380384
}
@@ -1672,9 +1676,13 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
16721676
retryAttempt--
16731677
}
16741678

1675-
// Handle notifications
1676-
if n.Channel == _DBOS_NOTIFICATIONS_CHANNEL || n.Channel == _DBOS_WORKFLOW_EVENTS_CHANNEL {
1677-
if cond, ok := s.notificationsMap.Load(n.Payload); ok {
1679+
switch n.Channel {
1680+
case _DBOS_NOTIFICATIONS_CHANNEL:
1681+
if cond, ok := s.workflowNotificationsMap.Load(n.Payload); ok {
1682+
cond.(*sync.Cond).Broadcast()
1683+
}
1684+
case _DBOS_WORKFLOW_EVENTS_CHANNEL:
1685+
if cond, ok := s.workflowEventsMap.Load(n.Payload); ok {
16781686
cond.(*sync.Cond).Broadcast()
16791687
}
16801688
}
@@ -1820,15 +1828,15 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
18201828
// First check if there's already a receiver for this workflow/topic to avoid unnecessary database load
18211829
payload := fmt.Sprintf("%s::%s", destinationID, topic)
18221830
cond := sync.NewCond(&sync.Mutex{})
1823-
_, loaded := s.notificationsMap.LoadOrStore(payload, cond)
1831+
_, loaded := s.workflowNotificationsMap.LoadOrStore(payload, cond)
18241832
if loaded {
18251833
s.logger.Error("Receive already called for workflow", "destination_id", destinationID)
18261834
return nil, newWorkflowConflictIDError(destinationID)
18271835
}
18281836
defer func() {
18291837
// Clean up the condition variable after we're done and broadcast to wake up any waiting goroutines
18301838
cond.Broadcast()
1831-
s.notificationsMap.Delete(payload)
1839+
s.workflowNotificationsMap.Delete(payload)
18321840
}()
18331841

18341842
// Now check if there is already a message available in the database.
@@ -2048,7 +2056,7 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
20482056
// Create notification payload and condition variable
20492057
payload := fmt.Sprintf("%s::%s", input.TargetWorkflowID, input.Key)
20502058
cond := sync.NewCond(&sync.Mutex{})
2051-
existingCond, loaded := s.notificationsMap.LoadOrStore(payload, cond)
2059+
existingCond, loaded := s.workflowEventsMap.LoadOrStore(payload, cond)
20522060
if loaded {
20532061
// Reuse the existing condition variable
20542062
cond = existingCond.(*sync.Cond)
@@ -2059,7 +2067,7 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
20592067
cond.Broadcast()
20602068
// Clean up the condition variable after we're done, if we created it
20612069
if !loaded {
2062-
s.notificationsMap.Delete(payload)
2070+
s.workflowEventsMap.Delete(payload)
20632071
}
20642072
}()
20652073

0 commit comments

Comments
 (0)