Skip to content

Commit c573696

Browse files
committed
Logical tracing
1 parent 785f990 commit c573696

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+805
-360
lines changed

backend/history/history.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ const (
5555

5656
// Recorded result of a side-efect
5757
EventType_SideEffectResult
58+
59+
// Distributed tracing span has been started
60+
EventType_TraceStarted
5861
)
5962

6063
func (et EventType) String() string {
@@ -102,6 +105,9 @@ func (et EventType) String() string {
102105
case EventType_SideEffectResult:
103106
return "SideEffectResult"
104107

108+
case EventType_TraceStarted:
109+
return "TraceStarted"
110+
105111
default:
106112
return "Unknown"
107113
}

backend/history/serialization.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ func DeserializeAttributes(eventType EventType, attributes []byte) (attr interfa
5959
case EventType_SideEffectResult:
6060
attr = &SideEffectResultAttributes{}
6161

62+
case EventType_TraceStarted:
63+
attr = &TraceStartedAttributes{}
64+
6265
case EventType_TimerScheduled:
6366
attr = &TimerScheduledAttributes{}
6467
case EventType_TimerFired:

backend/history/span_started.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package history
2+
3+
import "github.com/cschleiden/go-workflows/backend/payload"
4+
5+
type TraceStartedAttributes struct {
6+
SpanID payload.Payload `json:"spanID"`
7+
}

backend/history/subworkflow_scheduled.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ type SubWorkflowScheduledAttributes struct {
1616
Inputs []payload.Payload `json:"inputs,omitempty"`
1717

1818
Metadata *metadata.WorkflowMetadata `json:"metadata,omitempty"`
19+
20+
WorkflowSpanID [8]byte `json:"workflow_span_id,omitempty"`
1921
}

backend/history/workflow_started.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@ type ExecutionStartedAttributes struct {
1414
Metadata *metadata.WorkflowMetadata `json:"metadata,omitempty"`
1515

1616
Inputs []payload.Payload `json:"inputs,omitempty"`
17+
18+
WorkflowSpanID [8]byte `json:"workflowSpanID,omitempty"`
1719
}

backend/options.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import (
77
"github.com/cschleiden/go-workflows/backend/converter"
88
"github.com/cschleiden/go-workflows/backend/metrics"
99
mi "github.com/cschleiden/go-workflows/internal/metrics"
10-
"github.com/cschleiden/go-workflows/internal/tracing"
10+
"github.com/cschleiden/go-workflows/internal/propagators"
1111
"github.com/cschleiden/go-workflows/workflow"
1212
"go.opentelemetry.io/otel/trace"
13+
"go.opentelemetry.io/otel/trace/noop"
1314
)
1415

1516
type Options struct {
@@ -51,10 +52,10 @@ var DefaultOptions Options = Options{
5152

5253
Logger: slog.Default(),
5354
Metrics: mi.NewNoopMetricsClient(),
54-
TracerProvider: trace.NewNoopTracerProvider(),
55+
TracerProvider: noop.NewTracerProvider(),
5556
Converter: converter.DefaultConverter,
5657

57-
ContextPropagators: []workflow.ContextPropagator{&tracing.TracingContextPropagator{}},
58+
ContextPropagators: []workflow.ContextPropagator{&propagators.TracingContextPropagator{}},
5859

5960
RemoveContinuedAsNewInstances: false,
6061
}

backend/redis/instance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,10 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
133133
}
134134

135135
// Cancel instance
136-
if cmds, err := rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
136+
if _, err := rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
137137
return rb.addWorkflowInstanceEventP(ctx, p, workflow.Queue(instanceState.Queue), instance, event)
138138
}); err != nil {
139-
fmt.Println(cmds)
139+
// fmt.Println(cmds)
140140
return fmt.Errorf("adding cancellation event to workflow instance: %w", err)
141141
}
142142

backend/redis/signal.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,8 @@ import (
66

77
"github.com/cschleiden/go-workflows/backend"
88
"github.com/cschleiden/go-workflows/backend/history"
9-
"github.com/cschleiden/go-workflows/internal/log"
10-
"github.com/cschleiden/go-workflows/internal/propagators"
119
"github.com/cschleiden/go-workflows/workflow"
1210
"github.com/redis/go-redis/v9"
13-
"go.opentelemetry.io/otel/attribute"
14-
"go.opentelemetry.io/otel/trace"
1511
)
1612

1713
func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error {
@@ -30,19 +26,6 @@ func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, e
3026
return err
3127
}
3228

33-
// TODO: Can we do this in the client?
34-
ctx, err = (&propagators.TracingContextPropagator{}).Extract(ctx, instanceState.Metadata)
35-
if err != nil {
36-
rb.Options().Logger.Error("extracting tracing context", log.ErrorKey, err)
37-
}
38-
39-
a := event.Attributes.(*history.SignalReceivedAttributes)
40-
ctx, span := rb.Tracer().Start(ctx, fmt.Sprintf("SignalWorkflow: %s", a.Name), trace.WithAttributes(
41-
attribute.String(log.InstanceIDKey, instanceID),
42-
attribute.String(log.SignalNameKey, event.Attributes.(*history.SignalReceivedAttributes).Name),
43-
))
44-
defer span.End()
45-
4629
if _, err = rb.rdb.TxPipelined(ctx, func(p redis.Pipeliner) error {
4730
if err := rb.addWorkflowInstanceEventP(ctx, p, workflow.Queue(instanceState.Queue), instanceState.Instance, event); err != nil {
4831
return fmt.Errorf("adding event to stream: %w", err)

backend/redis/workflow.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"github.com/cschleiden/go-workflows/internal/workflowerrors"
1616
"github.com/cschleiden/go-workflows/workflow"
1717
"github.com/redis/go-redis/v9"
18-
"go.opentelemetry.io/otel/attribute"
19-
"go.opentelemetry.io/otel/trace"
2018
)
2119

2220
func (rb *redisBackend) PrepareWorkflowQueues(ctx context.Context, queues []workflow.Queue) error {
@@ -306,12 +304,6 @@ func (rb *redisBackend) CompleteWorkflowTask(
306304
rb.options.Logger.Error("extracting tracing context", log.ErrorKey, err)
307305
}
308306

309-
_, span := rb.Tracer().Start(ctx, "WorkflowComplete",
310-
trace.WithAttributes(
311-
attribute.String(log.NamespaceKey+log.InstanceIDKey, task.WorkflowInstance.InstanceID),
312-
))
313-
span.End()
314-
315307
// Auto expiration
316308
expiration := rb.options.AutoExpiration
317309
if state == core.WorkflowInstanceStateContinuedAsNew && rb.options.AutoExpirationContinueAsNew > 0 {

backend/test/e2e.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
622622
tests = append(tests, e2eQueueTests...)
623623
tests = append(tests, e2eRemovalTests...)
624624
tests = append(tests, e2eContinueAsNewTests...)
625+
tests = append(tests, e2eTracingTests...)
625626

626627
run := func(suffix string, workerOptions worker.Options) {
627628
for _, tt := range tests {

0 commit comments

Comments
 (0)