Skip to content

Commit b87b0c7

Browse files
committed
wrap tx only in retry in RunWorkflow
1 parent 74ffbe0 commit b87b0c7

File tree

1 file changed

+85
-88
lines changed

1 file changed

+85
-88
lines changed

dbos/workflow.go

Lines changed: 85 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/gob"
66
"errors"
77
"fmt"
8-
"log/slog"
98
"math"
109
"reflect"
1110
"runtime"
@@ -342,9 +341,7 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow
342341
WithQueue(_DBOS_INTERNAL_QUEUE_NAME),
343342
withWorkflowName(workflowName),
344343
}
345-
_, err := retryWithResult(ctx, func() (WorkflowHandle[any], error) {
346-
return ctx.RunWorkflow(ctx, fn, scheduledTime, opts...)
347-
}, withRetrierLogger(c.logger))
344+
_, err := ctx.RunWorkflow(ctx, fn, scheduledTime, opts...)
348345
if err != nil {
349346
c.logger.Error("failed to run scheduled workflow", "fqn", workflowName, "error", err)
350347
}
@@ -457,9 +454,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...
457454

458455
typeErasedWrapper := wrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
459456
opts = append(opts, withWorkflowName(fqn)) // Append the name so ctx.RunWorkflow can look it up from the registry to apply registration-time options
460-
handle, err := retryWithResult(ctx, func() (WorkflowHandle[any], error) {
461-
return ctx.RunWorkflow(ctx, typedErasedWorkflow, input, opts...)
462-
}, withRetrierLogger(ctx.(*dbosContext).logger))
457+
handle, err := ctx.RunWorkflow(ctx, typedErasedWorkflow, input, opts...)
463458
if err != nil {
464459
return nil, err
465460
}
@@ -581,14 +576,7 @@ func RunWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], input P, opts
581576
return fn(ctx, input.(P))
582577
})
583578

584-
// Wrap the RunWorkflow call with retryWithResult for database operation retries
585-
var logger *slog.Logger
586-
if dbosCtx, ok := ctx.(*dbosContext); ok {
587-
logger = dbosCtx.logger
588-
}
589-
handle, err := retryWithResult(ctx, func() (WorkflowHandle[any], error) {
590-
return ctx.RunWorkflow(ctx, typedErasedWorkflow, input, opts...)
591-
}, withRetrierLogger(logger))
579+
handle, err := ctx.RunWorkflow(ctx, typedErasedWorkflow, input, opts...)
592580
if err != nil {
593581
return nil, err
594582
}
@@ -744,94 +732,103 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
744732
Priority: int(params.priority),
745733
}
746734

747-
// Init status and record child workflow relationship in a single transaction
748-
tx, err := c.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
749-
if err != nil {
750-
return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to begin transaction: %w", err))
751-
}
752-
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
735+
var stopFunc func() bool
736+
cancelFuncCompleted := make(chan struct{})
737+
var workflowCtx DBOSContext
738+
outcomeChan := make(chan workflowOutcome[any], 1)
739+
var earlyReturnPollingHandle *workflowPollingHandle[any]
753740

