Skip to content

Commit c24252c

Browse files
authored
Merge pull request #204 from cschleiden/namespace-log-fields
Namespace fields in logs
2 parents df7dbc1 + 15c7983 commit c24252c

File tree

17 files changed

+147
-73
lines changed

17 files changed

+147
-73
lines changed

backend/redis/diagnostics.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77

88
"github.com/cschleiden/go-workflows/diag"
9+
"github.com/cschleiden/go-workflows/log"
910
redis "github.com/redis/go-redis/v9"
1011
)
1112

@@ -21,7 +22,7 @@ func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
2122
}
2223

2324
if len(scores) == 0 {
24-
rb.Logger().Error("could not find instance %v", "afterInstanceID", afterInstanceID)
25+
rb.Logger().Error("could not find instance %v", log.NamespaceKey+".redis.afterInstanceID", afterInstanceID)
2526
return nil, nil
2627
}
2728

backend/redis/signal.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/cschleiden/go-workflows/internal/history"
88
"github.com/cschleiden/go-workflows/internal/tracing"
9+
"github.com/cschleiden/go-workflows/log"
910
"github.com/redis/go-redis/v9"
1011
"go.opentelemetry.io/otel/attribute"
1112
"go.opentelemetry.io/otel/trace"
@@ -20,8 +21,8 @@ func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, e
2021
ctx = tracing.UnmarshalSpan(ctx, instanceState.Metadata)
2122
a := event.Attributes.(*history.SignalReceivedAttributes)
2223
_, span := rb.Tracer().Start(ctx, fmt.Sprintf("SignalWorkflow: %s", a.Name), trace.WithAttributes(
23-
attribute.String(tracing.WorkflowInstanceID, instanceID),
24-
attribute.String("signal.name", event.Attributes.(*history.SignalReceivedAttributes).Name),
24+
attribute.String(log.InstanceIDKey, instanceID),
25+
attribute.String(log.SignalNameKey, event.Attributes.(*history.SignalReceivedAttributes).Name),
2526
))
2627
defer span.End()
2728

backend/redis/workflow.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/cschleiden/go-workflows/internal/history"
1212
"github.com/cschleiden/go-workflows/internal/task"
1313
"github.com/cschleiden/go-workflows/internal/tracing"
14+
"github.com/cschleiden/go-workflows/log"
1415
"github.com/redis/go-redis/v9"
1516
"go.opentelemetry.io/otel/attribute"
1617
"go.opentelemetry.io/otel/trace"
@@ -265,7 +266,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
265266

266267
for _, cmd := range executedCmds {
267268
if cmdErr := cmd.Err(); cmdErr != nil {
268-
rb.Logger().Debug("redis command error", "cmd", cmd.FullName(), "cmdErr", cmdErr.Error())
269+
rb.Logger().Debug("redis command error", log.NamespaceKey+".redis.cmd", cmd.FullName(), log.NamespaceKey+".redis.cmdErr", cmdErr.Error())
269270
}
270271
}
271272

@@ -277,7 +278,7 @@ func (rb *redisBackend) CompleteWorkflowTask(
277278
ctx = tracing.UnmarshalSpan(ctx, instanceState.Metadata)
278279
_, span := rb.Tracer().Start(ctx, "WorkflowComplete",
279280
trace.WithAttributes(
280-
attribute.String("workflow_instance_id", instanceState.Instance.InstanceID),
281+
attribute.String(log.NamespaceKey+log.InstanceIDKey, instanceState.Instance.InstanceID),
281282
))
282283
span.End()
283284

client/client.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cschleiden/go-workflows/internal/history"
1616
"github.com/cschleiden/go-workflows/internal/metrickeys"
1717
"github.com/cschleiden/go-workflows/internal/tracing"
18+
"github.com/cschleiden/go-workflows/log"
1819
"github.com/cschleiden/go-workflows/metrics"
1920
"github.com/cschleiden/go-workflows/workflow"
2021
"go.opentelemetry.io/otel/attribute"
@@ -73,8 +74,8 @@ func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
7374

7475
// Start new span and add to metadata
7576
sctx, span := c.backend.Tracer().Start(ctx, fmt.Sprintf("CreateWorkflowInstance: %s", workflowName), trace.WithAttributes(
76-
attribute.String(tracing.WorkflowInstanceID, wfi.InstanceID),
77-
attribute.String(tracing.WorkflowName, workflowName),
77+
attribute.String(log.InstanceIDKey, wfi.InstanceID),
78+
attribute.String(log.WorkflowNameKey, workflowName),
7879
))
7980
defer span.End()
8081

@@ -93,19 +94,30 @@ func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
9394
return nil, fmt.Errorf("creating workflow instance: %w", err)
9495
}
9596

