Skip to content

Commit 569bc32

Browse files
committed
manually handle locks in getEvent
1 parent e61ae86 commit 569bc32

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

dbos/system_database.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2066,8 +2066,10 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
20662066
// Create notification payload and condition variable
20672067
payload := fmt.Sprintf("%s::%s", input.TargetWorkflowID, input.Key)
20682068
cond := sync.NewCond(&sync.Mutex{})
2069+
cond.L.Lock()
20692070
existingCond, loaded := s.workflowEventsMap.LoadOrStore(payload, cond)
20702071
if loaded {
2072+
cond.L.Unlock()
20712073
// Reuse the existing condition variable
20722074
cond = existingCond.(*sync.Cond)
20732075
}
@@ -2088,14 +2090,14 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
20882090
row := s.pool.QueryRow(ctx, query, input.TargetWorkflowID, input.Key)
20892091
err := row.Scan(&valueString)
20902092
if err != nil && err != pgx.ErrNoRows {
2093+
cond.L.Unlock()
20912094
return nil, fmt.Errorf("failed to query workflow event: %w", err)
20922095
}
20932096

20942097
if err == pgx.ErrNoRows {
20952098
// Wait for notification with timeout using condition variable
20962099
done := make(chan struct{})
20972100
go func() {
2098-
cond.L.Lock()
20992101
defer cond.L.Unlock()
21002102
cond.Wait()
21012103
close(done)
@@ -2127,8 +2129,11 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
21272129
row = s.pool.QueryRow(ctx, query, input.TargetWorkflowID, input.Key)
21282130
err = row.Scan(&valueString)
21292131
if err != nil && err != pgx.ErrNoRows {
2132+
cond.L.Unlock()
21302133
return nil, fmt.Errorf("failed to query workflow event after wait: %w", err)
21312134
}
2135+
} else {
2136+
cond.L.Unlock()
21322137
}
21332138

21342139
// Deserialize the value if it exists

0 commit comments

Comments
 (0)