Skip to content

Commit 064e92f

Browse files
slashmofabianfett
andauthored
Add Distributed Tracing support (#177)
* Add static name to Valkey commands Signed-off-by: Moritz Lang <[email protected]> * Add Distributed Tracing support behind new trait Signed-off-by: Moritz Lang <[email protected]> * Add open-telemetry example showcasing Distributed Tracing support Signed-off-by: Moritz Lang <[email protected]> * Run swift-format Signed-off-by: Moritz Lang <[email protected]> * Document open-telemetry example Signed-off-by: Moritz Lang <[email protected]> * Remove left-over TODO Signed-off-by: Moritz Lang <[email protected]> * Re-format docker-compose.yml Signed-off-by: Moritz Lang <[email protected]> * Re-format ValkeyConnectionTests.swift Signed-off-by: Moritz Lang <[email protected]> * Clean up service bootstrap in open-telemetry example Signed-off-by: Moritz Lang <[email protected]> * Remove open-telemetry example Signed-off-by: Moritz Lang <[email protected]> * Improve tracing support Signed-off-by: Fabian Fett <[email protected]> * Fix tests Signed-off-by: Fabian Fett <[email protected]> * swift-format Signed-off-by: Fabian Fett <[email protected]> * swift-format Signed-off-by: Fabian Fett <[email protected]> * Use TestTracer from TracingTestKit Signed-off-by: Moritz Lang <[email protected]> * Put ValkeyConnection.address behind Tracing trait Signed-off-by: Moritz Lang <[email protected]> * Extract simple error prefix into computed variable Signed-off-by: Moritz Lang <[email protected]> * Add missing availability macro to ValkeyTracingConfiguration Signed-off-by: Moritz Lang <[email protected]> * Switch to swift-distributed-tracing upstream Signed-off-by: Moritz Lang <[email protected]> * Fix compilation without DistributedTracingSupport trait Signed-off-by: Moritz Lang <[email protected]> * Fix benchmark compilation Signed-off-by: Moritz Lang <[email protected]> * Document tracing configuration Signed-off-by: Moritz Lang <[email protected]> * Set span status code for all errors Signed-off-by: Moritz Lang <[email protected]> --------- Signed-off-by: Moritz Lang <[email protected]> Signed-off-by: Fabian Fett <[email protected]> Co-authored-by: Fabian Fett <[email protected]>
1 parent cab1563 commit 064e92f

File tree

9 files changed

+463
-13
lines changed

9 files changed

+463
-13
lines changed

.gitignore

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
.DS_Store
2-
/.build
2+
.build
3+
.swiftpm
34
/Packages
45
xcuserdata/
56
DerivedData/
@@ -10,4 +11,4 @@ DerivedData/
1011
Package.resolved
1112
.benchmarkBaselines/
1213
.swift-version
13-
.docc-build
14+
.docc-build

Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,17 @@ import NIOPosix
1414
import Synchronization
1515
import Valkey
1616

17+
#if DistributedTracingSupport
18+
import Tracing
19+
#endif
20+
1721
@available(valkeySwift 1.0, *)
1822
func connectionBenchmarks() {
1923
makeConnectionCreateAndDropBenchmark()
2024
makeConnectionGETBenchmark()
25+
#if DistributedTracingSupport
26+
makeConnectionGETNoOpTracerBenchmark()
27+
#endif
2128
makeConnectionPipelineBenchmark()
2229
}
2330

@@ -58,9 +65,47 @@ func makeConnectionGETBenchmark() -> Benchmark? {
5865
return Benchmark("Connection: GET benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
5966
let port = serverMutex.withLock { $0 }!.localAddress!.port!
6067
let logger = Logger(label: "test")
68+
#if DistributedTracingSupport
69+
// explicitly set tracer to nil, if trait is enabled
70+
var configuration = ValkeyConnectionConfiguration()
71+
configuration.tracing.tracer = nil
72+
#else
73+
let configuration = ValkeyConnectionConfiguration()
74+
#endif
6175
try await ValkeyConnection.withConnection(
6276
address: .hostname("127.0.0.1", port: port),
63-
configuration: .init(),
77+
configuration: configuration,
78+
logger: logger
79+
) { connection in
80+
benchmark.startMeasurement()
81+
for _ in benchmark.scaledIterations {
82+
let foo = try await connection.get("foo")
83+
precondition(foo.map { String(buffer: $0) } == "Bar")
84+
}
85+
benchmark.stopMeasurement()
86+
}
87+
} setup: {
88+
let server = try await makeLocalServer()
89+
serverMutex.withLock { $0 = server }
90+
} teardown: {
91+
try await serverMutex.withLock { $0 }?.close().get()
92+
}
93+
}
94+
95+
#if DistributedTracingSupport
96+
@available(valkeySwift 1.0, *)
97+
@discardableResult
98+
func makeConnectionGETNoOpTracerBenchmark() -> Benchmark? {
99+
let serverMutex = Mutex<(any Channel)?>(nil)
100+
101+
return Benchmark("Connection: GET benchmark – NoOpTracer", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
102+
let port = serverMutex.withLock { $0 }!.localAddress!.port!
103+
let logger = Logger(label: "test")
104+
var configuration = ValkeyConnectionConfiguration()
105+
configuration.tracing.tracer = NoOpTracer()
106+
try await ValkeyConnection.withConnection(
107+
address: .hostname("127.0.0.1", port: port),
108+
configuration: configuration,
64109
logger: logger
65110
) { connection in
66111
benchmark.startMeasurement()
@@ -77,6 +122,7 @@ func makeConnectionGETBenchmark() -> Benchmark? {
77122
try await serverMutex.withLock { $0 }?.close().get()
78123
}
79124
}
125+
#endif
80126

81127
@available(valkeySwift 1.0, *)
82128
@discardableResult

Package.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ let package = Package(
1818
],
1919
traits: [
2020
.trait(name: "ServiceLifecycleSupport"),
21-
.default(enabledTraits: ["ServiceLifecycleSupport"]),
21+
.trait(name: "DistributedTracingSupport"),
22+
.default(enabledTraits: ["ServiceLifecycleSupport", "DistributedTracingSupport"]),
2223
],
2324
dependencies: [
2425
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"),
2526
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"),
2627
.package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"),
28+
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.3.0"),
2729
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
2830
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"),
2931
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.23.0"),
@@ -36,6 +38,7 @@ let package = Package(
3638
.byName(name: "_ValkeyConnectionPool"),
3739
.product(name: "DequeModule", package: "swift-collections"),
3840
.product(name: "Logging", package: "swift-log"),
41+
.product(name: "Tracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])),
3942
.product(name: "NIOCore", package: "swift-nio"),
4043
.product(name: "NIOPosix", package: "swift-nio"),
4144
.product(name: "NIOSSL", package: "swift-nio-ssl"),
@@ -90,6 +93,7 @@ let package = Package(
9093
.product(name: "NIOTestUtils", package: "swift-nio"),
9194
.product(name: "Logging", package: "swift-log"),
9295
.product(name: "NIOEmbedded", package: "swift-nio"),
96+
.product(name: "InMemoryTracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])),
9397
],
9498
swiftSettings: defaultSwiftSettings
9599
),

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 91 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ import Network
1717
import NIOTransportServices
1818
#endif
1919

20+
#if DistributedTracingSupport
21+
import Tracing
22+
#endif
23+
2024
/// A single connection to a Valkey database.
2125
@available(valkeySwift 1.0, *)
2226
public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
@@ -29,6 +33,12 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
2933
public let id: ID
3034
/// Logger used by Server
3135
let logger: Logger
36+
#if DistributedTracingSupport
37+
@usableFromInline
38+
let tracer: (any Tracer)?
39+
@usableFromInline
40+
let address: (hostOrSocketPath: String, port: Int?)?
41+
#endif
3242
@usableFromInline
3343
let channel: any Channel
3444
@usableFromInline
@@ -42,6 +52,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
4252
connectionID: ID,
4353
channelHandler: ValkeyChannelHandler,
4454
configuration: ValkeyConnectionConfiguration,
55+
address: ValkeyServerAddress?,
4556
logger: Logger
4657
) {
4758
self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor()
@@ -50,6 +61,17 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
5061
self.configuration = configuration
5162
self.id = connectionID
5263
self.logger = logger
64+
#if DistributedTracingSupport
65+
self.tracer = configuration.tracing.tracer
66+
switch address?.value {
67+
case let .hostname(host, port):
68+
self.address = (host, port)
69+
case let .unixDomainSocket(path):
70+
self.address = (path, nil)
71+
case nil:
72+
self.address = nil
73+
}
74+
#endif
5375
self.isClosed = .init(false)
5476
}
5577

@@ -153,16 +175,47 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
153175

154176
@inlinable
155177
func _execute<Command: ValkeyCommand>(command: Command) async throws -> RESPToken {
178+
#if DistributedTracingSupport
179+
let span = self.tracer?.startSpan(Command.name, ofKind: .client)
180+
defer { span?.end() }
181+
182+
span?.updateAttributes { attributes in
183+
self.applyCommonAttributes(to: &attributes, commandName: Command.name)
184+
}
185+
#endif
186+
156187
let requestID = Self.requestIDGenerator.next()
157-
return try await withTaskCancellationHandler {
158-
if Task.isCancelled {
159-
throw ValkeyClientError(.cancelled)
188+
189+
do {
190+
return try await withTaskCancellationHandler {
191+
if Task.isCancelled {
192+
throw ValkeyClientError(.cancelled)
193+
}
194+
return try await withCheckedThrowingContinuation { continuation in
195+
self.channelHandler.write(command: command, continuation: continuation, requestID: requestID)
196+
}
197+
} onCancel: {
198+
self.cancel(requestID: requestID)
199+
}
200+
} catch let error as ValkeyClientError {
201+
#if DistributedTracingSupport
202+
if let span {
203+
span.recordError(error)
204+
span.setStatus(SpanStatus(code: .error))
205+
if let prefix = error.simpleErrorPrefix {
206+
span.attributes["db.response.status_code"] = "\(prefix)"
207+
}
160208
}
161-
return try await withCheckedThrowingContinuation { continuation in
162-
self.channelHandler.write(command: command, continuation: continuation, requestID: requestID)
209+
#endif
210+
throw error
211+
} catch {
212+
#if DistributedTracingSupport
213+
if let span {
214+
span.recordError(error)
215+
span.setStatus(SpanStatus(code: .error))
163216
}
164-
} onCancel: {
165-
self.cancel(requestID: requestID)
217+
#endif
218+
throw error
166219
}
167220
}
168221

@@ -213,6 +266,18 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
213266
}
214267
}
215268

269+
#if DistributedTracingSupport
270+
@usableFromInline
271+
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {
272+
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName
273+
attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValues.databaseSystem
274+
attributes[self.configuration.tracing.attributeNames.networkPeerAddress] = channel.remoteAddress?.ipAddress
275+
attributes[self.configuration.tracing.attributeNames.networkPeerPort] = channel.remoteAddress?.port
276+
attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath
277+
attributes[self.configuration.tracing.attributeNames.serverPort] = address?.port == 6379 ? nil : address?.port
278+
}
279+
#endif
280+
216281
@usableFromInline
217282
nonisolated func cancel(requestID: Int) {
218283
self.channel.eventLoop.execute {
@@ -278,6 +343,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
278343
connectionID: connectionID,
279344
channelHandler: handler,
280345
configuration: configuration,
346+
address: address,
281347
logger: logger
282348
)
283349
}
@@ -313,6 +379,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
313379
connectionID: 0,
314380
channelHandler: handler,
315381
configuration: configuration,
382+
address: .hostname("127.0.0.1", port: 6379),
316383
logger: logger
317384
)
318385
return channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 6379)).map {
@@ -390,3 +457,20 @@ struct AutoIncrementingInteger {
390457
return value - 1
391458
}
392459
}
460+
461+
#if DistributedTracingSupport
462+
extension ValkeyClientError {
463+
/// Extract the simple error prefix from this error.
464+
///
465+
/// - SeeAlso: [](https://valkey.io/topics/protocol/#simple-errors)
466+
@usableFromInline
467+
var simpleErrorPrefix: Substring? {
468+
guard let message else { return nil }
469+
var prefixEndIndex = message.startIndex
470+
while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " {
471+
message.formIndex(after: &prefixEndIndex)
472+
}
473+
return message[message.startIndex..<prefixEndIndex]
474+
}
475+
}
476+
#endif

Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
import NIOSSL
1010

11+
#if DistributedTracingSupport
12+
import Tracing
13+
#endif
14+
1115
/// A configuration object that defines how to connect to a Valkey server.
1216
///
1317
/// `ValkeyConnectionConfiguration` allows you to customize various aspects of the connection,
@@ -112,6 +116,12 @@ public struct ValkeyConnectionConfiguration: Sendable {
112116
/// Default value is `nil` (no client name is set).
113117
public var clientName: String?
114118

119+
#if DistributedTracingSupport
120+
/// The distributed tracing configuration to use for this connection.
121+
/// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions.
122+
public var tracing: ValkeyTracingConfiguration = .init()
123+
#endif
124+
115125
/// Creates a new Valkey connection configuration.
116126
///
117127
/// Use this initializer to create a configuration object that can be used to establish
@@ -137,3 +147,34 @@ public struct ValkeyConnectionConfiguration: Sendable {
137147
self.clientName = clientName
138148
}
139149
}
150+
151+
#if DistributedTracingSupport
152+
@available(valkeySwift 1.0, *)
153+
/// A configuration object that defines distributed tracing behavior of a Valkey client.
154+
public struct ValkeyTracingConfiguration: Sendable {
155+
/// The tracer to use, or `nil` to disable tracing.
156+
/// Defaults to the globally bootstrapped tracer.
157+
public var tracer: (any Tracer)? = InstrumentationSystem.tracer
158+
159+
/// The attribute names used in spans created by Valkey. Defaults to OpenTelemetry semantics.
160+
public var attributeNames: AttributeNames = .init()
161+
162+
/// The static attribute values used in spans created by Valkey.
163+
public var attributeValues: AttributeValues = .init()
164+
165+
/// Attribute names used in spans created by Valkey.
166+
public struct AttributeNames: Sendable {
167+
public var databaseOperationName: String = "db.operation.name"
168+
public var databaseSystemName: String = "db.system.name"
169+
public var networkPeerAddress: String = "network.peer.address"
170+
public var networkPeerPort: String = "network.peer.port"
171+
public var serverAddress: String = "server.address"
172+
public var serverPort: String = "server.port"
173+
}
174+
175+
/// Static attribute values used in spans created by Valkey.
176+
public struct AttributeValues: Sendable {
177+
public var databaseSystem: String = "valkey"
178+
}
179+
}
180+
#endif

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ public struct ValkeyClientConfiguration: Sendable {
118118
/// The TLS to use for the Valkey connection.
119119
public var tls: TLS
120120

121+
#if DistributedTracingSupport
122+
/// The distributed tracing configuration to use for the Valkey connection.
123+
/// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions.
124+
public var tracing: ValkeyTracingConfiguration = .init()
125+
#endif
126+
121127
/// Creates a Valkey client connection configuration.
122128
///
123129
/// - Parameters:

Sources/Valkey/ValkeyConnectionFactory.swift

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ package final class ValkeyConnectionFactory: Sendable {
8585
connectionID: connectionID,
8686
channelHandler: channelHandler,
8787
configuration: connectionConfig,
88+
address: nil,
8889
logger: logger
8990
)
9091
}.get()
@@ -102,7 +103,7 @@ package final class ValkeyConnectionFactory: Sendable {
102103
try await .enable(self.cache!.getSSLContext(), tlsServerName: clientName)
103104
}
104105

105-
return ValkeyConnectionConfiguration(
106+
let newConfig = ValkeyConnectionConfiguration(
106107
authentication: self.configuration.authentication.flatMap {
107108
.init(username: $0.username, password: $0.password)
108109
},
@@ -111,5 +112,13 @@ package final class ValkeyConnectionFactory: Sendable {
111112
tls: tls,
112113
clientName: nil
113114
)
115+
116+
#if DistributedTracingSupport
117+
var mConfig = newConfig
118+
mConfig.tracing = self.configuration.tracing
119+
return mConfig
120+
#else
121+
return newConfig
122+
#endif
114123
}
115124
}

Tests/IntegrationTests/ClientIntegrationTests.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ struct ClientIntegratedTests {
7070
func testValkeyCommand() async throws {
7171
struct GET: ValkeyCommand {
7272
typealias Response = String?
73-
7473
static let name = "GET"
7574

7675
var key: ValkeyKey

0 commit comments

Comments
 (0)