96-
c.backend.Logger().Debug("Created workflow instance", "instance_id", wfi.InstanceID)
97+
c.backend.Logger().Debug("Created workflow instance", log.InstanceIDKey, wfi.InstanceID)
9798

9899
c.backend.Metrics().Counter(metrickeys.WorkflowInstanceCreated, metrics.Tags{}, 1)
99100

100101
return wfi, nil
101102
}
102103

103104
func (c *client) CancelWorkflowInstance(ctx context.Context, instance *workflow.Instance) error {
105+
ctx, span := c.backend.Tracer().Start(ctx, "CancelWorkflowInstance", trace.WithAttributes(
106+
attribute.String(log.InstanceIDKey, instance.InstanceID),
107+
))
108+
defer span.End()
109+
104110
cancellationEvent := history.NewWorkflowCancellationEvent(time.Now())
105111
return c.backend.CancelWorkflowInstance(ctx, instance, cancellationEvent)
106112
}
107113

108114
func (c *client) SignalWorkflow(ctx context.Context, instanceID string, name string, arg interface{}) error {
115+
ctx, span := c.backend.Tracer().Start(ctx, "SignalWorkflow", trace.WithAttributes(
116+
attribute.String(log.InstanceIDKey, instanceID),
117+
attribute.String(log.SignalNameKey, name),
118+
))
119+
defer span.End()
120+
109121
input, err := c.backend.Converter().To(arg)
110122
if err != nil {
111123
return fmt.Errorf("converting arguments: %w", err)
@@ -125,7 +137,7 @@ func (c *client) SignalWorkflow(ctx context.Context, instanceID string, name str
125137
return err
126138
}
127139

128-
c.backend.Logger().Debug("Signaled workflow instance", "instance_id", instanceID)
140+
c.backend.Logger().Debug("Signaled workflow instance", log.InstanceIDKey, instanceID)
129141

130142
return nil
131143
}
@@ -135,6 +147,11 @@ func (c *client) WaitForWorkflowInstance(ctx context.Context, instance *workflow
135147
timeout = time.Second * 20
136148
}
137149

150+
ctx, span := c.backend.Tracer().Start(ctx, "WaitForWorkflowInstance", trace.WithAttributes(
151+
attribute.String(log.InstanceIDKey, instance.InstanceID),
152+
))
153+
defer span.End()
154+
138155
b := backoff.ExponentialBackOff{
139156
InitialInterval: time.Millisecond * 1,
140157
MaxInterval: time.Second * 1,
@@ -166,13 +183,18 @@ func (c *client) WaitForWorkflowInstance(ctx context.Context, instance *workflow
166183
// GetWorkflowResult gets the workflow result for the given workflow result. It first waits for the workflow to finish or until
167184
// the given timeout has expired.
168185
func GetWorkflowResult[T any](ctx context.Context, c Client, instance *workflow.Instance, timeout time.Duration) (T, error) {
186+
ic := c.(*client)
187+
b := ic.backend
188+
189+
ctx, span := b.Tracer().Start(ctx, "GetWorkflowResult", trace.WithAttributes(
190+
attribute.String(log.InstanceIDKey, instance.InstanceID),
191+
))
192+
defer span.End()
193+
169194
if err := c.WaitForWorkflowInstance(ctx, instance, timeout); err != nil {
170195
return *new(T), fmt.Errorf("workflow did not finish in time: %w", err)
171196
}
172197

173-
ic := c.(*client)
174-
b := ic.backend
175-
176198
h, err := b.GetWorkflowInstanceHistory(ctx, instance, nil)
177199
if err != nil {
178200
return *new(T), fmt.Errorf("getting workflow history: %w", err)
@@ -207,5 +229,10 @@ func GetWorkflowResult[T any](ctx context.Context, c Client, instance *workflow.
207229
}
208230

209231
func (c *client) RemoveWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) error {
232+
ctx, span := c.backend.Tracer().Start(ctx, "RemoveWorkflowInstance", trace.WithAttributes(
233+
attribute.String(log.InstanceIDKey, instance.InstanceID),
234+
))
235+
defer span.End()
236+
210237
return c.backend.RemoveWorkflowInstance(ctx, instance)
211238
}

client/client_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/google/uuid"
1717
"github.com/stretchr/testify/mock"
1818
"github.com/stretchr/testify/require"
19+
"go.opentelemetry.io/otel/trace"
1920
)
2021

2122
func Test_Client_CreateWorkflowInstance_ParamMismatch(t *testing.T) {
@@ -45,6 +46,7 @@ func Test_Client_GetWorkflowResultTimeout(t *testing.T) {
4546
ctx := context.Background()
4647

4748
b := &backend.MockBackend{}
49+
b.On("Tracer").Return(trace.NewNoopTracerProvider().Tracer("test"))
4850
b.On("GetWorkflowInstanceState", mock.Anything, instance).Return(core.WorkflowInstanceStateActive, nil)
4951

5052
c := &client{
@@ -68,6 +70,7 @@ func Test_Client_GetWorkflowResultSuccess(t *testing.T) {
6870
r, _ := converter.DefaultConverter.To(42)
6971

7072
b := &backend.MockBackend{}
73+
b.On("Tracer").Return(trace.NewNoopTracerProvider().Tracer("test"))
7174
b.On("GetWorkflowInstanceState", mock.Anything, instance).Return(core.WorkflowInstanceStateActive, nil).Once().Run(func(args mock.Arguments) {
7275
// After the first call, advance the clock to immediately go to the second call below
7376
mockClock.Add(time.Second)
@@ -99,9 +102,10 @@ func Test_Client_SignalWorkflow(t *testing.T) {
99102
ctx := context.Background()
100103

101104
b := &backend.MockBackend{}
105+
b.On("Tracer").Return(trace.NewNoopTracerProvider().Tracer("test"))
102106
b.On("Logger").Return(logger.NewDefaultLogger())
103107
b.On("Converter").Return(converter.DefaultConverter)
104-
b.On("SignalWorkflow", ctx, instanceID, mock.MatchedBy(func(event *history.Event) bool {
108+
b.On("SignalWorkflow", mock.Anything, instanceID, mock.MatchedBy(func(event *history.Event) bool {
105109
return event.Type == history.EventType_SignalReceived &&
106110
event.Attributes.(*history.SignalReceivedAttributes).Name == "test"
107111
})).Return(nil)
@@ -127,9 +131,10 @@ func Test_Client_SignalWorkflow_WithArgs(t *testing.T) {
127131
input, _ := converter.DefaultConverter.To(arg)
128132

129133
b := &backend.MockBackend{}
134+
b.On("Tracer").Return(trace.NewNoopTracerProvider().Tracer("test"))
130135
b.On("Logger").Return(logger.NewDefaultLogger())
131136
b.On("Converter").Return(converter.DefaultConverter)
132-
b.On("SignalWorkflow", ctx, instanceID, mock.MatchedBy(func(event *history.Event) bool {
137+
b.On("SignalWorkflow", mock.Anything, instanceID, mock.MatchedBy(func(event *history.Event) bool {
133138
return event.Type == history.EventType_SignalReceived &&
134139
event.Attributes.(*history.SignalReceivedAttributes).Name == "test" &&
135140
bytes.Equal(event.Attributes.(*history.SignalReceivedAttributes).Arg, input)

internal/activity/activitystate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ func NewActivityState(activityID string, instance *workflow.Instance, logger log
1818
activityID,
1919
instance,
2020
logger.With(
21-
"activity_id", activityID,
22-
"instance_id", instance.InstanceID,
21+
log.ActivityIDKey, activityID,
22+
log.InstanceIDKey, instance.InstanceID,
2323
)}
2424
}
2525

internal/activity/executor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,10 @@ func (e *Executor) ExecuteActivity(ctx context.Context, task *task.Activity) (pa
6060
activityCtx := WithActivityState(ctx, as)
6161

6262
activityCtx = tracing.UnmarshalSpan(activityCtx, task.Metadata)
63-
activityCtx, span := e.tracer.Start(activityCtx, "ActivityTaskExecution", trace.WithAttributes(
64-
attribute.String("activity", a.Name),
65-
attribute.String(tracing.WorkflowInstanceID, task.WorkflowInstance.InstanceID),
66-
attribute.String(tracing.ActivityTaskID, task.ID),
63+
activityCtx, span := e.tracer.Start(activityCtx, fmt.Sprintf("ActivityTaskExecution: %s", a.Name), trace.WithAttributes(
64+
attribute.String(log.ActivityNameKey, a.Name),
65+
attribute.String(log.InstanceIDKey, task.WorkflowInstance.InstanceID),
66+
attribute.String(log.ActivityIDKey, task.ID),
6767
))
6868
defer span.End()
6969

internal/tracing/attributes.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

internal/workflow/executor.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ func NewExecutor(logger log.Logger, tracer trace.Tracer, registry *Registry, cv
8181
func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*ExecutionResult, error) {
8282
ctx = tracing.UnmarshalSpan(ctx, t.Metadata)
8383
ctx, span := e.tracer.Start(ctx, "WorkflowTaskExecution", trace.WithAttributes(
84-
attribute.String(tracing.WorkflowInstanceID, t.WorkflowInstance.InstanceID),
85-
attribute.String(tracing.WorkflowTaskID, t.ID),
86-
attribute.Int(tracing.WorkflowTaskEvents, len(t.NewEvents)),
84+
attribute.String(log.InstanceIDKey, t.WorkflowInstance.InstanceID),
85+
attribute.String(log.TaskIDKey, t.ID),
86+
attribute.Int(log.NewEventsKey, len(t.NewEvents)),
8787
))
8888
defer span.End()
8989

@@ -92,17 +92,22 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
9292
// execution to be associated with the span for the WorkflowTaskExecution.
9393
e.workflowTracer.UpdateExecution(span)
9494

95-
logger := e.logger.With("task_id", t.ID, "instance_id", t.WorkflowInstance.InstanceID)
95+
logger := e.logger.With(
96+
log.TaskIDKey, t.ID,
97+
log.InstanceIDKey, t.WorkflowInstance.InstanceID)
9698

97-
logger.Debug("Executing workflow task", "task_last_sequence_id", t.LastSequenceID)
99+
logger.Debug("Executing workflow task", log.TaskLastSequenceIDKey, t.LastSequenceID)
98100

99101
if t.WorkflowInstanceState == core.WorkflowInstanceStateFinished {
100102
// This could happen if signals are delivered after the workflow is finished
101103
logger.Error("Received workflow task for finished workflow instance, discarding events")
102104

103105
// Log events that caused this task to be scheduled
104106
for _, event := range t.NewEvents {
105-
logger.Debug("Discarded event:", "id", event.ID, "event_type", event.Type.String(), "schedule_event_id", event.ScheduleEventID)
107+
logger.Debug("Discarded event:",
108+
log.EventIDKey, event.ID,
109+
log.EventTypeKey, event.Type.String(),
110+
log.ScheduleEventIDKey, event.ScheduleEventID)
106111
}
107112

108113
return &ExecutionResult{
@@ -113,7 +118,9 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
113118
skipNewEvents := false
114119

115120
if t.LastSequenceID > e.lastSequenceID {
116-
logger.Debug("Task has newer history than current state, fetching and replaying history", "task_sequence_id", t.LastSequenceID, "local_sequence_id", e.lastSequenceID)
121+
logger.Debug("Task has newer history than current state, fetching and replaying history",
122+
log.TaskSequenceIDKey, t.LastSequenceID,
123+
log.LocalSequenceIDKey, e.lastSequenceID)
117124

118125
h, err := e.historyProvider.GetWorkflowInstanceHistory(ctx, t.WorkflowInstance, &e.lastSequenceID)
119126
if err != nil {
@@ -130,7 +137,9 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
130137
// With an error occurred during replay, we need to ensure new events don't get duplicate sequence ids
131138
e.lastSequenceID = t.LastSequenceID
132139
} else if t.LastSequenceID != e.lastSequenceID {
133-
logger.Error("After replaying history, task still has newer history than current state", "task_sequence_id", t.LastSequenceID, "local_sequence_id", e.lastSequenceID)
140+
logger.Error("After replaying history, task still has newer history than current state",
141+
log.TaskSequenceIDKey, t.LastSequenceID,
142+
log.LocalSequenceIDKey, e.lastSequenceID)
134143

135144
return nil, errors.New("even after fetching history and replaying history executor state does not match task")
136145
}
@@ -188,9 +197,9 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
188197
}
189198

190199
logger.Debug("Finished workflow task",
191-
"executed", len(executedEvents),
192-
"last_sequence_id", e.lastSequenceID,
193-
"completed", completed,
200+
log.ExecutedEventsKey, len(executedEvents),
201+
log.TaskLastSequenceIDKey, e.lastSequenceID,
202+
log.WorkflowCompletedKey, completed,
194203
)
195204

196205
return &ExecutionResult{
@@ -242,7 +251,7 @@ func (e *executor) executeNewEvents(newEvents []*history.Event) ([]*history.Even
242251

243252
func (e *executor) Close() {
244253
if e.workflow != nil {
245-
e.logger.Debug("Stopping workflow executor", "instance_id", e.workflowState.Instance().InstanceID)
254+
e.logger.Debug("Stopping workflow executor", log.InstanceIDKey, e.workflowState.Instance().InstanceID)
246255

247256
// End workflow if running to prevent leaking goroutines
248257
e.workflow.Close()
@@ -251,12 +260,12 @@ func (e *executor) Close() {
251260

252261
func (e *executor) executeEvent(event *history.Event) error {
253262
e.logger.Debug("Executing event",
254-
"instance_id", e.workflowState.Instance().InstanceID,
255-
"event_id", event.ID,
256-
"seq_id", event.SequenceID,
257-
"event_type", event.Type,
258-
"schedule_event_id", event.ScheduleEventID,
259-
"is_replaying", e.workflowState.Replaying(),
263+
log.InstanceIDKey, e.workflowState.Instance().InstanceID,
264+
log.EventIDKey, event.ID,
265+
log.SeqIDKey, event.SequenceID,
266+
log.EventTypeKey, event.Type,
267+
log.ScheduleEventIDKey, event.ScheduleEventID,
268+
log.IsReplayingKey, e.workflowState.Replaying(),
260269
)
261270

262271
var err error

internal/workflowstate/workflowstate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func NewWorkflowState(instance *core.WorkflowInstance, logger log.Logger, clock
7676
}
7777

7878
state.logger = NewReplayLogger(state, logger.With(
79-
"instance_id", instance.InstanceID,
79+
log.InstanceIDKey, instance.InstanceID,
8080
))
8181

8282
return state

0 commit comments

Comments
 (0)