Skip to content

Commit 2519b48

Browse files
committed
handle pgx.ErrTxClosed as retryable but wrap entire RunWorkflow in a retry
1 parent d22cd2c commit 2519b48

File tree

2 files changed

+21
-22
lines changed

2 files changed

+21
-22
lines changed

dbos/system_database.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2552,9 +2552,12 @@ func isRetryablePGError(err error) bool {
25522552
return false
25532553
}
25542554

2555-
// Special case for tx is closed, which can happen when rollbacking an already committed transaction, and vice versa.
2556-
if strings.Contains(err.Error(), "tx is closed") {
2557-
return false
2555+
// If tx is closed (because failure happened between pgx trying to commit/rollback and setting tx.closed)
2556+
// pgx will always return pgx.ErrTxClosed again.
2557+
// This is only retryable if the caller retries with a new transaction object.
2558+
// Otherwise, retrying with the same closed transaction will always fail.
2559+
if errors.Is(err, pgx.ErrTxClosed) {
2560+
return true
25582561
}
25592562

25602563
// PostgreSQL codes indicating connection/admin shutdown etc.

dbos/workflow.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"time"
1212

1313
"github.com/google/uuid"
14-
"github.com/jackc/pgx/v5"
1514
"github.com/robfig/cron/v3"
1615
)
1716

@@ -342,7 +341,9 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow
342341
WithQueue(_DBOS_INTERNAL_QUEUE_NAME),
343342
withWorkflowName(workflowName),
344343
}
345-
_, err := ctx.RunWorkflow(ctx, fn, scheduledTime, opts...)
344+
_, err := retryWithResult(ctx, func() (WorkflowHandle[any], error) {
345+
return ctx.RunWorkflow(ctx, fn, scheduledTime, opts...)
346+
}, withRetrierLogger(c.logger))
346347
if err != nil {
347348
c.logger.Error("failed to run scheduled workflow", "fqn", workflowName, "error", err)
348349
}
@@ -470,7 +471,9 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...
470471

471472
typeErasedWrapper := wrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
472473
opts = append(opts, withWorkflowName(fqn)) // Append the name so ctx.RunWorkflow can look it up from the registry to apply registration-time options
473-
handle, err := ctx.RunWorkflow(ctx, typedErasedWorkflow, input, opts...)
474+
handle, err := retryWithResult(ctx, func() (WorkflowHandle[any], error) {
475+
return ctx.RunWorkflow(ctx, typedErasedWorkflow, input, opts...)
476+
}, withRetrierLogger(ctx.(*dbosContext).logger))
474477
if err != nil {
475478
return nil, err
476479
}
@@ -592,7 +595,10 @@ func RunWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts
592595
return fn(ctx, input.(P))
593596
})
594597

595-
handle, err := ctx.RunWorkflow(ctx, typedErasedWorkflow, input, opts...)
598+
// Wrap the RunWorkflow call with retryWithResult for database operation retries
599+
handle, err := retryWithResult(ctx, func() (WorkflowHandle[any], error) {
600+
return ctx.RunWorkflow(ctx, typedErasedWorkflow, input, opts...)
601+
}, withRetrierLogger(ctx.(*dbosContext).logger))
596602
if err != nil {
597603
return nil, err
598604
}
@@ -695,9 +701,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
695701

696702
// If this is a child workflow that has already been recorded in operations_output, return directly a polling handle
697703
if isChildWorkflow {
698-
childWorkflowID, err := retryWithResult(uncancellableCtx, func() (*string, error) {
699-
return c.systemDB.checkChildWorkflow(uncancellableCtx, parentWorkflowState.workflowID, parentWorkflowState.stepID)
700-
}, withRetrierLogger(c.logger))
704+
childWorkflowID, err := c.systemDB.checkChildWorkflow(uncancellableCtx, parentWorkflowState.workflowID, parentWorkflowState.stepID)
701705
if err != nil {
702706
return nil, newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Errorf("checking child workflow: %w", err))
703707
}
@@ -751,25 +755,19 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
751755
}
752756

753757
// Init status and record child workflow relationship in a single transaction
754-
tx, err := retryWithResult(uncancellableCtx, func() (pgx.Tx, error) {
755-
return c.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
756-
}, withRetrierLogger(c.logger))
758+
tx, err := c.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
757759
if err != nil {
758760
return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to begin transaction: %w", err))
759761
}
760-
defer retry(uncancellableCtx, func() error {
761-
return tx.Rollback(uncancellableCtx) // Rollback if not committed
762-
}, withRetrierLogger(c.logger))
762+
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
763763

764764
// Insert workflow status with transaction
765765
insertInput := insertWorkflowStatusDBInput{
766766
status: workflowStatus,
767767
maxRetries: params.maxRetries,
768768
tx: tx,
769769
}
770-
insertStatusResult, err := retryWithResult(uncancellableCtx, func() (*insertWorkflowResult, error) {
771-
return c.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
772-
}, withRetrierLogger(c.logger))
770+
insertStatusResult, err := c.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
773771
if err != nil {
774772
c.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
775773
return nil, err
@@ -785,9 +783,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
785783
stepID: parentWorkflowState.stepID,
786784
tx: tx,
787785
}
788-
err = retry(uncancellableCtx, func() error {
789-
return c.systemDB.recordChildWorkflow(uncancellableCtx, childInput)
790-
}, withRetrierLogger(c.logger))
786+
err = c.systemDB.recordChildWorkflow(uncancellableCtx, childInput)
791787
if err != nil {
792788
c.logger.Error("failed to record child workflow", "error", err, "parent_workflow_id", parentWorkflowState.workflowID, "child_workflow_id", workflowID)
793789
return nil, newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Errorf("recording child workflow: %w", err))

0 commit comments

Comments
 (0)