Skip to content

Commit 569e595

Browse files
committed
adding tracing utils
Signed-off-by: salaboy <[email protected]>
1 parent 8c1823c commit 569e595

File tree

2 files changed

+236
-3
lines changed

2 files changed

+236
-3
lines changed

tests/grpc/grpc_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/dapr/durabletask-go/backend/sqlite"
2222
"github.com/dapr/durabletask-go/client"
2323
"github.com/dapr/durabletask-go/task"
24+
"github.com/dapr/durabletask-go/tests/utils"
2425
"go.opentelemetry.io/otel"
2526
)
2627

@@ -420,9 +421,9 @@ func Test_Grpc_ReuseInstanceIDError(t *testing.T) {
420421
defer cancelListener()
421422
instanceID := api.InstanceID("THROW_IF_RUNNING_OR_COMPLETED")
422423

423-
_, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID))
424+
id, err := grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID))
424425
require.NoError(t, err)
425-
_, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id))
426+
id, err = grpcClient.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id))
426427
if assert.Error(t, err) {
427428
assert.Contains(t, err.Error(), "orchestration instance already exists")
428429
}
@@ -524,7 +525,7 @@ func Test_SingleActivity_TaskSpan(t *testing.T) {
524525
metadata, err := grpcClient.WaitForOrchestrationCompletion(ctx, id)
525526
if assert.NoError(t, err) {
526527
assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus)
527-
assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput)
528+
assert.Equal(t, `"Hello, 世界!"`, metadata.Output.Value)
528529
}
529530
}
530531

