@@ -40,14 +40,46 @@ func TestTracing(t *testing.T) {
40
40
"Wal creation tests are depending on embedded etcd server so are integration-level tests." )
41
41
42
42
// Test Unary RPC tracing
43
- t .Run ("UnaryRPC" , testUnaryRPCTracing )
43
+ t .Run ("UnaryRPC" , func (t * testing.T ) {
44
+ testRPCTracing (t , "UnaryRPC" , containsUnaryRPCSpan , func (cli * clientv3.Client ) error {
45
+ // make a request with the instrumented client
46
+ resp , err := cli .Get (context .TODO (), "key" )
47
+ require .NoError (t , err )
48
+ require .Empty (t , resp .Kvs )
49
+ return nil
50
+ })
51
+ })
44
52
45
53
// Test Stream RPC tracing
46
- t .Run ("StreamRPC" , testStreamRPCTracing )
54
+ t .Run ("StreamRPC" , func (t * testing.T ) {
55
+ testRPCTracing (t , "StreamRPC" , containsStreamRPCSpan , func (cli * clientv3.Client ) error {
56
+ // Create a context with a reasonable timeout
57
+ ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
58
+ defer cancel ()
59
+
60
+ // Create a watch channel
61
+ watchChan := cli .Watch (ctx , "watch-key" )
62
+
63
+ // Put a value to trigger the watch
64
+ _ , err := cli .Put (context .TODO (), "watch-key" , "watch-value" )
65
+ require .NoError (t , err )
66
+
67
+ // Wait for watch event
68
+ select {
69
+ case watchResp := <- watchChan :
70
+ require .NoError (t , watchResp .Err ())
71
+ require .Len (t , watchResp .Events , 1 )
72
+ t .Log ("Received watch event successfully" )
73
+ case <- time .After (5 * time .Second ):
74
+ t .Fatal ("Timed out waiting for watch event" )
75
+ }
76
+ return nil
77
+ })
78
+ })
47
79
}
48
80
49
- // testUnaryRPCTracing tests that Unary RPC calls are properly traced
50
- func testUnaryRPCTracing (t * testing.T ) {
81
+ // testRPCTracing is a common test function for both Unary and Stream RPC tracing
82
+ func testRPCTracing (t * testing.T , testName string , filterFunc func ( * traceservice. ExportTraceServiceRequest ) bool , clientAction func ( * clientv3. Client ) error ) {
51
83
// set up trace collector
52
84
listener , err := net .Listen ("tcp" , "localhost:" )
53
85
require .NoError (t , err )
@@ -58,7 +90,7 @@ func testUnaryRPCTracing(t *testing.T) {
58
90
srv := grpc .NewServer ()
59
91
traceservice .RegisterTraceServiceServer (srv , & traceServer {
60
92
traceFound : traceFound ,
61
- filterFunc : containsUnaryRPCSpan ,
93
+ filterFunc : filterFunc ,
62
94
})
63
95
64
96
go srv .Serve (listener )
@@ -109,14 +141,14 @@ func testUnaryRPCTracing(t *testing.T) {
109
141
}
110
142
defer cli .Close ()
111
143
112
- // make a request with the instrumented client
113
- resp , err := cli . Get ( context . TODO (), "key" )
144
+ // Execute the client action (either Unary or Stream RPC)
145
+ err = clientAction ( cli )
114
146
require .NoError (t , err )
115
- require .Empty (t , resp .Kvs )
116
147
117
148
// Wait for a span to be recorded from our request
118
149
select {
119
150
case <- traceFound :
151
+ t .Logf ("%s trace found" , testName )
120
152
return
121
153
case <- time .After (30 * time .Second ):
122
154
t .Fatal ("Timed out waiting for trace" )
@@ -141,98 +173,6 @@ func containsUnaryRPCSpan(req *traceservice.ExportTraceServiceRequest) bool {
141
173
return false
142
174
}
143
175
144
- // testStreamRPCTracing tests that Stream RPC calls are properly traced
145
- func testStreamRPCTracing (t * testing.T ) {
146
- // set up trace collector
147
- listener , err := net .Listen ("tcp" , "localhost:" )
148
- require .NoError (t , err )
149
-
150
- traceFound := make (chan struct {})
151
- defer close (traceFound )
152
-
153
- srv := grpc .NewServer ()
154
- traceservice .RegisterTraceServiceServer (srv , & traceServer {
155
- traceFound : traceFound ,
156
- filterFunc : containsStreamRPCSpan ,
157
- })
158
-
159
- go srv .Serve (listener )
160
- defer srv .Stop ()
161
-
162
- cfg := integration .NewEmbedConfig (t , "default" )
163
-
164
- cfg .EnableDistributedTracing = true
165
- cfg .DistributedTracingAddress = listener .Addr ().String ()
166
- cfg .DistributedTracingServiceName = "integration-test-tracing"
167
- cfg .DistributedTracingSamplingRatePerMillion = 100
168
-
169
- // start an etcd instance with tracing enabled
170
- etcdSrv , err := embed .StartEtcd (cfg )
171
- require .NoError (t , err )
172
- defer etcdSrv .Close ()
173
-
174
- select {
175
- case <- etcdSrv .Server .ReadyNotify ():
176
- case <- time .After (5 * time .Second ):
177
- t .Fatalf ("failed to start embed.Etcd for test" )
178
- }
179
-
180
- // create a client that has tracing enabled
181
- tracer := sdktrace .NewTracerProvider (sdktrace .WithSampler (sdktrace .AlwaysSample ()))
182
- defer tracer .Shutdown (context .TODO ())
183
- tp := trace .TracerProvider (tracer )
184
-
185
- tracingOpts := []otelgrpc.Option {
186
- otelgrpc .WithTracerProvider (tp ),
187
- otelgrpc .WithPropagators (
188
- propagation .NewCompositeTextMapPropagator (
189
- propagation.TraceContext {},
190
- propagation.Baggage {},
191
- )),
192
- }
193
-
194
- dialOptions := []grpc.DialOption {
195
- grpc .WithStatsHandler (otelgrpc .NewClientHandler (tracingOpts ... )),
196
- }
197
- ccfg := clientv3.Config {DialOptions : dialOptions , Endpoints : []string {cfg .AdvertiseClientUrls [0 ].String ()}}
198
- cli , err := integration .NewClient (t , ccfg )
199
- if err != nil {
200
- etcdSrv .Close ()
201
- t .Fatal (err )
202
- }
203
- defer cli .Close ()
204
-
205
- // Create a context with a reasonable timeout
206
- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
207
- defer cancel ()
208
-
209
- // Create a watch channel
210
- watchChan := cli .Watch (ctx , "watch-key" )
211
-
212
- // Put a value to trigger the watch
213
- _ , err = cli .Put (context .TODO (), "watch-key" , "watch-value" )
214
- require .NoError (t , err )
215
-
216
- // Wait for watch event
217
- select {
218
- case watchResp := <- watchChan :
219
- require .NoError (t , watchResp .Err ())
220
- require .Len (t , watchResp .Events , 1 )
221
- t .Log ("Received watch event successfully" )
222
- case <- time .After (5 * time .Second ):
223
- t .Fatal ("Timed out waiting for watch event" )
224
- }
225
-
226
- // Wait for a span to be recorded from our streaming request
227
- select {
228
- case <- traceFound :
229
- t .Log ("Stream RPC trace found" )
230
- return
231
- case <- time .After (30 * time .Second ):
232
- t .Fatal ("Timed out waiting for stream RPC trace" )
233
- }
234
- }
235
-
236
176
// containsStreamRPCSpan checks for Watch/Watch spans in trace data
237
177
func containsStreamRPCSpan (req * traceservice.ExportTraceServiceRequest ) bool {
238
178
for _ , resourceSpans := range req .GetResourceSpans () {
0 commit comments