@@ -44,6 +44,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
44
44
@usableFromInline
45
45
let channelHandler : ValkeyChannelHandler
46
46
let configuration : ValkeyConnectionConfiguration
47
+ @usableFromInline
48
+ let address : ( hostOrSocketPath: String , port: Int ? ) ?
47
49
let isClosed : Atomic < Bool >
48
50
49
51
/// Initialize connection
@@ -52,6 +54,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
52
54
connectionID: ID ,
53
55
channelHandler: ValkeyChannelHandler ,
54
56
configuration: ValkeyConnectionConfiguration ,
57
+ address: ValkeyServerAddress ? ,
55
58
logger: Logger
56
59
) {
57
60
self . unownedExecutor = channel. eventLoop. executor. asUnownedSerialExecutor ( )
@@ -60,6 +63,14 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
60
63
self . configuration = configuration
61
64
self . id = connectionID
62
65
self . logger = logger
66
+ switch address? . value {
67
+ case let . hostname( host, port) :
68
+ self . address = ( host, port)
69
+ case let . unixDomainSocket( path) :
70
+ self . address = ( path, nil )
71
+ case nil :
72
+ self . address = nil
73
+ }
63
74
self . isClosed = . init( false )
64
75
}
65
76
@@ -165,10 +176,19 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
165
176
166
177
@inlinable
167
178
func _execute< Command: ValkeyCommand > ( command: Command ) async throws -> RESPToken {
168
- try await withSpan ( Command . name, ofKind: . client) { span in
169
- span. attributes [ " db.system.name " ] = " valkey "
179
+ #if DistributedTracingSupport
180
+ let span = startSpan ( Command . name, ofKind: . client)
181
+ defer { span. end ( ) }
182
+
183
+ span. updateAttributes { attributes in
184
+ attributes [ " db.operation.name " ] = Command . name
185
+ applyCommonAttributes ( to: & attributes)
186
+ }
187
+ #endif
188
+
189
+ let requestID = Self . requestIDGenerator. next ( )
170
190
171
- let requestID = Self . requestIDGenerator . next ( )
191
+ do {
172
192
return try await withTaskCancellationHandler {
173
193
if Task . isCancelled {
174
194
throw ValkeyClientError ( . cancelled)
@@ -179,6 +199,25 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
179
199
} onCancel: {
180
200
self . cancel ( requestID: requestID)
181
201
}
202
+ } catch let error as ValkeyClientError {
203
+ #if DistributedTracingSupport
204
+ span. recordError ( error)
205
+ if let message = error. message {
206
+ var prefixEndIndex = message. startIndex
207
+ while prefixEndIndex < message. endIndex, message [ prefixEndIndex] != " " {
208
+ message. formIndex ( after: & prefixEndIndex)
209
+ }
210
+ let prefix = message [ message. startIndex ..< prefixEndIndex]
211
+ span. attributes [ " db.response.status_code " ] = " \( prefix) "
212
+ span. setStatus ( SpanStatus ( code: . error) )
213
+ }
214
+ #endif
215
+ throw error
216
+ } catch {
217
+ #if DistributedTracingSupport
218
+ span. recordError ( error)
219
+ #endif
220
+ throw error
182
221
}
183
222
}
184
223
@@ -197,8 +236,42 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
197
236
public func execute< each Command : ValkeyCommand > (
198
237
_ commands: repeat each Command
199
238
) async -> sending ( repeat Result < ( each Command ) . Response, Error > ) {
239
+ #if DistributedTracingSupport
240
+ let span = startSpan ( " MULTI " , ofKind: . client)
241
+ defer { span. end ( ) }
242
+
243
+ // We want to suffix the `db.operation.name` if all pipelined commands are of the same type.
244
+ var commandName : String ?
245
+ var operationNameSuffix : String ?
246
+ var commandCount = 0
247
+
248
+ for command in repeat each commands {
249
+ commandCount += 1
250
+ if commandName == nil {
251
+ commandName = Swift . type ( of: command) . name
252
+ operationNameSuffix = commandName
253
+ } else if commandName != Swift . type ( of: command) . name {
254
+ // We should only add a suffix if all commands in the transaction are the same.
255
+ operationNameSuffix = nil
256
+ }
257
+ }
258
+ let operationName = operationNameSuffix. map { " MULTI \( $0) " } ?? " MULTI "
259
+
260
+ span. updateAttributes { attributes in
261
+ attributes [ " db.operation.name " ] = operationName
262
+ attributes [ " db.operation.batch.size " ] = commandCount > 1 ? commandCount : nil
263
+ applyCommonAttributes ( to: & attributes)
264
+ }
265
+ #endif
266
+
200
267
func convert< Response: RESPTokenDecodable > ( _ result: Result < RESPToken , Error > , to: Response . Type ) -> Result < Response , Error > {
201
- result. flatMap {
268
+ #if DistributedTracingSupport
269
+ if case . failure( let error) = result {
270
+ span. recordError ( error)
271
+ }
272
+ #endif
273
+
274
+ return result. flatMap {
202
275
do {
203
276
return try . success( Response ( fromRESP: $0) )
204
277
} catch {
@@ -233,6 +306,16 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
233
306
}
234
307
}
235
308
309
+ @usableFromInline
310
+ func applyCommonAttributes( to attributes: inout SpanAttributes ) {
311
+ // TODO: Should this be redis as recommended by OTel semconv or valkey as seen in valkey-go?
312
+ attributes [ " db.system.name " ] = " valkey "
313
+ attributes [ " network.peer.address " ] = channel. remoteAddress? . ipAddress
314
+ attributes [ " network.peer.port " ] = channel. remoteAddress? . port
315
+ attributes [ " server.address " ] = address? . hostOrSocketPath
316
+ attributes [ " server.port " ] = address? . port == 6379 ? nil : address? . port
317
+ }
318
+
236
319
@usableFromInline
237
320
nonisolated func cancel( requestID: Int ) {
238
321
self . channel. eventLoop. execute {
@@ -298,6 +381,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
298
381
connectionID: connectionID,
299
382
channelHandler: handler,
300
383
configuration: configuration,
384
+ address: address,
301
385
logger: logger
302
386
)
303
387
}
@@ -333,6 +417,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
333
417
connectionID: 0 ,
334
418
channelHandler: handler,
335
419
configuration: configuration,
420
+ address: . hostname( " 127.0.0.1 " , port: 6379 ) ,
336
421
logger: logger
337
422
)
338
423
return channel. connect ( to: try SocketAddress ( ipAddress: " 127.0.0.1 " , port: 6379 ) ) . map {
0 commit comments