@@ -63,28 +63,30 @@ func NewExecutor(logger log.Logger, registry *Registry, historyProvider Workflow
63
63
}
64
64
65
65
func (e * executor ) ExecuteTask (ctx context.Context , t * task.Workflow ) (* ExecutionResult , error ) {
66
- e .logger .Debug ("Executing workflow task" , "task_id" , t .ID , "instance_id" , t .WorkflowInstance .InstanceID )
66
+ logger := e .logger .With ("task_id" , t .ID , "instance_id" , t .WorkflowInstance .InstanceID )
67
+
68
+ logger .Debug ("Executing workflow task" )
67
69
68
70
e .workflowState .ClearCommands ()
69
71
70
72
skipNewEvents := false
71
73
72
74
if t .LastSequenceID > e .lastSequenceID {
73
- e . logger .Debug ("Task has newer history than current state, fetching and replaying history" , "task_sequence_id" , t .LastSequenceID , "sequence_id" , e .lastSequenceID )
75
+ logger .Debug ("Task has newer history than current state, fetching and replaying history" , "task_sequence_id" , t .LastSequenceID , "sequence_id" , e .lastSequenceID )
74
76
75
77
h , err := e .historyProvider .GetWorkflowInstanceHistory (ctx , t .WorkflowInstance , & e .lastSequenceID )
76
78
if err != nil {
77
79
return nil , fmt .Errorf ("getting workflow history: %w" , err )
78
80
}
79
81
80
82
if err := e .replayHistory (h ); err != nil {
81
- e . logger .Error ("Error while replaying history" , "error" , err )
83
+ logger .Error ("Error while replaying history" , "error" , err )
82
84
83
85
// Fail workflow with an error. Skip executing new events, but still go through the commands
84
86
e .workflowCompleted (nil , err )
85
87
skipNewEvents = true
86
88
} else if t .LastSequenceID != e .lastSequenceID {
87
- e . logger .Debug ("Task has newer history than current state" , "task_sequence_id" , t .LastSequenceID , "sequence_id" , e .lastSequenceID )
89
+ logger .Debug ("Task has newer history than current state" , "task_sequence_id" , t .LastSequenceID , "sequence_id" , e .lastSequenceID )
88
90
89
91
return nil , errors .New ("even after fetching history and replaying history executor state does not match task" )
90
92
}
@@ -103,7 +105,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
103
105
var err error
104
106
executedEvents , err = e .executeNewEvents (toExecute )
105
107
if err != nil {
106
- e . logger .Error ("Error while executing new events" , "error" , err )
108
+ logger .Error ("Error while executing new events" , "error" , err )
107
109
108
110
e .workflowCompleted (nil , err )
109
111
}
@@ -122,9 +124,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
122
124
executedEvents [i ].SequenceID = e .nextSequenceID ()
123
125
}
124
126
125
- e .logger .Debug ("Finished workflow task" ,
126
- "task_id" , t .ID ,
127
- "instance_id" , t .WorkflowInstance .InstanceID ,
127
+ logger .Debug ("Finished workflow task" ,
128
128
"executed" , len (executedEvents ),
129
129
"last_sequence_id" , e .lastSequenceID ,
130
130
"completed" , completed ,
0 commit comments