Skip to content

Commit dbf7b2c

Browse files
committed
do less
1 parent 6f9ec3e commit dbf7b2c

File tree

3 files changed

+30
-96
lines changed

3 files changed

+30
-96
lines changed

dbos/client.go

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -162,32 +162,28 @@ func (c *client) Enqueue(queueName, workflowName string, input any, opts ...Enqu
162162
}
163163

164164
uncancellableCtx := WithoutCancel(dbosCtx)
165-
err := retry(dbosCtx, func() error {
166-
tx, err := dbosCtx.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
167-
if err != nil {
168-
return newWorkflowExecutionError(workflowID, fmt.Errorf("failed to begin transaction: %w", err))
169-
}
170-
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
171-
172-
// Insert workflow status with transaction
173-
insertInput := insertWorkflowStatusDBInput{
174-
status: status,
175-
tx: tx,
176-
}
177-
_, err = dbosCtx.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
178-
if err != nil {
179-
dbosCtx.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
180-
return err
181-
}
182-
183-
if err := tx.Commit(uncancellableCtx); err != nil {
184-
return fmt.Errorf("failed to commit transaction: %w", err)
185-
}
186-
return nil
187-
}, withRetrierLogger(dbosCtx.logger))
165+
166+
tx, err := dbosCtx.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
167+
if err != nil {
168+
return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to begin transaction: %v", err))
169+
}
170+
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
171+
172+
// Insert workflow status with transaction
173+
insertInput := insertWorkflowStatusDBInput{
174+
status: status,
175+
tx: tx,
176+
}
177+
_, err = dbosCtx.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
188178
if err != nil {
179+
dbosCtx.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
189180
return nil, err
190181
}
182+
183+
if err := tx.Commit(uncancellableCtx); err != nil {
184+
return nil, fmt.Errorf("failed to commit transaction: %w", err)
185+
}
186+
191187
return newWorkflowPollingHandle[any](uncancellableCtx, workflowID), nil
192188
}
193189

dbos/system_database.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2621,49 +2621,6 @@ type retryConfig struct {
26212621
// retryOption is a functional option for configuring retry behavior
26222622
type retryOption func(*retryConfig)
26232623

2624-
// withRetrierMaxRetries sets the maximum number of retry attempts (-1 for infinite)
2625-
func withRetrierMaxRetries(maxRetries int) retryOption {
2626-
return func(c *retryConfig) {
2627-
c.maxRetries = maxRetries
2628-
}
2629-
}
2630-
2631-
// withRetrierBaseDelay sets the initial delay between retry attempts
2632-
func withRetrierBaseDelay(delay time.Duration) retryOption {
2633-
return func(c *retryConfig) {
2634-
c.baseDelay = delay
2635-
}
2636-
}
2637-
2638-
// withRetrierMaxDelay sets the maximum delay between retry attempts
2639-
func withRetrierMaxDelay(delay time.Duration) retryOption {
2640-
return func(c *retryConfig) {
2641-
c.maxDelay = delay
2642-
}
2643-
}
2644-
2645-
// withRetrierBackoffFactor sets the exponential backoff factor
2646-
func withRetrierBackoffFactor(factor float64) retryOption {
2647-
return func(c *retryConfig) {
2648-
c.backoffFactor = factor
2649-
}
2650-
}
2651-
2652-
// withRetrierJitter sets the jitter range for retry delays (e.g., 0.95 to 1.05 for ±5% jitter)
2653-
func withRetrierJitter(min, max float64) retryOption {
2654-
return func(c *retryConfig) {
2655-
c.jitterMin = min
2656-
c.jitterMax = max
2657-
}
2658-
}
2659-
2660-
// withRetrierCondition sets the function that determines if an error is retryable
2661-
func withRetrierCondition(condition func(error, *slog.Logger) bool) retryOption {
2662-
return func(c *retryConfig) {
2663-
c.retryCondition = condition
2664-
}
2665-
}
2666-
26672624
// withRetrierLogger sets the logger for the retrier
26682625
func withRetrierLogger(logger *slog.Logger) retryOption {
26692626
return func(c *retryConfig) {

dbos/workflow.go

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,13 +1408,11 @@ func (c *dbosContext) RetrieveWorkflow(_ DBOSContext, workflowID string) (Workfl
14081408
})
14091409
}, WithStepName("DBOS.retrieveWorkflow"))
14101410
} else {
1411-
workflowStatus, err = retryWithResult(c, func() ([]WorkflowStatus, error) {
1412-
return c.systemDB.listWorkflows(c, listWorkflowsDBInput{
1413-
workflowIDs: []string{workflowID},
1414-
loadInput: loadInput,
1415-
loadOutput: loadOutput,
1416-
})
1417-
}, withRetrierLogger(c.logger))
1411+
workflowStatus, err = c.systemDB.listWorkflows(c, listWorkflowsDBInput{
1412+
workflowIDs: []string{workflowID},
1413+
loadInput: loadInput,
1414+
loadOutput: loadOutput,
1415+
})
14181416
}
14191417
if err != nil {
14201418
return nil, fmt.Errorf("failed to retrieve workflow status: %w", err)
@@ -1471,9 +1469,7 @@ func (c *dbosContext) CancelWorkflow(_ DBOSContext, workflowID string) error {
14711469
}, WithStepName("DBOS.cancelWorkflow"))
14721470
return err
14731471
} else {
1474-
return retry(c, func() error {
1475-
return c.systemDB.cancelWorkflow(c, workflowID)
1476-
}, withRetrierLogger(c.logger))
1472+
return c.systemDB.cancelWorkflow(c, workflowID)
14771473
}
14781474
}
14791475

