@@ -16,6 +16,7 @@ import (
16
16
"github.com/cschleiden/go-workflows/internal/payload"
17
17
"github.com/cschleiden/go-workflows/internal/sync"
18
18
"github.com/cschleiden/go-workflows/internal/task"
19
+ "github.com/cschleiden/go-workflows/internal/workflowerrors"
19
20
"github.com/cschleiden/go-workflows/internal/workflowstate"
20
21
"github.com/cschleiden/go-workflows/internal/workflowtracer"
21
22
"github.com/cschleiden/go-workflows/log"
@@ -49,6 +50,7 @@ type executor struct {
49
50
workflowState * workflowstate.WfState
50
51
workflowCtx sync.Context
51
52
workflowCtxCancel sync.CancelFunc
53
+ cv converter.Converter
52
54
clock clock.Clock
53
55
logger log.Logger
54
56
tracer trace.Tracer
@@ -96,6 +98,7 @@ func NewExecutor(
96
98
workflowState : s ,
97
99
workflowCtx : wfCtx ,
98
100
workflowCtxCancel : cancel ,
101
+ cv : cv ,
99
102
clock : clock ,
100
103
logger : logger ,
101
104
tracer : tracer ,
@@ -279,14 +282,21 @@ func (e *executor) Close() {
279
282
}
280
283
281
284
func (e * executor ) executeEvent (event * history.Event ) error {
282
- e . logger . Debug ( "Executing event" ,
285
+ fields := [] any {
283
286
log .InstanceIDKey , e .workflowState .Instance ().InstanceID ,
284
287
log .EventIDKey , event .ID ,
285
288
log .SeqIDKey , event .SequenceID ,
286
289
log .EventTypeKey , event .Type ,
287
290
log .ScheduleEventIDKey , event .ScheduleEventID ,
288
291
log .IsReplayingKey , e .workflowState .Replaying (),
289
- )
292
+ }
293
+
294
+ attributesFields := getAttributesLoggingFields (event )
295
+ if attributesFields != nil {
296
+ fields = append (fields , attributesFields ... )
297
+ }
298
+
299
+ e .logger .Debug ("Executing event" , fields )
290
300
291
301
var err error
292
302
@@ -423,7 +433,8 @@ func (e *executor) handleActivityFailed(event *history.Event, a *history.Activit
423
433
return errors .New ("no pending future for activity failed event" )
424
434
}
425
435
426
- if err := f (nil , errors .New (a .Reason )); err != nil {
436
+ actErr := workflowerrors .ToError (a .Error )
437
+ if err := f (nil , actErr ); err != nil {
427
438
return fmt .Errorf ("setting activity failed result: %w" , err )
428
439
}
429
440
@@ -561,7 +572,9 @@ func (e *executor) handleSubWorkflowFailed(event *history.Event, a *history.SubW
561
572
return errors .New ("no pending future found for sub workflow failed event" )
562
573
}
563
574
564
- if err := f (nil , errors .New (a .Error )); err != nil {
575
+ wfErr := workflowerrors .ToError (a .Error )
576
+
577
+ if err := f (nil , wfErr ); err != nil {
565
578
return fmt .Errorf ("setting sub workflow failed result: %w" , err )
566
579
}
567
580
@@ -641,11 +654,13 @@ func (e *executor) handleSideEffectResult(event *history.Event, a *history.SideE
641
654
return e .workflow .Continue ()
642
655
}
643
656
644
- func (e * executor ) workflowCompleted (result payload.Payload , err error ) {
657
+ func (e * executor ) workflowCompleted (result payload.Payload , wfErr error ) error {
645
658
eventId := e .workflowState .GetNextScheduleEventID ()
646
659
647
- cmd := command .NewCompleteWorkflowCommand (eventId , e .workflowState .Instance (), result , err )
660
+ cmd := command .NewCompleteWorkflowCommand (eventId , e .workflowState .Instance (), result , workflowerrors . FromError ( wfErr ) )
648
661
e .workflowState .AddCommand (cmd )
662
+
663
+ return nil
649
664
}
650
665
651
666
func (e * executor ) workflowRestarted (result payload.Payload , continueAsNew * continueasnew.Error ) {
@@ -668,3 +683,30 @@ func (e *executor) createNewEvent(eventType history.EventType, attributes interf
668
683
opts ... ,
669
684
)
670
685
}
686
+
687
+ func getAttributesLoggingFields (event * history.Event ) []any {
688
+ switch event .Type {
689
+ case history .EventType_WorkflowExecutionStarted :
690
+ attributes := event .Attributes .(* history.ExecutionStartedAttributes )
691
+ return []any {
692
+ log .WorkflowNameKey , attributes .Name ,
693
+ }
694
+ case history .EventType_SubWorkflowScheduled :
695
+ attributes := event .Attributes .(* history.SubWorkflowScheduledAttributes )
696
+ return []any {
697
+ log .WorkflowNameKey , attributes .Name ,
698
+ }
699
+ case history .EventType_SignalReceived :
700
+ attributes := event .Attributes .(* history.SignalReceivedAttributes )
701
+ return []any {
702
+ log .SignalNameKey , attributes .Name ,
703
+ }
704
+ case history .EventType_ActivityScheduled :
705
+ attributes := event .Attributes .(* history.ActivityScheduledAttributes )
706
+ return []any {
707
+ log .ActivityNameKey , attributes .Name ,
708
+ }
709
+ default :
710
+ return nil
711
+ }
712
+ }
0 commit comments