Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion docs/ENGINE_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,22 @@ engine := NewEngine(pool,

This ensures low-priority steps eventually get processed even in busy queues.

### 6.4 Workflow Events
### 6.4 Long-Running Steps Mode

For handlers that run for extended periods (minutes to hours), enable long-running mode:

```go
engine := NewEngine(pool, WithLongRunningSteps())
```

In this mode, handler execution happens outside of the database transaction. This releases the DB connection during handler execution and makes the "running" status immediately visible.

**Trade-offs:**

- If a worker crashes mid-handler, the step remains in "running" status — implement a recovery mechanism to reset stuck steps
- Handlers should be idempotent or use `StepContext.IdempotencyKey()`

### 6.5 Workflow Events

All state changes are persisted as events.

Expand Down
288 changes: 288 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type Engine struct {
missingHandlerJitterPct float64
skipLogMu sync.Mutex
skipLogNextAllowed map[string]time.Time

// Long-running steps mode: execute handlers outside of transaction
longRunningSteps bool
}

// StartAwaitResult contains the result of StartAwait operation.
Expand Down Expand Up @@ -320,6 +323,11 @@ func (engine *Engine) ExecuteNext(ctx context.Context, workerID string) (empty b
return true, nil
}

// Use long-running mode if enabled
if engine.longRunningSteps {
return engine.executeNextLongRunning(ctx, workerID)
}

err = engine.txManager.ReadCommitted(ctx, func(ctx context.Context) error {
if engine.isShutdown() {
empty = true
Expand Down Expand Up @@ -501,6 +509,286 @@ func (engine *Engine) ExecuteNext(ctx context.Context, workerID string) (empty b
return empty, nil
}

// longRunningStepContext holds the data needed to execute a step across multiple transactions.
type longRunningStepContext struct {
item *QueueItem
instance *WorkflowInstance
step *WorkflowStep
stepDef *StepDefinition
def *WorkflowDefinition
}

// executeNextLongRunning executes the next step with handler running outside of transaction.
// This mode is optimized for handlers that run for extended periods (minutes to hours).
func (engine *Engine) executeNextLongRunning(ctx context.Context, workerID string) (empty bool, err error) {
var execCtx *longRunningStepContext
var skipStep bool
var useNormalExecution bool

// Transaction 1: Dequeue, validate, and prepare step for execution
err = engine.txManager.ReadCommitted(ctx, func(txCtx context.Context) error {
if engine.isShutdown() {
empty = true
return nil
}

item, err := engine.store.DequeueStep(txCtx, workerID)
if err != nil {
return fmt.Errorf("dequeue step: %w", err)
}

if item == nil {
empty = true
return nil
}

if engine.isShutdown() {
_ = engine.store.ReleaseQueueItem(txCtx, item.ID)
empty = true
return nil
}

instance, err := engine.store.GetInstance(txCtx, item.InstanceID)
if err != nil {
return fmt.Errorf("get instance: %w", err)
}

// If workflow is in DLQ state, skip execution
if instance.Status == StatusDLQ {
_ = engine.store.LogEvent(txCtx, instance.ID, nil, EventStepFailed, map[string]any{
KeyMessage: "instance in DLQ state, skipping queued item",
})
_ = engine.store.RemoveFromQueue(txCtx, item.ID)
skipStep = true
return nil
}

var step *WorkflowStep
if item.StepID == nil {
step, err = engine.createFirstStep(txCtx, instance)
if err != nil {
return fmt.Errorf("create first step: %w", err)
}
} else {
steps, err := engine.store.GetStepsByInstance(txCtx, instance.ID)
if err != nil {
return fmt.Errorf("get steps: %w", err)
}

for _, currStep := range steps {
currStep := currStep
if currStep.ID == *item.StepID {
step = &currStep
break
}
}

if step == nil {
return fmt.Errorf("step not found: %d", *item.StepID)
}
}

// Do not execute skipped or paused steps
if step.Status == StepStatusSkipped ||
step.Status == StepStatusPaused ||
step.Status == StepStatusRolledBack {
_ = engine.store.RemoveFromQueue(txCtx, step.ID)
skipStep = true
return nil
}

def, err := engine.store.GetWorkflowDefinition(txCtx, instance.WorkflowID)
if err != nil {
return fmt.Errorf("get workflow definition: %w", err)
}

stepDef, ok := def.Definition.Steps[step.StepName]
if !ok {
return fmt.Errorf("step definition not found: %s", step.StepName)
}

// Check if this is a compensation step
if step.Status == StepStatusCompensation {
if stepDef.OnFailure != "" {
if onFailDef, ok := def.Definition.Steps[stepDef.OnFailure]; ok {
engine.mu.RLock()
_, has := engine.handlers[onFailDef.Handler]
engine.mu.RUnlock()
if !has {
// No local handler, reschedule
delay := engine.jitteredCooldown()
if delay > 0 {
_ = engine.store.RescheduleAndReleaseQueueItem(txCtx, item.ID, delay)
} else {
_ = engine.store.ReleaseQueueItem(txCtx, item.ID)
}
logKey := fmt.Sprintf("comp-skip:%d:%s", instance.ID, step.StepName)
if engine.shouldLogSkip(logKey) {
_ = engine.store.LogEvent(txCtx, instance.ID, nil, EventStepSkippedMissingHandler, map[string]any{
KeyStepName: step.StepName,
KeyMessage: "no local compensation handler registered; rescheduled",
})
}
skipStep = true
return nil
}
}
}
// Compensation steps use normal execution (within transaction)
useNormalExecution = true
}

// Check for missing handlers on task steps
if step.StepType == StepTypeTask && !useNormalExecution {
engine.mu.RLock()
_, has := engine.handlers[stepDef.Handler]
engine.mu.RUnlock()
if !has {
delay := engine.jitteredCooldown()
if delay > 0 {
_ = engine.store.RescheduleAndReleaseQueueItem(txCtx, item.ID, delay)
} else {
_ = engine.store.ReleaseQueueItem(txCtx, item.ID)
}
logKey := fmt.Sprintf("task-skip:%d:%s", instance.ID, step.StepName)
if engine.shouldLogSkip(logKey) {
_ = engine.store.LogEvent(txCtx, instance.ID, nil, EventStepSkippedMissingHandler, map[string]any{
KeyStepName: step.StepName,
KeyMessage: "no local handler registered; rescheduled",
})
}
skipStep = true
return nil
}
}

// For non-Task steps (Fork, Join, Condition, SavePoint, Human), use normal execution
if step.StepType != StepTypeTask {
useNormalExecution = true
}

// If using normal execution, do it within this transaction
if useNormalExecution {
engine.activeSteps.Add(1)
defer engine.activeSteps.Done()
defer func() {
_ = engine.store.RemoveFromQueue(txCtx, item.ID)
}()

if step.Status == StepStatusCompensation {
return engine.executeCompensationStep(txCtx, instance, step)
}
return engine.executeStep(txCtx, instance, step)
}

// For Task steps in long-running mode:
// Update status to running and remove from queue within this transaction
if err := engine.store.UpdateStep(txCtx, step.ID, StepStatusRunning, nil, nil); err != nil {
return fmt.Errorf("update step status: %w", err)
}

_ = engine.store.LogEvent(txCtx, instance.ID, &step.ID, EventStepStarted, map[string]any{
KeyStepName: step.StepName,
KeyStepType: stepDef.Type,
})

_ = engine.store.RemoveFromQueue(txCtx, item.ID)

// Save context for handler execution outside transaction
execCtx = &longRunningStepContext{
item: item,
instance: instance,
step: step,
stepDef: stepDef,
def: def,
}

return nil
})

if err != nil {
return empty, err
}

if empty || skipStep || useNormalExecution || execCtx == nil {
return empty, nil
}

// Track active step for graceful shutdown
engine.activeSteps.Add(1)
defer engine.activeSteps.Done()

// Execute handler OUTSIDE of transaction
// This is the key difference - the DB connection is released during handler execution
handlerCtx, cancel := context.WithCancel(ctx)
defer cancel()

engine.registerInstanceContext(execCtx.instance.ID, execCtx.step.ID, cancel)
defer engine.unregisterInstanceContext(execCtx.instance.ID, execCtx.step.ID)

// Check for cancellation before starting handler
cancelReq, err := engine.store.GetCancelRequest(ctx, execCtx.instance.ID)
if err == nil && cancelReq != nil {
// Handle cancellation in a new transaction
return false, engine.txManager.ReadCommitted(ctx, func(txCtx context.Context) error {
return engine.handleCancellation(txCtx, execCtx.instance, execCtx.step, cancelReq)
})
}

// Apply timeout if configured
if execCtx.stepDef.Timeout != 0 {
var timeoutCancel context.CancelFunc
handlerCtx, timeoutCancel = context.WithTimeout(handlerCtx, execCtx.stepDef.Timeout)
defer timeoutCancel()
}

// PLUGIN HOOK: OnStepStart
if engine.pluginManager != nil {
if err := engine.pluginManager.ExecuteStepStart(ctx, execCtx.instance, execCtx.step); err != nil {
return false, engine.txManager.ReadCommitted(ctx, func(txCtx context.Context) error {
return engine.handleStepFailure(txCtx, execCtx.instance, execCtx.step, execCtx.stepDef, err)
})
}
}

// Execute the handler (this can take minutes/hours)
output, stepErr := engine.executeTask(handlerCtx, execCtx.instance, execCtx.step, execCtx.stepDef)

// Check for cancellation after handler completes
if errors.Is(handlerCtx.Err(), context.Canceled) {
cancelReq, err = engine.store.GetCancelRequest(ctx, execCtx.instance.ID)
if err == nil && cancelReq != nil {
return false, engine.txManager.ReadCommitted(ctx, func(txCtx context.Context) error {
return engine.handleCancellation(txCtx, execCtx.instance, execCtx.step, cancelReq)
})
}
}

// Transaction 2: Handle step result (success or failure)
err = engine.txManager.ReadCommitted(ctx, func(txCtx context.Context) error {
if stepErr != nil {
// PLUGIN HOOK: OnStepFailed
if engine.pluginManager != nil {
if errPlugin := engine.pluginManager.ExecuteStepFailed(txCtx, execCtx.instance, execCtx.step, stepErr); errPlugin != nil {
slog.Warn("[floxy] plugin hook OnStepFailed failed", "error", errPlugin)
}
}
return engine.handleStepFailure(txCtx, execCtx.instance, execCtx.step, execCtx.stepDef, stepErr)
}

// PLUGIN HOOK: OnStepComplete
if engine.pluginManager != nil {
if err := engine.pluginManager.ExecuteStepComplete(txCtx, execCtx.instance, execCtx.step); err != nil {
slog.Warn("[floxy] plugin hook OnStepComplete failed", "error", err)
}
}

return engine.handleStepSuccess(txCtx, execCtx.instance, execCtx.step, execCtx.stepDef, output, true)
})

return false, err
}

func (engine *Engine) MakeHumanDecision(
ctx context.Context,
stepID int64,
Expand Down
Loading