Skip to content

Commit 04f65d7

Browse files
authored
Merge pull request #39 from salaboy/main
Adding support for getting the TraceParent context
2 parents 7a35af6 + 3e31816 commit 04f65d7

File tree

7 files changed

+278
-152
lines changed

7 files changed

+278
-152
lines changed

backend/activity.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo
4949
if ts == nil {
5050
return fmt.Errorf("%v: invalid TaskScheduled event", awi.InstanceID)
5151
}
52-
5352
// Create span as child of spanContext found in TaskScheduledEvent
5453
ctx, err := helpers.ContextFromTraceContext(ctx, ts.ParentTraceContext)
5554
if err != nil {
@@ -66,6 +65,9 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo
6665
}()
6766
}
6867

68+
// set the parent trace context to be the newly created activity span
69+
ts.ParentTraceContext = helpers.TraceContextFromSpan(span)
70+
6971
// Execute the activity and get its result
7072
result, err := p.executor.ExecuteActivity(ctx, awi.InstanceID, awi.NewEvent)
7173
if err != nil {
@@ -75,7 +77,6 @@ func (p *activityProcessor) ProcessWorkItem(ctx context.Context, awi *ActivityWo
7577
}
7678
return err
7779
}
78-
7980
awi.Result = result
8081
return nil
8182
}

backend/executor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
177177
OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
178178
TaskId: e.EventId,
179179
TaskExecutionId: task.TaskExecutionId,
180+
ParentTraceContext: task.ParentTraceContext,
180181
}
181182
workItem := &protos.WorkItem{
182183
Request: &protos.WorkItem_ActivityRequest{

client/worker_grpc.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"google.golang.org/protobuf/types/known/wrapperspb"
1616

1717
"github.com/dapr/durabletask-go/api"
18+
"github.com/dapr/durabletask-go/api/helpers"
1819
"github.com/dapr/durabletask-go/api/protos"
1920
"github.com/dapr/durabletask-go/backend"
2021
"github.com/dapr/durabletask-go/task"
@@ -174,7 +175,12 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
174175
executor backend.Executor,
175176
req *protos.ActivityRequest,
176177
) {
177-
var tc *protos.TraceContext = nil // TODO: How to populate trace context?
178+
var ptc *protos.TraceContext = req.ParentTraceContext
179+
ctx, err := helpers.ContextFromTraceContext(ctx, ptc)
180+
if err != nil {
181+
c.logger.Warn("%v: failed to parse trace context: %v", req.Name, err)
182+
}
183+
178184
event := &protos.HistoryEvent{
179185
EventId: req.TaskId,
180186
Timestamp: timestamppb.New(time.Now()),
@@ -184,7 +190,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem(
184190
Version: req.Version,
185191
Input: req.Input,
186192
TaskExecutionId: req.TaskExecutionId,
187-
ParentTraceContext: tc,
193+
ParentTraceContext: ptc,
188194
},
189195
},
190196
}

samples/distributedtracing/distributedtracing.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"github.com/dapr/durabletask-go/task"
1919
)
2020

21+
var tracer = otel.Tracer("distributedtracing-example")
22+
2123
func main() {
2224
// Tracing can be configured independently of the orchestration code.
2325
tp, err := ConfigureZipkinTracing()
@@ -136,6 +138,16 @@ func DoWorkActivity(ctx task.ActivityContext) (any, error) {
136138
return "", err
137139
}
138140

141+
_, childSpan := tracer.Start(ctx.Context(), "activity-subwork")
142+
// Simulate doing some sub work
143+
select {
144+
case <-time.After(2 * time.Second):
145+
// Ok
146+
case <-ctx.Context().Done():
147+
return nil, ctx.Context().Err()
148+
}
149+
childSpan.End()
150+
139151
// Simulate doing work
140152
select {
141153
case <-time.After(duration):

tests/grpc/grpc_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ import (
2121
"github.com/dapr/durabletask-go/backend/sqlite"
2222
"github.com/dapr/durabletask-go/client"
2323
"github.com/dapr/durabletask-go/task"
24+
"github.com/dapr/durabletask-go/tests/utils"
25+
"go.opentelemetry.io/otel"
2426
)
2527

2628
var (
2729
grpcClient *client.TaskHubGrpcClient
2830
ctx = context.Background()
31+
tracer = otel.Tracer("grpc-test")
2932
)
3033

3134
// TestMain is the entry point for the test suite. We use this to set up a gRPC server and client instance
@@ -489,3 +492,51 @@ func Test_Grpc_SubOrchestratorRetries(t *testing.T) {
489492
// With 3 max attempts there will be two retries with 10 millis delay before each
490493
require.GreaterOrEqual(t, metadata.LastUpdatedAt.AsTime(), metadata.CreatedAt.AsTime().Add(2*10*time.Millisecond))
491494
}
495+
496+
func Test_SingleActivity_TaskSpan(t *testing.T) {
497+
// Registration
498+
r := task.NewTaskRegistry()
499+
r.AddOrchestratorN("SingleActivity_TestSpan", func(ctx *task.OrchestrationContext) (any, error) {
500+
var input string
501+
if err := ctx.GetInput(&input); err != nil {
502+
return nil, err
503+
}
504+
var output string
505+
err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
506+
return output, err
507+
})
508+
r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
509+
var name string
510+
if err := ctx.GetInput(&name); err != nil {
511+
return nil, err
512+
}
513+
_, childSpan := tracer.Start(ctx.Context(), "activityChild_TestSpan")
514+
childSpan.End()
515+
return fmt.Sprintf("Hello, %s!", name), nil
516+
})
517+
518+
exporter := utils.InitTracing()
519+
cancelListener := startGrpcListener(t, r)
520+
defer cancelListener()
521+
522+
// Run the orchestration
523+
id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity_TestSpan", api.WithInput("世界"), api.WithStartTime(time.Now()))
524+
if assert.NoError(t, err) {
525+
metadata, err := grpcClient.WaitForOrchestrationCompletion(ctx, id)
526+
if assert.NoError(t, err) {
527+
assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus)
528+
assert.Equal(t, `"Hello, 世界!"`, metadata.Output.Value)
529+
}
530+
}
531+
532+
// Validate the exported OTel traces
533+
spans := exporter.GetSpans().Snapshots()
534+
utils.AssertSpanSequence(t, spans,
535+
utils.AssertOrchestratorCreated("SingleActivity_TestSpan", id),
536+
utils.AssertSpan("activityChild_TestSpan"),
537+
utils.AssertActivity("SayHello", id, 0),
538+
utils.AssertOrchestratorExecuted("SingleActivity_TestSpan", id, "COMPLETED"),
539+
)
540+
// assert child-parent relationship
541+
assert.Equal(t, spans[1].Parent().SpanID(), spans[2].SpanContext().SpanID())
542+
}

0 commit comments

Comments
 (0)