Skip to content

Commit 1e1de11

Browse files
authored
Handle conflicting executions (dbos-inc#139)
If a workflow observes the returned error is a `DBOSError` with code `ConflictingIDError`, fall back to `awaitWorkflowResult` in the workflow goroutine. Note that we cannot distinguish this error if the user handles it (e.g., during `setEvent`) and wraps it with some other error message.
1 parent 7b87951 commit 1e1de11

File tree

3 files changed

+47
-96
lines changed

3 files changed

+47
-96
lines changed

dbos/system_database.go

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,17 +1189,16 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation
11891189
return fmt.Errorf("failed to serialize output: %w", err)
11901190
}
11911191

1192-
var commandTag pgconn.CommandTag
11931192
if input.tx != nil {
1194-
commandTag, err = input.tx.Exec(ctx, query,
1193+
_, err = input.tx.Exec(ctx, query,
11951194
input.workflowID,
11961195
input.stepID,
11971196
outputString,
11981197
errorString,
11991198
input.stepName,
12001199
)
12011200
} else {
1202-
commandTag, err = s.pool.Exec(ctx, query,
1201+
_, err = s.pool.Exec(ctx, query,
12031202
input.workflowID,
12041203
input.stepID,
12051204
outputString,
@@ -1208,12 +1207,6 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation
12081207
)
12091208
}
12101209

1211-
/*
1212-
s.logger.Debug("RecordOperationResult CommandTag", "command_tag", commandTag)
1213-
s.logger.Debug("RecordOperationResult Rows affected", "rows_affected", commandTag.RowsAffected())
1214-
s.logger.Debug("RecordOperationResult SQL", "sql", commandTag.String())
1215-
*/
1216-
12171210
if err != nil {
12181211
s.logger.Error("RecordOperationResult Error occurred", "error", err)
12191212
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == _PG_ERROR_UNIQUE_VIOLATION {
@@ -1222,10 +1215,6 @@ func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperation
12221215
return err
12231216
}
12241217

1225-
if commandTag.RowsAffected() == 0 {
1226-
s.logger.Warn("RecordOperationResult No rows were affected by the insert")
1227-
}
1228-
12291218
return nil
12301219
}
12311220

@@ -1778,7 +1767,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
17781767

17791768
err = s.recordOperationResult(ctx, recordInput)
17801769
if err != nil {
1781-
return fmt.Errorf("failed to record operation result: %w", err)
1770+
return err
17821771
}
17831772
}
17841773

@@ -1932,7 +1921,7 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
19321921
}
19331922
err = s.recordOperationResult(ctx, recordInput)
19341923
if err != nil {
1935-
return nil, fmt.Errorf("failed to record operation result: %w", err)
1924+
return nil, err
19361925
}
19371926

19381927
if err := tx.Commit(ctx); err != nil {
@@ -2013,7 +2002,7 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
20132002

20142003
err = s.recordOperationResult(ctx, recordInput)
20152004
if err != nil {
2016-
return fmt.Errorf("failed to record operation result: %w", err)
2005+
return err
20172006
}
20182007

20192008
// Commit transaction
@@ -2145,7 +2134,7 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
21452134

21462135
err = s.recordOperationResult(ctx, recordInput)
21472136
if err != nil {
2148-
return nil, fmt.Errorf("failed to record operation result: %w", err)
2137+
return nil, err
21492138
}
21502139
}
21512140

dbos/workflow.go

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -826,35 +826,44 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
826826
c.workflowsWg.Add(1)
827827
go func() {
828828
defer c.workflowsWg.Done()
829-
result, err := fn(workflowCtx, input)
830-
status := WorkflowStatusSuccess
831829

832-
// If an error occurred, set the status to error
833-
if err != nil {
834-
status = WorkflowStatusError
835-
}
830+
var result any
831+
var err error
836832

837-
// If the afterFunc has started, the workflow was cancelled and the status should be set to cancelled
838-
if stopFunc != nil && !stopFunc() {
839-
c.logger.Info("Workflow was cancelled. Waiting for cancel function to complete", "workflow_id", workflowID)
840-
// Wait for the cancel function to complete
841-
// Note this must happen before we write on the outcome channel (and signal the handler's GetResult)
842-
<-cancelFuncCompleted
843-
// Set the status to cancelled and move on so we still record the outcome in the DB
844-
status = WorkflowStatusCancelled
845-
}
833+
result, err = fn(workflowCtx, input)
846834

847-
recordErr := c.systemDB.updateWorkflowOutcome(uncancellableCtx, updateWorkflowOutcomeDBInput{
848-
workflowID: workflowID,
849-
status: status,
850-
err: err,
851-
output: result,
852-
})
853-
if recordErr != nil {
854-
c.logger.Error("Error recording workflow outcome", "workflow_id", workflowID, "error", recordErr)
855-
outcomeChan <- workflowOutcome[any]{result: nil, err: recordErr}
856-
close(outcomeChan)
857-
return
835+
// Handle DBOS ID conflict errors by waiting workflow result
836+
var dbosErr *DBOSError
837+
if errors.As(err, &dbosErr) && dbosErr.Code == ConflictingIDError {
838+
c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID)
839+
result, err = c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID)
840+
} else {
841+
status := WorkflowStatusSuccess
842+
843+
// If an error occurred, set the status to error
844+
if err != nil {
845+
status = WorkflowStatusError
846+
}
847+
848+
// If the afterFunc has started, the workflow was cancelled and the status should be set to cancelled
849+
if stopFunc != nil && !stopFunc() {
850+
c.logger.Info("Workflow was cancelled. Waiting for cancel function to complete", "workflow_id", workflowID)
851+
<-cancelFuncCompleted // Wait for the cancel function to complete
852+
status = WorkflowStatusCancelled
853+
}
854+
855+
recordErr := c.systemDB.updateWorkflowOutcome(uncancellableCtx, updateWorkflowOutcomeDBInput{
856+
workflowID: workflowID,
857+
status: status,
858+
err: err,
859+
output: result,
860+
})
861+
if recordErr != nil {
862+
c.logger.Error("Error recording workflow outcome", "workflow_id", workflowID, "error", recordErr)
863+
outcomeChan <- workflowOutcome[any]{result: nil, err: recordErr}
864+
close(outcomeChan)
865+
return
866+
}
858867
}
859868
outcomeChan <- workflowOutcome[any]{result: result, err: err}
860869
close(outcomeChan)

