Skip to content

Commit ef0e538

Browse files
committed
handle race when creating notification channel
1 parent 7aa5a30 commit ef0e538

File tree

1 file changed

+4
-4
lines changed

1 file changed

+4
-4
lines changed

dbos/system_database.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,13 +1158,13 @@ func (s *systemDatabase) Recv(ctx context.Context, input WorkflowRecvInput) (any
11581158
if !exists {
11591159
// We'll do so through registering a channel with the notification listener loop.
11601160
payload := fmt.Sprintf("%s::%s", destinationID, topic)
1161-
_, ok := s.notificationsMap.Load(payload)
1162-
if ok {
1161+
c := make(chan bool)
1162+
_, loaded := s.notificationsMap.LoadOrStore(payload, c)
1163+
if loaded {
1164+
close(c) // Clean up the unused channel
11631165
fmt.Println("Receive already called for workflow ", destinationID)
11641166
return nil, NewWorkflowConflictIDError(destinationID)
11651167
}
1166-
c := make(chan bool)
1167-
s.notificationsMap.Store(payload, c)
11681168
defer func() {
11691169
// Clean up the channel after we're done
11701170
// XXX We should handle panics in this function and make sure we call this. Not a problem for now as panic will crash the importing package.

0 commit comments

Comments
 (0)