tests/utils/tracing.go

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package utils
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/sdk/trace"
12+
"go.opentelemetry.io/otel/sdk/trace/tracetest"
13+
14+
"github.com/dapr/durabletask-go/api"
15+
)
16+
17+
type (
18+
spanValidator func(t assert.TestingT, spans []trace.ReadOnlySpan, index int)
19+
spanAttributeValidator func(t assert.TestingT, span trace.ReadOnlySpan) bool
20+
spanEventValidator func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool
21+
)
22+
23+
var (
24+
initTracingOnce sync.Once
25+
sharedTraceExporter = tracetest.NewInMemoryExporter()
26+
)
27+
28+
func AssertSpanSequence(t assert.TestingT, spans []trace.ReadOnlySpan, spanAsserts ...spanValidator) {
29+
for i, f := range spanAsserts {
30+
f(t, spans, i)
31+
}
32+
}
33+
34+
// assertOrchestratorCreated validates a create_orchestration span
35+
func AssertOrchestratorCreated(name string, id api.InstanceID, optionalAsserts ...spanAttributeValidator) spanValidator {
36+
spanName := fmt.Sprintf("create_orchestration||%s", name)
37+
opts := []spanAttributeValidator{
38+
assertTaskType("orchestration"),
39+
assertTaskName(name),
40+
assertInstanceID(id),
41+
}
42+
opts = append(opts, optionalAsserts...)
43+
return AssertSpan(spanName, opts...)
44+
}
45+
46+
// assertOrchestratorCreated validates an orchestration span
47+
func AssertOrchestratorExecuted(name string, id api.InstanceID, status string, optionalAsserts ...spanAttributeValidator) spanValidator {
48+
spanName := fmt.Sprintf("orchestration||%s", name)
49+
opts := []spanAttributeValidator{
50+
assertTaskType("orchestration"),
51+
assertTaskName(name),
52+
assertInstanceID(id),
53+
assertStatus(status),
54+
}
55+
opts = append(opts, optionalAsserts...)
56+
return AssertSpan(spanName, opts...)
57+
}
58+
59+
func AssertActivity(name string, id api.InstanceID, taskID int64, optionalAsserts ...spanAttributeValidator) spanValidator {
60+
spanName := fmt.Sprintf("activity||%s", name)
61+
opts := []spanAttributeValidator{
62+
assertTaskType("activity"),
63+
assertTaskName(name),
64+
assertInstanceID(id),
65+
assertTaskID(taskID),
66+
}
67+
opts = append(opts, optionalAsserts...)
68+
return AssertSpan(spanName, opts...)
69+
}
70+
71+
func AssertTimer(id api.InstanceID) spanValidator {
72+
return AssertSpan("timer", assertInstanceID(id), assertTimerFired())
73+
}
74+
75+
func AssertSpanEvents(eventAsserts ...spanEventValidator) spanAttributeValidator {
76+
return func(t assert.TestingT, span trace.ReadOnlySpan) bool {
77+
if assert.Equal(t, len(eventAsserts), len(span.Events()), "unexpected number of span events") {
78+
for i, f := range eventAsserts {
79+
if !f(t, span, i) {
80+
return false
81+
}
82+
}
83+
}
84+
return true
85+
}
86+
}
87+
88+
func AssertExternalEvent(eventName string, payloadSize int) spanEventValidator {
89+
return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool {
90+
event := span.Events()[eventIndex]
91+
hasMessage := assert.Equal(t, "Received external event", event.Name)
92+
hasNameAttribute := assert.Contains(t, event.Attributes, attribute.KeyValue{
93+
Key: "name",
94+
Value: attribute.StringValue(eventName),
95+
})
96+
hasSizeAttribute := assert.Contains(t, event.Attributes, attribute.KeyValue{
97+
Key: "size",
98+
Value: attribute.IntValue(payloadSize),
99+
})
100+
return hasMessage && hasNameAttribute && hasSizeAttribute
101+
}
102+
}
103+
104+
func AssertSuspendedEvent() spanEventValidator {
105+
return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool {
106+
event := span.Events()[eventIndex]
107+
return assert.Equal(t, "Execution suspended", event.Name)
108+
}
109+
}
110+
111+
func AssertResumedEvent() spanEventValidator {
112+
return func(t assert.TestingT, span trace.ReadOnlySpan, eventIndex int) bool {
113+
event := span.Events()[eventIndex]
114+
return assert.Equal(t, "Execution resumed", event.Name)
115+
}
116+
}
117+
118+
func AssertSpan(name string, optionalAsserts ...spanAttributeValidator) spanValidator {
119+
return func(t assert.TestingT, spans []trace.ReadOnlySpan, index int) {
120+
if !doAssertSpan(t, spans, index, name, optionalAsserts...) {
121+
fmt.Printf("span assertion for %s (index=%d) failed\n", name, index)
122+
}
123+
}
124+
}
125+
126+
func doAssertSpan(t assert.TestingT, spans []trace.ReadOnlySpan, index int, name string, optionalAsserts ...spanAttributeValidator) bool {
127+
// array bounds check
128+
if !assert.Lessf(t, index, len(spans), "%d spans were exported, but more were expected by the test", len(spans)) {
129+
return false
130+
}
131+
132+
span := spans[index]
133+
134+
// All spans have a name that we must validate
135+
success := assert.Equal(t, name, span.Name())
136+
137+
// Optional validations that are span-specific
138+
for _, optionalAssert := range optionalAsserts {
139+
if !optionalAssert(t, span) {
140+
success = false
141+
}
142+
}
143+
144+
return success
145+
}
146+
147+
func assertTaskType(expectedTaskType string) spanAttributeValidator {
148+
return func(t assert.TestingT, span trace.ReadOnlySpan) bool {
149+
return assert.Contains(t, span.Attributes(), attribute.KeyValue{
150+
Key: "durabletask.type",
151+
Value: attribute.StringValue(expectedTaskType),
152+
})
153+
}
154+
}
155+
156+
func assertTaskName(expectedTaskName string) spanAttributeValidator {
157+
return func(t assert.TestingT, span trace.ReadOnlySpan) bool {
158+
return assert.Contains(t, span.Attributes(), attribute.KeyValue{
159+
Key: "durabletask.task.name",
160+
Value: attribute.StringValue(expectedTaskName),
161+
})
162+
}
163+
}
164+
165+
func assertTaskID(expectedTaskID int64) spanAttributeValidator {
166+
return func(t assert.TestingT, span trace.ReadOnlySpan) bool {
167+
return assert.Contains(t, span.Attributes(), attribute.KeyValue{
168+
Key: "durabletask.task.task_id",
169+
Value: attribute.Int64Value(expectedTaskID),
170+
})
171+
}
172+
}
173+
174+
func assertInstanceID(expectedID api.InstanceID) spanAttributeValidator {
175+
return func(t assert.TestingT, span trace.ReadOnlySpan) bool {
176+
return assert.Contains(t, span.Attributes(), attribute.KeyValue{
177+
Key: "durabletask.task.instance_id",
178+
Value: attribute.StringValue(string(expectedID)),
179+
})
180+
}
181+
}
182+
183+
func assertStatus(expectedStatus string) spanAttributeValidator {
184+
return func(t assert.TestingT, span trace.ReadOnlySpan) bool {
185+
return assert.Contains(t, span.Attributes(), attribute.KeyValue{
186+
Key: "durabletask.runtime_status",
187+
Value: attribute.StringValue(expectedStatus),
188+
})
189+
}
190+
}
191+
192+
func assertTimerFired() spanAttributeValidator {
193+
return func(t assert.TestingT, span trace.ReadOnlySpan) bool {
194+
var firedAtStr string
195+
for _, a := range span.Attributes() {
196+
if a.Key == "durabletask.fire_at" {
197+
firedAtStr = a.Value.AsString()
198+
break
199+
}
200+
}
201+
202+
if assert.NotEmptyf(t, firedAtStr, "couldn't find the durabletask.fire_at attribute") {
203+
// Ensure we can parse the value and that the value fits into a general range.
204+
// Note that we're not attempting to validate a specific time.
205+
firedAt, err := time.Parse(time.RFC3339, firedAtStr)
206+
now := time.Now().UTC()
207+
return assert.NoError(t, err) &&
208+
assert.Less(t, firedAt, now) &&
209+
assert.Greater(t, firedAt, now.Add(-1*time.Hour))
210+
}
211+
212+
return false
213+
}
214+
}
215+
216+
// initTracing configures in-memory OTel tracing and returns an exporter which can be used
217+
// to examine the exported traces. We only want to look at exported traces because we do
218+
// tricks to mark certain spans as non-exported (i.e. orchestration replays), and want
219+
// to ensure that those spans are never actually exported.
220+
func InitTracing() *tracetest.InMemoryExporter {
221+
// The global tracer provider can only be initialized once.
222+
// Subsequent initializations will silently fail.
223+
initTracingOnce.Do(func() {
224+
processor := trace.NewSimpleSpanProcessor(sharedTraceExporter)
225+
provider := trace.NewTracerProvider(trace.WithSpanProcessor(processor))
226+
otel.SetTracerProvider(provider)
227+
})
228+
229+
// Reset the shared exporter so that new tests don't see traces from previous tests.
230+
sharedTraceExporter.Reset()
231+
return sharedTraceExporter
232+
}

0 commit comments

Comments
 (0)