dbos/workflows_test.go

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1736,14 +1736,11 @@ func TestSendRecv(t *testing.T) {
17361736
})
17371737

17381738
t.Run("TestConcurrentRecvs", func(t *testing.T) {
1739-
// Test concurrent receivers - only 1 should timeout, others should get errors
1739+
// Test concurrent receivers - all should return valid results
17401740
receiveTopic := "concurrent-recv-topic"
17411741

1742-
// Start multiple concurrent receive workflows - no messages will be sent
1742+
// Start multiple concurrent receive workflows
17431743
numReceivers := 5
1744-
var wg sync.WaitGroup
1745-
results := make(chan string, numReceivers)
1746-
errors := make(chan error, numReceivers)
17471744
receiverHandles := make([]WorkflowHandle[string], numReceivers)
17481745

17491746
// Start all receivers - they will signal when ready and wait for coordination
@@ -1768,56 +1765,12 @@ func TestSendRecv(t *testing.T) {
17681765
// Now unblock all receivers simultaneously so they race to the Recv call
17691766
concurrentRecvStartEvent.Set()
17701767

1771-
// Collect results from all receivers concurrently
1772-
// Only 1 should timeout (winner of the CV), others should get errors
1773-
wg.Add(numReceivers)
1768+
// Collect results from all receivers
17741769
for i := range numReceivers {
1775-
go func(index int) {
1776-
defer wg.Done()
1777-
result, err := receiverHandles[index].GetResult()
1778-
if err != nil {
1779-
errors <- err
1780-
} else {
1781-
results <- result
1782-
}
1783-
}(i)
1784-
}
1785-
1786-
wg.Wait()
1787-
close(results)
1788-
close(errors)
1789-
1790-
// Count timeout results and errors
1791-
timeoutCount := 0
1792-
errorCount := 0
1793-
1794-
for result := range results {
1795-
if result == "" {
1796-
// Empty string indicates a timeout - only 1 receiver should get this
1797-
timeoutCount++
1798-
}
1770+
result, err := receiverHandles[i].GetResult()
1771+
require.NoError(t, err, "receiver %d should not error", i)
1772+
require.Equal(t, result, "", "receiver %d should have an empty string result", i)
17991773
}
1800-
1801-
for err := range errors {
1802-
t.Logf("Receiver error (expected): %v", err)
1803-
1804-
// Check that the error is of the expected type
1805-
dbosErr, ok := err.(*DBOSError)
1806-
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
1807-
require.Equal(t, ConflictingIDError, dbosErr.Code, "expected error code to be ConflictingIDError, got %v", dbosErr.Code)
1808-
require.Equal(t, "concurrent-recv-wfid", dbosErr.WorkflowID, "expected workflow ID to be 'concurrent-recv-wfid', got %s", dbosErr.WorkflowID)
1809-
require.True(t, dbosErr.IsBase, "expected error to have IsBase=true")
1810-
require.Contains(t, dbosErr.Message, "Conflicting workflow ID concurrent-recv-wfid", "expected error message to contain conflicting workflow ID")
1811-
1812-
errorCount++
1813-
}
1814-
1815-
// Verify that exactly 1 receiver timed out and 4 got errors
1816-
assert.Equal(t, 1, timeoutCount, "expected exactly 1 receiver to timeout")
1817-
assert.Equal(t, 4, errorCount, "expected exactly 4 receivers to get errors")
1818-
1819-
// Ensure total results match expected
1820-
assert.Equal(t, numReceivers, timeoutCount+errorCount, "expected total results to equal number of receivers")
18211774
})
18221775

18231776
t.Run("durableSleep", func(t *testing.T) {

0 commit comments

Comments
 (0)