Skip to content

Commit 7df56b6

Browse files
committed
client enqueue retry
1 parent b87b0c7 commit 7df56b6

File tree

1 file changed

+23
-19
lines changed

1 file changed

+23
-19
lines changed

dbos/client.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -162,28 +162,32 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
162162
}
163163

164164
uncancellableCtx := WithoutCancel(dbosCtx)
165-
166-
tx, err := dbosCtx.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
167-
if err != nil {
168-
return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to begin transaction: %w", err))
169-
}
170-
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
171-
172-
// Insert workflow status with transaction
173-
insertInput := insertWorkflowStatusDBInput{
174-
status: status,
175-
tx: tx,
176-
}
177-
_, err = dbosCtx.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
165+
err := retry(dbosCtx, func() error {
166+
tx, err := dbosCtx.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
167+
if err != nil {
168+
return newWorkflowExecutionError(workflowID, fmt.Errorf("failed to begin transaction: %w", err))
169+
}
170+
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
171+
172+
// Insert workflow status with transaction
173+
insertInput := insertWorkflowStatusDBInput{
174+
status: status,
175+
tx: tx,
176+
}
177+
_, err = dbosCtx.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
178+
if err != nil {
179+
dbosCtx.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
180+
return err
181+
}
182+
183+
if err := tx.Commit(uncancellableCtx); err != nil {
184+
return fmt.Errorf("failed to commit transaction: %w", err)
185+
}
186+
return nil
187+
}, withRetrierLogger(dbosCtx.logger))
178188
if err != nil {
179-
dbosCtx.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
180189
return nil, err
181190
}
182-
183-
if err := tx.Commit(uncancellableCtx); err != nil {
184-
return nil, fmt.Errorf("failed to commit transaction: %w", err)
185-
}
186-
187191
return newWorkflowPollingHandle[any](uncancellableCtx, workflowID), nil
188192
}
189193

0 commit comments

Comments
 (0)