From 94ce2ede822c4ed4af0bd1319de66ac7401dbe92 Mon Sep 17 00:00:00 2001 From: Ali Ibrahim Date: Sun, 15 Jun 2025 17:25:52 +0400 Subject: [PATCH] Added the option to disconnect ContinueAsNew traces --- contrib/opentelemetry/tracing_interceptor.go | 60 +++++++++++++++----- interceptor/tracing_interceptor.go | 59 +++++++++++++++++-- 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/contrib/opentelemetry/tracing_interceptor.go b/contrib/opentelemetry/tracing_interceptor.go index 053b7565d..d8ecd6353 100644 --- a/contrib/opentelemetry/tracing_interceptor.go +++ b/contrib/opentelemetry/tracing_interceptor.go @@ -41,6 +41,12 @@ type TracerOptions struct { // DisableBaggage can be set to disable baggage propagation. DisableBaggage bool + // EnableSpanLinks can be set to enable span links for spans disconnected from their parent span. + EnableSpanLinks bool + + // DisconnectContinueAsNew can be set to disconnect ContinueAsNew workflows from their parent span. + DisconnectContinueAsNewWorkflows bool + // AllowInvalidParentSpans will swallow errors interpreting parent // spans from headers. Useful when migrating from one tracing library // to another, while workflows/activities may be in progress. @@ -116,12 +122,14 @@ func NewTracingInterceptor(options TracerOptions) (interceptor.Interceptor, erro func (t *tracer) Options() interceptor.TracerOptions { return interceptor.TracerOptions{ - SpanContextKey: t.options.SpanContextKey, - HeaderKey: t.options.HeaderKey, - DisableSignalTracing: t.options.DisableSignalTracing, - DisableQueryTracing: t.options.DisableQueryTracing, - DisableUpdateTracing: t.options.DisableUpdateTracing, - AllowInvalidParentSpans: t.options.AllowInvalidParentSpans, + SpanContextKey: t.options.SpanContextKey, + HeaderKey: t.options.HeaderKey, + DisableSignalTracing: t.options.DisableSignalTracing, + DisableQueryTracing: t.options.DisableQueryTracing, + DisableUpdateTracing: t.options.DisableUpdateTracing, + EnableSpanLinks: t.options.EnableSpanLinks, + DisconnectContinueAsNewWorkflows: t.options.DisconnectContinueAsNewWorkflows, + AllowInvalidParentSpans: t.options.AllowInvalidParentSpans, } } @@ -175,8 +183,11 @@ func (t *tracer) ContextWithSpan(ctx context.Context, span interceptor.TracerSpa func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (interceptor.TracerSpan, error) { // Create context with parent - var parent trace.SpanContext - var bag baggage.Baggage + var ( + parent trace.SpanContext + bag baggage.Baggage + ) + switch optParent := opts.Parent.(type) { case nil: case *tracerSpan: @@ -188,7 +199,9 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto default: return nil, fmt.Errorf("unrecognized parent type %T", optParent) } + ctx := context.Background() + if parent.IsValid() { ctx = trace.ContextWithSpanContext(ctx, parent) if !t.options.DisableBaggage { @@ -196,18 +209,39 @@ func (t *tracer) StartSpan(opts *interceptor.TracerStartSpanOptions) (intercepto } } - // Create span - span := t.options.SpanStarter(ctx, t.options.Tracer, opts.Operation+":"+opts.Name, trace.WithTimestamp(opts.Time)) + var spanOpts []trace.SpanStartOption + + spanOpts = append(spanOpts, trace.WithTimestamp(opts.Time)) - // Set tags if len(opts.Tags) > 0 { - attrs := make([]attribute.KeyValue, 0, len(opts.Tags)) + attrs := make([]attribute.KeyValue, len(opts.Tags)) for k, v := range opts.Tags { attrs = append(attrs, attribute.String(k, v)) } - span.SetAttributes(attrs...) + spanOpts = append(spanOpts, trace.WithAttributes(attrs...)) } + // New span created and linked to parent + if t.options.EnableSpanLinks { + if opts.Link != nil { + var spanCtx trace.SpanContext + switch ref := opts.Link.(type) { + case *tracerSpan: + spanCtx = ref.SpanContext() + case *tracerSpanRef: + spanCtx = ref.SpanContext + } + + if spanCtx.IsValid() { + link := trace.Link{SpanContext: spanCtx} + spanOpts = append(spanOpts, trace.WithLinks(link)) + } + } + } + + // Start the new span + span := t.options.SpanStarter(ctx, t.options.Tracer, opts.Operation+":"+opts.Name, spanOpts...) + tSpan := &tracerSpan{Span: span} if !t.options.DisableBaggage { tSpan.Baggage = bag diff --git a/interceptor/tracing_interceptor.go b/interceptor/tracing_interceptor.go index 91fa489c0..9c0d363f1 100644 --- a/interceptor/tracing_interceptor.go +++ b/interceptor/tracing_interceptor.go @@ -96,6 +96,12 @@ type TracerOptions struct { // DisableUpdateTracing can be set to disable update tracing. DisableUpdateTracing bool + // EnableSpanLinks can be set to enable span links for spans disconnected from their parent span. + EnableSpanLinks bool + + // DisconnectContinueAsNew can be set to disconnect ContinueAsNew workflows from their parent span. + DisconnectContinueAsNewWorkflows bool + // AllowInvalidParentSpans will swallow errors interpreting parent // spans from headers. Useful when migrating from one tracing library // to another, while workflows/activities may be in progress. @@ -147,6 +153,9 @@ type TracerStartSpanOptions struct { // IdempotencyKey should be treated as opaque data by Tracer implementations. // Do not attempt to parse it, as the format is subject to change. IdempotencyKey string + + // Link is a link to a parent span. + Link TracerSpanRef } // TracerSpanRef represents a span reference such as a parent. @@ -756,16 +765,54 @@ func (t *tracingWorkflowOutboundInterceptor) NewContinueAsNewError( args ...interface{}, ) error { err := t.Next.NewContinueAsNewError(ctx, wfn, args...) - if !workflow.IsReplaying(ctx) { - if contErr, _ := err.(*workflow.ContinueAsNewError); contErr != nil { - // Get the current span and write header - if span, _ := ctx.Value(t.root.options.SpanContextKey).(TracerSpan); span != nil { - if writeErr := t.root.writeSpanToHeader(span, WorkflowHeader(ctx)); writeErr != nil { - return fmt.Errorf("failed writing span when creating continue as new error: %w", writeErr) + + if !workflow.IsReplaying(ctx) && workflow.IsContinueAsNewError(err) { + // This will either be the parent span or a new root span + span, _ := ctx.Value(t.root.options.SpanContextKey).(TracerSpan) + + if t.root.options.DisconnectContinueAsNewWorkflows { + info := workflow.GetInfo(ctx) + + // Start a new root span detached from the parent + opts := &TracerStartSpanOptions{ + Operation: "ContinueAsNew", + Name: info.WorkflowType.Name, + DependedOn: false, + Tags: map[string]string{ + workflowIDTagKey: info.WorkflowExecution.ID, + runIDTagKey: info.WorkflowExecution.RunID, + }, + ToHeader: true, + Time: time.Now(), + Link: nil, + } + + // Connect the new root span to the parent with a link + if t.root.options.EnableSpanLinks { + if parentSpan, ok := ctx.Value(t.root.options.SpanContextKey).(TracerSpan); ok { + opts.Link = parentSpan } } + + s, err := t.root.tracer.StartSpan(opts) + if err != nil { + return fmt.Errorf("failed to start detached span for ContinueAsNew: %w", err) + } + span = s + + var finishOpts TracerFinishSpanOptions + defer span.Finish(&finishOpts) + } + + header := WorkflowHeader(ctx) + + if span != nil { + if writeErr := t.root.writeSpanToHeader(span, header); writeErr != nil { + return fmt.Errorf("failed writing span when creating continue as new error: %w", writeErr) + } } } + return err }