Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 47 additions & 13 deletions contrib/opentelemetry/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ type TracerOptions struct {
// DisableBaggage can be set to disable baggage propagation.
DisableBaggage bool

// EnableSpanLinks can be set to enable span links for spans disconnected from their parent span.
EnableSpanLinks bool
Comment on lines +44 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be an additional option? It seems like span links (an otel-specific feature?) could be added automatically if the disconnect continue as new workflows option is set.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I preferred to make this the default, but decided not to for backwards compatibility. Adding links results in additional tracing data which may increase costs depending on your observability vendor

TIL DataDog's SDKs also support span links: https://docs.datadoghq.com/tracing/trace_collection/span_links/


// DisconnectContinueAsNew can be set to disconnect ContinueAsNew workflows from their parent span.
DisconnectContinueAsNewWorkflows bool

// AllowInvalidParentSpans will swallow errors interpreting parent
// spans from headers. Useful when migrating from one tracing library
// to another, while workflows/activities may be in progress.
Expand Down Expand Up @@ -116,12 +122,14 @@ func NewTracingInterceptor(options TracerOptions) (interceptor.Interceptor, erro

func (t *tracer) Options() interceptor.TracerOptions {
return interceptor.TracerOptions{
SpanContextKey: t.options.SpanContextKey,
HeaderKey: t.options.HeaderKey,
DisableSignalTracing: t.options.DisableSignalTracing,
DisableQueryTracing: t.options.DisableQueryTracing,
DisableUpdateTracing: t.options.DisableUpdateTracing,
AllowInvalidParentSpans: t.options.AllowInvalidParentSpans,
SpanContextKey: t.options.SpanContextKey,
HeaderKey: t.options.HeaderKey,
DisableSignalTracing: t.options.DisableSignalTracing,
DisableQueryTracing: t.options.DisableQueryTracing,
DisableUpdateTracing: t.options.DisableUpdateTracing,
EnableSpanLinks: t.options.EnableSpanLinks,
DisconnectContinueAsNewWorkflows: t.options.DisconnectContinueAsNewWorkflows,
AllowInvalidParentSpans: t.options.AllowInvalidParentSpans,
}
}

Expand Down Expand Up @@ -175,8 +183,11 @@ func (t *tracer) ContextWithSpan(ctx context.Context, span interceptor.TracerSpa

func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (interceptor.TracerSpan, error) {
// Create context with parent
var parent trace.SpanContext
var bag baggage.Baggage
var (
parent trace.SpanContext
bag baggage.Baggage
)

switch optParent := opts.Parent.(type) {
case nil:
case *tracerSpan:
Expand All @@ -188,26 +199,49 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto
default:
return nil, fmt.Errorf("unrecognized parent type %T", optParent)
}

ctx := context.Background()

if parent.IsValid() {
ctx = trace.ContextWithSpanContext(ctx, parent)
if !t.options.DisableBaggage {
ctx = baggage.ContextWithBaggage(ctx, bag)
}
}

// Create span
span := t.options.SpanStarter(ctx, t.options.Tracer, opts.Operation+":"+opts.Name, trace.WithTimestamp(opts.Time))
var spanOpts []trace.SpanStartOption

spanOpts = append(spanOpts, trace.WithTimestamp(opts.Time))

// Set tags
if len(opts.Tags) > 0 {
attrs := make([]attribute.KeyValue, 0, len(opts.Tags))
attrs := make([]attribute.KeyValue, len(opts.Tags))
for k, v := range opts.Tags {
attrs = append(attrs, attribute.String(k, v))
}
span.SetAttributes(attrs...)
spanOpts = append(spanOpts, trace.WithAttributes(attrs...))
}

// New span created and linked to parent
if t.options.EnableSpanLinks {
if opts.Link != nil {
var spanCtx trace.SpanContext
switch ref := opts.Link.(type) {
case *tracerSpan:
spanCtx = ref.SpanContext()
case *tracerSpanRef:
spanCtx = ref.SpanContext
}

if spanCtx.IsValid() {
link := trace.Link{SpanContext: spanCtx}
spanOpts = append(spanOpts, trace.WithLinks(link))
}
}
}

// Start the new span
span := t.options.SpanStarter(ctx, t.options.Tracer, opts.Operation+":"+opts.Name, spanOpts...)

tSpan := &tracerSpan{Span: span}
if !t.options.DisableBaggage {
tSpan.Baggage = bag
Expand Down
59 changes: 53 additions & 6 deletions interceptor/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ type TracerOptions struct {
// DisableUpdateTracing can be set to disable update tracing.
DisableUpdateTracing bool

// EnableSpanLinks can be set to enable span links for spans disconnected from their parent span.
EnableSpanLinks bool

// DisconnectContinueAsNew can be set to disconnect ContinueAsNew workflows from their parent span.
DisconnectContinueAsNewWorkflows bool

// AllowInvalidParentSpans will swallow errors interpreting parent
// spans from headers. Useful when migrating from one tracing library
// to another, while workflows/activities may be in progress.
Expand Down Expand Up @@ -147,6 +153,9 @@ type TracerStartSpanOptions struct {
// IdempotencyKey should be treated as opaque data by Tracer implementations.
// Do not attempt to parse it, as the format is subject to change.
IdempotencyKey string

// Link is a link to a parent span.
Link TracerSpanRef
}

// TracerSpanRef represents a span reference such as a parent.
Expand Down Expand Up @@ -756,16 +765,54 @@ func (t *tracingWorkflowOutboundInterceptor) NewContinueAsNewError(
args ...interface{},
) error {
err := t.Next.NewContinueAsNewError(ctx, wfn, args...)
if !workflow.IsReplaying(ctx) {
if contErr, _ := err.(*workflow.ContinueAsNewError); contErr != nil {
// Get the current span and write header
if span, _ := ctx.Value(t.root.options.SpanContextKey).(TracerSpan); span != nil {
if writeErr := t.root.writeSpanToHeader(span, WorkflowHeader(ctx)); writeErr != nil {
return fmt.Errorf("failed writing span when creating continue as new error: %w", writeErr)

if !workflow.IsReplaying(ctx) && workflow.IsContinueAsNewError(err) {
// This will either be the parent span or a new root span
span, _ := ctx.Value(t.root.options.SpanContextKey).(TracerSpan)

if t.root.options.DisconnectContinueAsNewWorkflows {
info := workflow.GetInfo(ctx)

// Start a new root span detached from the parent
opts := &TracerStartSpanOptions{
Operation: "ContinueAsNew",
Name: info.WorkflowType.Name,
DependedOn: false,
Tags: map[string]string{
workflowIDTagKey: info.WorkflowExecution.ID,
runIDTagKey: info.WorkflowExecution.RunID,
},
ToHeader: true,
Time: time.Now(),
Link: nil,
}

// Connect the new root span to the parent with a link
if t.root.options.EnableSpanLinks {
if parentSpan, ok := ctx.Value(t.root.options.SpanContextKey).(TracerSpan); ok {
opts.Link = parentSpan
}
}

s, err := t.root.tracer.StartSpan(opts)
if err != nil {
return fmt.Errorf("failed to start detached span for ContinueAsNew: %w", err)
}
span = s

var finishOpts TracerFinishSpanOptions
defer span.Finish(&finishOpts)
}

header := WorkflowHeader(ctx)

if span != nil {
if writeErr := t.root.writeSpanToHeader(span, header); writeErr != nil {
return fmt.Errorf("failed writing span when creating continue as new error: %w", writeErr)
}
}
}

return err
}

Expand Down