Skip to content

Commit fc04f1f

Browse files
committed
fix bug -- durable stuff commit must happen before workflow is started
1 parent 37c7cea commit fc04f1f

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

dbos/dbos.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
154154
return nil
155155
}
156156
if dbosCtx, ok := ctx.(*dbosContext); ok {
157-
// Spawn a new child context without the cancel function
158157
return &dbosContext{
159158
ctx: context.WithoutCancel(dbosCtx.ctx),
160159
logger: dbosCtx.logger,
@@ -168,15 +167,13 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
168167
}
169168
}
170169
return nil
171-
172170
}
173171

174172
func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.CancelFunc) {
175173
if ctx == nil {
176174
return nil, func() {}
177175
}
178176
if dbosCtx, ok := ctx.(*dbosContext); ok {
179-
// Spawn a new child context with a deadline and a cancel function
180177
newCtx, cancelFunc := context.WithTimeoutCause(dbosCtx.ctx, timeout, errors.New("DBOS context timeout"))
181178
return &dbosContext{
182179
ctx: newCtx,
@@ -319,6 +316,7 @@ func (c *dbosContext) Launch() error {
319316
}
320317

321318
// We might consider renaming this to "Cancel" to me more idiomatic
319+
// TODO: shutdown should really have a timeout and return an error if it wasn't able to shutdown everything
322320
func (c *dbosContext) Shutdown() {
323321
c.logger.Info("Shutting down DBOS context")
324322

dbos/workflow.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
572572
}
573573
insertStatusResult, err := c.systemDB.InsertWorkflowStatus(uncancellableCtx, insertInput)
574574
if err != nil {
575+
c.logger.Error("failed to insert workflow status", "error", err, "workflow_id", workflowID)
575576
return nil, err
576577
}
577578

@@ -597,6 +598,7 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
597598
}
598599
err = c.systemDB.RecordChildWorkflow(uncancellableCtx, childInput)
599600
if err != nil {
601+
c.logger.Error("failed to record child workflow", "error", err, "parent_workflow_id", parentWorkflowState.workflowID, "child_workflow_id", workflowID)
600602
return nil, newWorkflowExecutionError(parentWorkflowState.workflowID, fmt.Sprintf("recording child workflow: %v", err))
601603
}
602604
}
@@ -631,6 +633,11 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
631633
stopFunc = context.AfterFunc(workflowCtx, dbosCancelFunction)
632634
}
633635

636+
// Commit the transaction. This must happen before we start the goroutine to ensure the workflow is found by steps in the database
637+
if err := tx.Commit(uncancellableCtx); err != nil {
638+
return nil, newWorkflowExecutionError(workflowID, fmt.Sprintf("failed to commit transaction: %v", err))
639+
}
640+
634641
// Run the function in a goroutine
635642
c.workflowsWg.Add(1)
636643
go func() {
@@ -668,11 +675,6 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
668675
close(outcomeChan)
669676
}()
670677

671-
// Commit the transaction
672-
if err := tx.Commit(uncancellableCtx); err != nil {
673-
return nil, newWorkflowExecutionError(workflowID, fmt.Sprintf("failed to commit transaction: %v", err))
674-
}
675-
676678
return &workflowHandle[any]{workflowID: workflowID, outcomeChan: outcomeChan, dbosContext: uncancellableCtx}, nil
677679
}
678680

0 commit comments

Comments
 (0)