754-
// Insert workflow status with transaction
755-
insertInput := insertWorkflowStatusDBInput{
756-
status: workflowStatus,
757-
maxRetries: params.maxRetries,
758-
tx: tx,
759-
}
760-
insertStatusResult, err := c.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
761-
if err != nil {
762-
c.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
763-
return nil, err
764-
}
741+
// Init status and record child workflow relationship in a single transaction
742+
err := retry(c, func() error {
743+
tx, err := c.systemDB.(*sysDB).pool.Begin(uncancellableCtx)
744+
if err != nil {
745+
return newWorkflowExecutionError(workflowID, fmt.Errorf("failed to begin transaction: %w", err))
746+
}
747+
defer tx.Rollback(uncancellableCtx) // Rollback if not committed
765748

766-
// Record child workflow relationship if this is a child workflow
767-
if isChildWorkflow {
768-
// Get the step ID that was used for generating the child workflow ID
769-
childInput := recordChildWorkflowDBInput{
770-
parentWorkflowID: parentWorkflowState.workflowID,
771-
childWorkflowID: workflowID,
772-
stepName: params.workflowName,
773-
stepID: parentWorkflowState.stepID,
774-
tx: tx,
749+
// Insert workflow status with transaction
750+
insertInput := insertWorkflowStatusDBInput{
751+
status: workflowStatus,
752+
maxRetries: params.maxRetries,
753+
tx: tx,
775754
}
776-
err = c.systemDB.recordChildWorkflow(uncancellableCtx, childInput)
755+
insertStatusResult, err := c.systemDB.insertWorkflowStatus(uncancellableCtx, insertInput)
777756
if err != nil {
778-
c.logger.Error("failed to record child workflow", "error", err, "parent_workflow_id", parentWorkflowState.workflowID, "child_workflow_id", workflowID)
779-
return nil, newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Errorf("recording child workflow: %w", err))
757+
c.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
758+
return err
780759
}
781-
}
782760

783-
// Return a polling handle if: we are enqueueing, the workflow is already in a terminal state (success or error),
784-
if len(params.queueName) > 0 || insertStatusResult.status == WorkflowStatusSuccess || insertStatusResult.status == WorkflowStatusError {
785-
// Commit the transaction to update the number of attempts and/or enact the enqueue
786-
if err := tx.Commit(uncancellableCtx); err != nil {
787-
return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to commit transaction: %w", err))
761+
// Record child workflow relationship if this is a child workflow
762+
if isChildWorkflow {
763+
// Get the step ID that was used for generating the child workflow ID
764+
childInput := recordChildWorkflowDBInput{
765+
parentWorkflowID: parentWorkflowState.workflowID,
766+
childWorkflowID: workflowID,
767+
stepName: params.workflowName,
768+
stepID: parentWorkflowState.stepID,
769+
tx: tx,
770+
}
771+
err = c.systemDB.recordChildWorkflow(uncancellableCtx, childInput)
772+
if err != nil {
773+
c.logger.Error("failed to record child workflow", "error", err, "parent_workflow_id", parentWorkflowState.workflowID, "child_workflow_id", workflowID)
774+
return newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Errorf("recording child workflow: %w", err))
775+
}
788776
}
789-
return newWorkflowPollingHandle[any](uncancellableCtx, workflowStatus.ID), nil
790-
}
791777

792-
// Channel to receive the outcome from the goroutine
793-
// The buffer size of 1 allows the goroutine to send the outcome without blocking
794-
// In addition it allows the channel to be garbage collected
795-
outcomeChan := make(chan workflowOutcome[any], 1)
778+
// Return a polling handle if: we are enqueueing, the workflow is already in a terminal state (success or error),
779+
if len(params.queueName) > 0 || insertStatusResult.status == WorkflowStatusSuccess || insertStatusResult.status == WorkflowStatusError {
780+
// Commit the transaction to update the number of attempts and/or enact the enqueue
781+
if err := tx.Commit(uncancellableCtx); err != nil {
782+
return newWorkflowExecutionError(workflowID, fmt.Errorf("failed to commit transaction: %w", err))
783+
}
784+
earlyReturnPollingHandle = newWorkflowPollingHandle[any](uncancellableCtx, workflowStatus.ID)
785+
return nil
786+
}
796787

797-
// Create workflow state to track step execution
798-
wfState := &workflowState{
799-
workflowID: workflowID,
800-
stepID: -1, // Steps are O-indexed
801-
}
788+
// Create workflow state to track step execution
789+
wfState := &workflowState{
790+
workflowID: workflowID,
791+
stepID: -1, // Steps are O-indexed
792+
}
802793

803-
workflowCtx := WithValue(c, workflowStateKey, wfState)
794+
workflowCtx = WithValue(c, workflowStateKey, wfState)
804795

805-
// If the workflow has a timeout but no deadline, compute the deadline from the timeout.
806-
// Else use the durable deadline.
807-
durableDeadline := time.Time{}
808-
if insertStatusResult.timeout > 0 && insertStatusResult.workflowDeadline.IsZero() {
809-
durableDeadline = time.Now().Add(insertStatusResult.timeout)
810-
} else if !insertStatusResult.workflowDeadline.IsZero() {
811-
durableDeadline = insertStatusResult.workflowDeadline
812-
}
796+
// If the workflow has a timeout but no deadline, compute the deadline from the timeout.
797+
// Else use the durable deadline.
798+
durableDeadline := time.Time{}
799+
if insertStatusResult.timeout > 0 && insertStatusResult.workflowDeadline.IsZero() {
800+
durableDeadline = time.Now().Add(insertStatusResult.timeout)
801+
} else if !insertStatusResult.workflowDeadline.IsZero() {
802+
durableDeadline = insertStatusResult.workflowDeadline
803+
}
813804

814-
var stopFunc func() bool
815-
cancelFuncCompleted := make(chan struct{})
816-
if !durableDeadline.IsZero() {
817-
workflowCtx, _ = WithTimeout(workflowCtx, time.Until(durableDeadline))
818-
// Register a cancel function that cancels the workflow in the DB as soon as the context is cancelled
819-
dbosCancelFunction := func() {
820-
c.logger.Info("Cancelling workflow", "workflow_id", workflowID)
821-
err = retry(uncancellableCtx, func() error {
822-
return c.systemDB.cancelWorkflow(uncancellableCtx, workflowID)
823-
}, withRetrierLogger(c.logger))
824-
if err != nil {
825-
c.logger.Error("Failed to cancel workflow", "error", err)
805+
if !durableDeadline.IsZero() {
806+
workflowCtx, _ = WithTimeout(workflowCtx, time.Until(durableDeadline))
807+
// Register a cancel function that cancels the workflow in the DB as soon as the context is cancelled
808+
dbosCancelFunction := func() {
809+
c.logger.Info("Cancelling workflow", "workflow_id", workflowID)
810+
err = retry(uncancellableCtx, func() error {
811+
return c.systemDB.cancelWorkflow(uncancellableCtx, workflowID)
812+
}, withRetrierLogger(c.logger))
813+
if err != nil {
814+
c.logger.Error("Failed to cancel workflow", "error", err)
815+
}
816+
close(cancelFuncCompleted)
826817
}
827-
close(cancelFuncCompleted)
818+
stopFunc = context.AfterFunc(workflowCtx, dbosCancelFunction)
828819
}
829-
stopFunc = context.AfterFunc(workflowCtx, dbosCancelFunction)
830-
}
831820

832-
// Commit the transaction. This must happen before we start the goroutine to ensure the workflow is found by steps in the database
833-
if err := tx.Commit(uncancellableCtx); err != nil {
834-
return nil, newWorkflowExecutionError(workflowID, fmt.Errorf("failed to commit transaction: %w", err))
821+
// Commit the transaction. This must happen before we start the goroutine to ensure the workflow is found by steps in the database
822+
if err := tx.Commit(uncancellableCtx); err != nil {
823+
return newWorkflowExecutionError(workflowID, fmt.Errorf("failed to commit transaction: %w", err))
824+
}
825+
return nil
826+
}, withRetrierLogger(c.logger))
827+
if err != nil {
828+
return nil, err
829+
}
830+
if earlyReturnPollingHandle != nil {
831+
return earlyReturnPollingHandle, nil
835832
}
836833

837834
// Run the function in a goroutine

0 commit comments

Comments
 (0)