@@ -38,6 +38,48 @@ import (
38
38
func TestTracing (t * testing.T ) {
39
39
testutil .SkipTestIfShortMode (t ,
40
40
"Wal creation tests are depending on embedded etcd server so are integration-level tests." )
41
+
42
+ // Test Unary RPC tracing
43
+ t .Run ("UnaryRPC" , func (t * testing.T ) {
44
+ testRPCTracing (t , "UnaryRPC" , containsUnaryRPCSpan , func (cli * clientv3.Client ) error {
45
+ // make a request with the instrumented client
46
+ resp , err := cli .Get (context .TODO (), "key" )
47
+ require .NoError (t , err )
48
+ require .Empty (t , resp .Kvs )
49
+ return nil
50
+ })
51
+ })
52
+
53
+ // Test Stream RPC tracing
54
+ t .Run ("StreamRPC" , func (t * testing.T ) {
55
+ testRPCTracing (t , "StreamRPC" , containsStreamRPCSpan , func (cli * clientv3.Client ) error {
56
+ // Create a context with a reasonable timeout
57
+ ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
58
+ defer cancel ()
59
+
60
+ // Create a watch channel
61
+ watchChan := cli .Watch (ctx , "watch-key" )
62
+
63
+ // Put a value to trigger the watch
64
+ _ , err := cli .Put (context .TODO (), "watch-key" , "watch-value" )
65
+ require .NoError (t , err )
66
+
67
+ // Wait for watch event
68
+ select {
69
+ case watchResp := <- watchChan :
70
+ require .NoError (t , watchResp .Err ())
71
+ require .Len (t , watchResp .Events , 1 )
72
+ t .Log ("Received watch event successfully" )
73
+ case <- time .After (5 * time .Second ):
74
+ t .Fatal ("Timed out waiting for watch event" )
75
+ }
76
+ return nil
77
+ })
78
+ })
79
+ }
80
+
81
+ // testRPCTracing is a common test function for both Unary and Stream RPC tracing
82
+ func testRPCTracing (t * testing.T , testName string , filterFunc func (* traceservice.ExportTraceServiceRequest ) bool , clientAction func (* clientv3.Client ) error ) {
41
83
// set up trace collector
42
84
listener , err := net .Listen ("tcp" , "localhost:" )
43
85
require .NoError (t , err )
@@ -48,7 +90,7 @@ func TestTracing(t *testing.T) {
48
90
srv := grpc .NewServer ()
49
91
traceservice .RegisterTraceServiceServer (srv , & traceServer {
50
92
traceFound : traceFound ,
51
- filterFunc : containsNodeListSpan ,
93
+ filterFunc : filterFunc ,
52
94
})
53
95
54
96
go srv .Serve (listener )
@@ -89,8 +131,7 @@ func TestTracing(t *testing.T) {
89
131
}
90
132
91
133
dialOptions := []grpc.DialOption {
92
- grpc .WithUnaryInterceptor (otelgrpc .UnaryClientInterceptor (tracingOpts ... )),
93
- grpc .WithStreamInterceptor (otelgrpc .StreamClientInterceptor (tracingOpts ... )),
134
+ grpc .WithStatsHandler (otelgrpc .NewClientHandler (tracingOpts ... )),
94
135
}
95
136
ccfg := clientv3.Config {DialOptions : dialOptions , Endpoints : []string {cfg .AdvertiseClientUrls [0 ].String ()}}
96
137
cli , err := integration .NewClient (t , ccfg )
@@ -100,21 +141,21 @@ func TestTracing(t *testing.T) {
100
141
}
101
142
defer cli .Close ()
102
143
103
- // make a request with the instrumented client
104
- resp , err := cli . Get ( context . TODO (), "key" )
144
+ // Execute the client action (either Unary or Stream RPC)
145
+ err = clientAction ( cli )
105
146
require .NoError (t , err )
106
- require .Empty (t , resp .Kvs )
107
147
108
148
// Wait for a span to be recorded from our request
109
149
select {
110
150
case <- traceFound :
151
+ t .Logf ("%s trace found" , testName )
111
152
return
112
153
case <- time .After (30 * time .Second ):
113
154
t .Fatal ("Timed out waiting for trace" )
114
155
}
115
156
}
116
157
117
- func containsNodeListSpan (req * traceservice.ExportTraceServiceRequest ) bool {
158
+ func containsUnaryRPCSpan (req * traceservice.ExportTraceServiceRequest ) bool {
118
159
for _ , resourceSpans := range req .GetResourceSpans () {
119
160
for _ , attr := range resourceSpans .GetResource ().GetAttributes () {
120
161
if attr .GetKey () != "service.name" && attr .GetValue ().GetStringValue () != "integration-test-tracing" {
@@ -132,6 +173,20 @@ func containsNodeListSpan(req *traceservice.ExportTraceServiceRequest) bool {
132
173
return false
133
174
}
134
175
176
+ // containsStreamRPCSpan checks for Watch/Watch spans in trace data
177
+ func containsStreamRPCSpan (req * traceservice.ExportTraceServiceRequest ) bool {
178
+ for _ , resourceSpans := range req .GetResourceSpans () {
179
+ for _ , scoped := range resourceSpans .GetScopeSpans () {
180
+ for _ , span := range scoped .GetSpans () {
181
+ if span .GetName () == "etcdserverpb.Watch/Watch" {
182
+ return true
183
+ }
184
+ }
185
+ }
186
+ }
187
+ return false
188
+ }
189
+
135
190
// traceServer implements TracesServiceServer
136
191
type traceServer struct {
137
192
traceFound chan struct {}
@@ -142,7 +197,11 @@ type traceServer struct {
142
197
func (t * traceServer ) Export (ctx context.Context , req * traceservice.ExportTraceServiceRequest ) (* traceservice.ExportTraceServiceResponse , error ) {
143
198
emptyValue := traceservice.ExportTraceServiceResponse {}
144
199
if t .filterFunc (req ) {
145
- t .traceFound <- struct {}{}
200
+ select {
201
+ case t .traceFound <- struct {}{}:
202
+ default :
203
+ // Channel already notified
204
+ }
146
205
}
147
206
return & emptyValue , nil
148
207
}
0 commit comments