Skip to content

Commit 144a40c

Browse files
committed
Add Distributed Tracing support behind new trait
Signed-off-by: Moritz Lang <[email protected]>
1 parent 8a3a0bc commit 144a40c

File tree

6 files changed

+520
-5
lines changed

6 files changed

+520
-5
lines changed

Package.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ let package = Package(
1919
],
2020
traits: [
2121
.trait(name: "ServiceLifecycleSupport"),
22-
.default(enabledTraits: ["ServiceLifecycleSupport"]),
22+
.trait(name: "DistributedTracingSupport"),
23+
.default(enabledTraits: ["ServiceLifecycleSupport", "DistributedTracingSupport"]),
2324
],
2425
dependencies: [
2526
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"),
2627
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"),
2728
.package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"),
29+
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"),
2830
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
2931
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"),
3032
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.23.0"),
@@ -39,6 +41,7 @@ let package = Package(
3941
.byName(name: "_ValkeyConnectionPool"),
4042
.product(name: "DequeModule", package: "swift-collections"),
4143
.product(name: "Logging", package: "swift-log"),
44+
.product(name: "Tracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])),
4245
.product(name: "NIOCore", package: "swift-nio"),
4346
.product(name: "NIOPosix", package: "swift-nio"),
4447
.product(name: "NIOSSL", package: "swift-nio-ssl"),

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
4444
@usableFromInline
4545
let channelHandler: ValkeyChannelHandler
4646
let configuration: ValkeyConnectionConfiguration
47+
@usableFromInline
48+
let address: (hostOrSocketPath: String, port: Int?)?
4749
let isClosed: Atomic<Bool>
4850

4951
/// Initialize connection
@@ -52,6 +54,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
5254
connectionID: ID,
5355
channelHandler: ValkeyChannelHandler,
5456
configuration: ValkeyConnectionConfiguration,
57+
address: ValkeyServerAddress?,
5558
logger: Logger
5659
) {
5760
self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor()
@@ -60,6 +63,14 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
6063
self.configuration = configuration
6164
self.id = connectionID
6265
self.logger = logger
66+
switch address?.value {
67+
case let .hostname(host, port):
68+
self.address = (host, port)
69+
case let .unixDomainSocket(path):
70+
self.address = (path, nil)
71+
case nil:
72+
self.address = nil
73+
}
6374
self.isClosed = .init(false)
6475
}
6576

@@ -165,10 +176,19 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
165176

166177
@inlinable
167178
func _execute<Command: ValkeyCommand>(command: Command) async throws -> RESPToken {
168-
try await withSpan(Command.name, ofKind: .client) { span in
169-
span.attributes["db.system.name"] = "valkey"
179+
#if DistributedTracingSupport
180+
let span = startSpan(Command.name, ofKind: .client)
181+
defer { span.end() }
182+
183+
span.updateAttributes { attributes in
184+
attributes["db.operation.name"] = Command.name
185+
applyCommonAttributes(to: &attributes)
186+
}
187+
#endif
188+
189+
let requestID = Self.requestIDGenerator.next()
170190

171-
let requestID = Self.requestIDGenerator.next()
191+
do {
172192
return try await withTaskCancellationHandler {
173193
if Task.isCancelled {
174194
throw ValkeyClientError(.cancelled)
@@ -179,6 +199,25 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
179199
} onCancel: {
180200
self.cancel(requestID: requestID)
181201
}
202+
} catch let error as ValkeyClientError {
203+
#if DistributedTracingSupport
204+
span.recordError(error)
205+
if let message = error.message {
206+
var prefixEndIndex = message.startIndex
207+
while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " {
208+
message.formIndex(after: &prefixEndIndex)
209+
}
210+
let prefix = message[message.startIndex ..< prefixEndIndex]
211+
span.attributes["db.response.status_code"] = "\(prefix)"
212+
span.setStatus(SpanStatus(code: .error))
213+
}
214+
#endif
215+
throw error
216+
} catch {
217+
#if DistributedTracingSupport
218+
span.recordError(error)
219+
#endif
220+
throw error
182221
}
183222
}
184223

@@ -197,8 +236,42 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
197236
public func execute<each Command: ValkeyCommand>(
198237
_ commands: repeat each Command
199238
) async -> sending (repeat Result<(each Command).Response, Error>) {
239+
#if DistributedTracingSupport
240+
let span = startSpan("MULTI", ofKind: .client)
241+
defer { span.end() }
242+
243+
// We want to suffix the `db.operation.name` if all pipelined commands are of the same type.
244+
var commandName: String?
245+
var operationNameSuffix: String?
246+
var commandCount = 0
247+
248+
for command in repeat each commands {
249+
commandCount += 1
250+
if commandName == nil {
251+
commandName = Swift.type(of: command).name
252+
operationNameSuffix = commandName
253+
} else if commandName != Swift.type(of: command).name {
254+
// We should only add a suffix if all commands in the transaction are the same.
255+
operationNameSuffix = nil
256+
}
257+
}
258+
let operationName = operationNameSuffix.map { "MULTI \($0)" } ?? "MULTI"
259+
260+
span.updateAttributes { attributes in
261+
attributes["db.operation.name"] = operationName
262+
attributes["db.operation.batch.size"] = commandCount > 1 ? commandCount : nil
263+
applyCommonAttributes(to: &attributes)
264+
}
265+
#endif
266+
200267
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
201-
result.flatMap {
268+
#if DistributedTracingSupport
269+
if case .failure(let error) = result {
270+
span.recordError(error)
271+
}
272+
#endif
273+
274+
return result.flatMap {
202275
do {
203276
return try .success(Response(fromRESP: $0))
204277
} catch {
@@ -233,6 +306,16 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
233306
}
234307
}
235308

309+
@usableFromInline
310+
func applyCommonAttributes(to attributes: inout SpanAttributes) {
311+
// TODO: Should this be redis as recommended by OTel semconv or valkey as seen in valkey-go?
312+
attributes["db.system.name"] = "valkey"
313+
attributes["network.peer.address"] = channel.remoteAddress?.ipAddress
314+
attributes["network.peer.port"] = channel.remoteAddress?.port
315+
attributes["server.address"] = address?.hostOrSocketPath
316+
attributes["server.port"] = address?.port == 6379 ? nil : address?.port
317+
}
318+
236319
@usableFromInline
237320
nonisolated func cancel(requestID: Int) {
238321
self.channel.eventLoop.execute {
@@ -298,6 +381,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
298381
connectionID: connectionID,
299382
channelHandler: handler,
300383
configuration: configuration,
384+
address: address,
301385
logger: logger
302386
)
303387
}
@@ -333,6 +417,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
333417
connectionID: 0,
334418
channelHandler: handler,
335419
configuration: configuration,
420+
address: .hostname("127.0.0.1", port: 6379),
336421
logger: logger
337422
)
338423
return channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 6379)).map {

Sources/Valkey/ValkeyConnectionFactory.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ package final class ValkeyConnectionFactory: Sendable {
9191
connectionID: connectionID,
9292
channelHandler: channelHandler,
9393
configuration: connectionConfig,
94+
address: nil,
9495
logger: logger
9596
)
9697
}.get()

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ struct GeneratedCommands {
7474
func testValkeyCommand() async throws {
7575
struct GET: ValkeyCommand {
7676
typealias Response = String?
77+
static let name = "GET"
7778

7879
var key: ValkeyKey
7980

0 commit comments

Comments
 (0)