Skip to content

Commit f4e4a1c

Browse files
authored
Include activity and workflow names in spans
1 parent 3f14d95 commit f4e4a1c

File tree

10 files changed

+32
-20
lines changed

10 files changed

+32
-20
lines changed

backend/redis/events.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"context"
55
"encoding/json"
6+
"strconv"
67

78
"github.com/cschleiden/go-workflows/internal/core"
89
"github.com/cschleiden/go-workflows/internal/history"
@@ -71,7 +72,7 @@ func addFutureEventP(ctx context.Context, p redis.Pipeliner, instance *core.Work
7172
addFutureEventCmd.Run(
7273
ctx, p,
7374
[]string{futureEventsKey(), futureEventKey(instance.InstanceID, event.ScheduleEventID)},
74-
event.VisibleAt.Unix(),
75+
strconv.FormatInt(event.VisibleAt.UnixMilli(), 10),
7576
instance.InstanceID,
7677
string(eventData),
7778
)

backend/redis/signal.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, e
1818
}
1919

2020
ctx = tracing.UnmarshalSpan(ctx, instanceState.Metadata)
21-
_, span := rb.Tracer().Start(ctx, "SignalWorkflow", trace.WithAttributes(
21+
a := event.Attributes.(*history.SignalReceivedAttributes)
22+
_, span := rb.Tracer().Start(ctx, fmt.Sprintf("SignalWorkflow: %s", a.Name), trace.WithAttributes(
2223
attribute.String(tracing.WorkflowInstanceID, instanceID),
2324
attribute.String("signal.name", event.Attributes.(*history.SignalReceivedAttributes).Name),
2425
))

backend/redis/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ var futureEventsCmd = redis.NewScript(`
5757

5858
func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, error) {
5959
// Check for future events
60-
now := time.Now().Unix()
61-
nowStr := strconv.Itoa(int(now))
60+
now := time.Now().UnixMilli()
61+
nowStr := strconv.FormatInt(now, 10)
6262

6363
queueKeys := rb.workflowQueue.Keys()
6464

client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (c *client) CreateWorkflowInstance(ctx context.Context, options WorkflowIns
7373
metadata := &workflow.Metadata{}
7474

7575
// Start new span and add to metadata
76-
sctx, span := c.backend.Tracer().Start(ctx, "CreateWorkflowInstance", trace.WithAttributes(
76+
sctx, span := c.backend.Tracer().Start(ctx, fmt.Sprintf("CreateWorkflowInstance: %s", workflowName), trace.WithAttributes(
7777
attribute.String(tracing.WorkflowInstanceID, wfi.InstanceID),
7878
attribute.String(tracing.WorkflowName, workflowName),
7979
))

samples/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func GetBackend(name string, opt ...backend.BackendOption) backend.Backend {
3434
WriteTimeout: time.Second * 30,
3535
ReadTimeout: time.Second * 30,
3636
})
37-
b, err := redis.NewRedisBackend(rclient)
37+
b, err := redis.NewRedisBackend(rclient, redis.WithBackendOptions(opt...))
3838
if err != nil {
3939
panic(err)
4040
}

samples/tracing/tracing.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616

1717
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
1818
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
19-
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
2019

2120
"github.com/cschleiden/go-workflows/worker"
2221

@@ -33,19 +32,24 @@ func main() {
3332
attribute.String("environment", "sample"),
3433
)
3534

36-
stdoutexp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
37-
if err != nil {
38-
panic(err)
39-
}
35+
// stdoutexp, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
36+
// if err != nil {
37+
// panic(err)
38+
// }
4039

41-
oclient := otlptracehttp.NewClient(otlptracehttp.WithEndpoint("localhost:8360"), otlptracehttp.WithURLPath("/traces/otlp/v0.9"), otlptracehttp.WithInsecure())
40+
oclient := otlptracehttp.NewClient(
41+
// otlptracehttp.WithEndpoint("localhost:8360"),
42+
// otlptracehttp.WithURLPath("/traces/otlp/v0.9"),
43+
otlptracehttp.WithEndpoint("localhost:4318"),
44+
otlptracehttp.WithInsecure(),
45+
)
4246
exp, err := otlptrace.New(ctx, oclient)
4347
if err != nil {
4448
panic(err)
4549
}
4650

4751
tp := trace.NewTracerProvider(
48-
trace.WithSyncer(stdoutexp),
52+
// trace.WithSyncer(stdoutexp),
4953
trace.WithBatcher(exp),
5054
trace.WithResource(r),
5155
)

workflow/activity.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ func executeActivity[TResult any](ctx Context, options ActivityOptions, attempt
5252
wfState.AddCommand(cmd)
5353
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
5454

55-
span := workflowtracer.Tracer(ctx).Start(
56-
ctx,
57-
"ExecuteActivity", trace.WithAttributes(
55+
span := workflowtracer.Tracer(ctx).Start(ctx,
56+
fmt.Sprintf("ExecuteActivity: %s", name),
57+
trace.WithAttributes(
5858
attribute.String("name", name),
5959
attribute.Int64(tracing.ScheduleEventID, scheduleEventID),
6060
attribute.Int("attempt", attempt),

workflow/sleep.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
func Sleep(ctx sync.Context, d time.Duration) error {
1313
span := workflowtracer.Tracer(ctx).Start(ctx, "Sleep",
14-
trace.WithAttributes(attribute.Int64("duration_s", int64(d/time.Second))))
14+
trace.WithAttributes(attribute.Int64("duration_ms", int64(d/time.Millisecond))))
1515
defer span.End()
1616

1717
_, err := ScheduleTimer(ctx, d).Get(ctx)

workflow/subworkflow.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ func createSubWorkflowInstance[TResult any](ctx sync.Context, options SubWorkflo
5656
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
5757

5858
span := workflowtracer.Tracer(ctx).Start(ctx,
59-
"CreateSubworkflowInstance", trace.WithAttributes(
59+
fmt.Sprintf("CreateSubworkflowInstance: %s", name),
60+
trace.WithAttributes(
6061
attribute.String("name", name),
6162
attribute.Int64(tracing.ScheduleEventID, scheduleEventID),
6263
attribute.Int("attempt", attempt),

workflow/timer.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,18 @@ func ScheduleTimer(ctx Context, delay time.Duration) Future[struct{}] {
2323
wfState := workflowstate.WorkflowState(ctx)
2424

2525
scheduleEventID := wfState.GetNextScheduleEventID()
26-
timerCmd := command.NewScheduleTimerCommand(scheduleEventID, Now(ctx).Add(delay))
26+
at := Now(ctx).Add(delay)
27+
timerCmd := command.NewScheduleTimerCommand(scheduleEventID, at)
2728
wfState.AddCommand(timerCmd)
2829

2930
wfState.TrackFuture(scheduleEventID, workflowstate.AsDecodingSettable(f))
3031

3132
span := workflowtracer.Tracer(ctx).Start(ctx, "ScheduleTimer",
32-
trace.WithAttributes(attribute.Int64("duration_s", int64(delay/time.Second))))
33+
trace.WithAttributes(
34+
attribute.Int64("duration_ms", int64(delay/time.Millisecond)),
35+
attribute.String("now", Now(ctx).String()),
36+
attribute.String("at", at.String()),
37+
))
3338
defer span.End()
3439

3540
// Check if the context is cancelable

0 commit comments

Comments
 (0)