@@ -1509,9 +1505,7 @@ func (c *dbosContext) ResumeWorkflow(_ DBOSContext, workflowID string) (Workflow
15091505
return nil, err
15101506
}, WithStepName("DBOS.resumeWorkflow"))
15111507
} else {
1512-
err = retry(c, func() error {
1513-
return c.systemDB.resumeWorkflow(c, workflowID)
1514-
}, withRetrierLogger(c.logger))
1508+
err = c.systemDB.resumeWorkflow(c, workflowID)
15151509
}
15161510
if err != nil {
15171511
return nil, err
@@ -1591,9 +1585,7 @@ func (c *dbosContext) ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (Work
15911585
return c.systemDB.forkWorkflow(ctx, dbInput)
15921586
}, WithStepName("DBOS.forkWorkflow"))
15931587
} else {
1594-
forkedWorkflowID, err = retryWithResult(c, func() (string, error) {
1595-
return c.systemDB.forkWorkflow(c, dbInput)
1596-
}, withRetrierLogger(c.logger))
1588+
forkedWorkflowID, err = c.systemDB.forkWorkflow(c, dbInput)
15971589
}
15981590
if err != nil {
15991591
return nil, err
@@ -1832,22 +1824,13 @@ func (c *dbosContext) ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption)
18321824
// Call the context method to list workflows
18331825
workflowState, ok := c.Value(workflowStateKey).(*workflowState)
18341826
isWithinWorkflow := ok && workflowState != nil
1835-
var workflows []WorkflowStatus
1836-
var err error
18371827
if isWithinWorkflow {
1838-
workflows, err = RunAsStep(c, func(ctx context.Context) ([]WorkflowStatus, error) {
1828+
return RunAsStep(c, func(ctx context.Context) ([]WorkflowStatus, error) {
18391829
return c.systemDB.listWorkflows(ctx, dbInput)
18401830
}, WithStepName("DBOS.listWorkflows"))
18411831
} else {
1842-
workflows, err = retryWithResult(c, func() ([]WorkflowStatus, error) {
1843-
return c.systemDB.listWorkflows(c, dbInput)
1844-
}, withRetrierLogger(c.logger))
1832+
return c.systemDB.listWorkflows(c, dbInput)
18451833
}
1846-
if err != nil {
1847-
return nil, err
1848-
}
1849-
1850-
return workflows, nil
18511834
}
18521835

18531836
// ListWorkflows retrieves a list of workflows based on the provided filters.
@@ -1909,9 +1892,7 @@ func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]Step
19091892
return c.systemDB.getWorkflowSteps(ctx, workflowID)
19101893
}, WithStepName("DBOS.getWorkflowSteps"))
19111894
} else {
1912-
return retryWithResult(c, func() ([]StepInfo, error) {
1913-
return c.systemDB.getWorkflowSteps(c, workflowID)
1914-
}, withRetrierLogger(c.logger))
1895+
return c.systemDB.getWorkflowSteps(c, workflowID)
19151896
}
19161897
}
19171898

0 commit comments

Comments
 (0)