Skip to content

Commit 066c137

Browse files
committed
handle lost wake-up
1 parent 38d6e62 commit 066c137

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

.github/workflows/chaos-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
run: go install gotest.tools/gotestsum@latest
3838

3939
- name: Run chaos tests
40-
run: go vet ./... && gotestsum -- -v -race -count=1 ./...
40+
run: go vet ./... && go test -v -race -count=1 ./...
4141
working-directory: ./chaos_tests
4242
env:
4343
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22

dbos/system_database.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1681,11 +1681,15 @@ func (s *sysDB) notificationListenerLoop(ctx context.Context) {
16811681
switch n.Channel {
16821682
case _DBOS_NOTIFICATIONS_CHANNEL:
16831683
if cond, ok := s.workflowNotificationsMap.Load(n.Payload); ok {
1684+
cond.(*sync.Cond).L.Lock()
16841685
cond.(*sync.Cond).Broadcast()
1686+
cond.(*sync.Cond).L.Unlock()
16851687
}
16861688
case _DBOS_WORKFLOW_EVENTS_CHANNEL:
16871689
if cond, ok := s.workflowEventsMap.Load(n.Payload); ok {
1690+
cond.(*sync.Cond).L.Lock()
16881691
cond.(*sync.Cond).Broadcast()
1692+
cond.(*sync.Cond).L.Unlock()
16891693
}
16901694
}
16911695
}
@@ -1830,8 +1834,10 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
18301834
// First check if there's already a receiver for this workflow/topic to avoid unnecessary database load
18311835
payload := fmt.Sprintf("%s::%s", destinationID, topic)
18321836
cond := sync.NewCond(&sync.Mutex{})
1837+
cond.L.Lock()
18331838
_, loaded := s.workflowNotificationsMap.LoadOrStore(payload, cond)
18341839
if loaded {
1840+
cond.L.Unlock()
18351841
s.logger.Error("Receive already called for workflow", "destination_id", destinationID)
18361842
return nil, newWorkflowConflictIDError(destinationID)
18371843
}
@@ -1847,6 +1853,7 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
18471853
query := fmt.Sprintf(`SELECT EXISTS (SELECT 1 FROM %s.notifications WHERE destination_uuid = $1 AND topic = $2)`, pgx.Identifier{s.schema}.Sanitize())
18481854
err = s.pool.QueryRow(ctx, query, destinationID, topic).Scan(&exists)
18491855
if err != nil {
1856+
cond.L.Unlock()
18501857
return false, fmt.Errorf("failed to check message: %w", err)
18511858
}
18521859
if !exists {
@@ -1855,7 +1862,6 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
18551862

18561863
done := make(chan struct{})
18571864
go func() {
1858-
cond.L.Lock()
18591865
defer cond.L.Unlock()
18601866
cond.Wait()
18611867
close(done)
@@ -1879,6 +1885,8 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
18791885
s.logger.Warn("Recv() context cancelled", "payload", payload, "cause", context.Cause(ctx))
18801886
return nil, ctx.Err()
18811887
}
1888+
} else {
1889+
cond.L.Unlock()
18821890
}
18831891

18841892
// Find the oldest message and delete it atomically

0 commit comments

Comments
 (0)