Skip to content

Commit 6d475d7

Browse files
committed
Improve tracing support
1 parent dd4fe92 commit 6d475d7

File tree

5 files changed

+109
-51
lines changed

5 files changed

+109
-51
lines changed

Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,18 @@ import Logging
1717
import NIOCore
1818
import NIOPosix
1919
import Synchronization
20+
#if DistributedTracingSupport
21+
import Tracing
22+
#endif
2023
import Valkey
2124

2225
@available(valkeySwift 1.0, *)
2326
func connectionBenchmarks() {
2427
makeConnectionCreateAndDropBenchmark()
2528
makeConnectionGETBenchmark()
29+
#if DistributedTracingSupport
30+
makeConnectionGETNoOpTracerBenchmark()
31+
#endif
2632
makeConnectionPipelineBenchmark()
2733
}
2834

@@ -60,9 +66,45 @@ func makeConnectionGETBenchmark() -> Benchmark? {
6066
return Benchmark("Connection: GET benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
6167
let port = serverMutex.withLock { $0 }!.localAddress!.port!
6268
let logger = Logger(label: "test")
69+
#if DistributedTracingSupport
70+
// explicitly set tracer to nil, if trait is enabled
71+
var configuration = ValkeyConnectionConfiguration()
72+
configuration.tracing.tracer = nil
73+
#endif
6374
try await ValkeyConnection.withConnection(
6475
address: .hostname("127.0.0.1", port: port),
65-
configuration: .init(),
76+
configuration: configuration,
77+
logger: logger
78+
) { connection in
79+
benchmark.startMeasurement()
80+
for _ in benchmark.scaledIterations {
81+
let foo = try await connection.get("foo")
82+
precondition(foo.map { String(buffer: $0) } == "Bar")
83+
}
84+
benchmark.stopMeasurement()
85+
}
86+
} setup: {
87+
let server = try await makeLocalServer()
88+
serverMutex.withLock { $0 = server }
89+
} teardown: {
90+
try await serverMutex.withLock { $0 }?.close().get()
91+
}
92+
}
93+
94+
#if DistributedTracingSupport
95+
@available(valkeySwift 1.0, *)
96+
@discardableResult
97+
func makeConnectionGETNoOpTracerBenchmark() -> Benchmark? {
98+
let serverMutex = Mutex<(any Channel)?>(nil)
99+
100+
return Benchmark("Connection: GET benchmark – NoOpTracer", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
101+
let port = serverMutex.withLock { $0 }!.localAddress!.port!
102+
let logger = Logger(label: "test")
103+
var configuration = ValkeyConnectionConfiguration()
104+
configuration.tracing.tracer = NoOpTracer()
105+
try await ValkeyConnection.withConnection(
106+
address: .hostname("127.0.0.1", port: port),
107+
configuration: configuration,
66108
logger: logger
67109
) { connection in
68110
benchmark.startMeasurement()
@@ -79,6 +121,7 @@ func makeConnectionGETBenchmark() -> Benchmark? {
79121
try await serverMutex.withLock { $0 }?.close().get()
80122
}
81123
}
124+
#endif
82125

83126
@available(valkeySwift 1.0, *)
84127
@discardableResult

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 22 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
3939
public let id: ID
4040
/// Logger used by Server
4141
let logger: Logger
42+
#if DistributedTracingSupport
43+
@usableFromInline
44+
let tracer: (any Tracer)?
45+
#endif
4246
@usableFromInline
4347
let channel: any Channel
4448
@usableFromInline
@@ -63,6 +67,9 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
6367
self.configuration = configuration
6468
self.id = connectionID
6569
self.logger = logger
70+
#if DistributedTracingSupport
71+
self.tracer = configuration.tracing.tracer
72+
#endif
6673
switch address?.value {
6774
case let .hostname(host, port):
6875
self.address = (host, port)
@@ -175,12 +182,11 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
175182
@inlinable
176183
func _execute<Command: ValkeyCommand>(command: Command) async throws -> RESPToken {
177184
#if DistributedTracingSupport
178-
let span = startSpan(Command.name, ofKind: .client)
179-
defer { span.end() }
185+
let span = self.tracer?.startSpan(Command.name, ofKind: .client)
186+
defer { span?.end() }
180187

181-
span.updateAttributes { attributes in
182-
attributes["db.operation.name"] = Command.name
183-
applyCommonAttributes(to: &attributes)
188+
span?.updateAttributes { attributes in
189+
self.applyCommonAttributes(to: &attributes, commandName: Command.name)
184190
}
185191
#endif
186192

@@ -199,21 +205,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
199205
}
200206
} catch let error as ValkeyClientError {
201207
#if DistributedTracingSupport
202-
span.recordError(error)
208+
span?.recordError(error)
203209
if let message = error.message {
204210
var prefixEndIndex = message.startIndex
205211
while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " {
206212
message.formIndex(after: &prefixEndIndex)
207213
}
208214
let prefix = message[message.startIndex..<prefixEndIndex]
209-
span.attributes["db.response.status_code"] = "\(prefix)"
210-
span.setStatus(SpanStatus(code: .error))
215+
span?.attributes["db.response.status_code"] = "\(prefix)"
216+
span?.setStatus(SpanStatus(code: .error))
211217
}
212218
#endif
213219
throw error
214220
} catch {
215221
#if DistributedTracingSupport
216-
span.recordError(error)
222+
span?.recordError(error)
217223
#endif
218224
throw error
219225
}
@@ -230,41 +236,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
230236
public func execute<each Command: ValkeyCommand>(
231237
_ commands: repeat each Command
232238
) async -> sending (repeat Result<(each Command).Response, Error>) {
233-
#if DistributedTracingSupport
234-
let span = startSpan("MULTI", ofKind: .client)
235-
defer { span.end() }
236-
237-
// We want to suffix the `db.operation.name` if all pipelined commands are of the same type.
238-
var commandName: String?
239-
var operationNameSuffix: String?
240-
var commandCount = 0
241-
242-
for command in repeat each commands {
243-
commandCount += 1
244-
if commandName == nil {
245-
commandName = Swift.type(of: command).name
246-
operationNameSuffix = commandName
247-
} else if commandName != Swift.type(of: command).name {
248-
// We should only add a suffix if all commands in the transaction are the same.
249-
operationNameSuffix = nil
250-
}
251-
}
252-
let operationName = operationNameSuffix.map { "MULTI \($0)" } ?? "MULTI"
253-
254-
span.updateAttributes { attributes in
255-
attributes["db.operation.name"] = operationName
256-
attributes["db.operation.batch.size"] = commandCount > 1 ? commandCount : nil
257-
applyCommonAttributes(to: &attributes)
258-
}
259-
#endif
260-
261239
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
262-
#if DistributedTracingSupport
263-
if case .failure(let error) = result {
264-
span.recordError(error)
265-
}
266-
#endif
267-
268240
return result.flatMap {
269241
do {
270242
return try .success(Response(fromRESP: $0))
@@ -301,12 +273,13 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
301273
}
302274

303275
@usableFromInline
304-
func applyCommonAttributes(to attributes: inout SpanAttributes) {
305-
attributes["db.system.name"] = "valkey"
306-
attributes["network.peer.address"] = channel.remoteAddress?.ipAddress
307-
attributes["network.peer.port"] = channel.remoteAddress?.port
308-
attributes["server.address"] = address?.hostOrSocketPath
309-
attributes["server.port"] = address?.port == 6379 ? nil : address?.port
276+
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {
277+
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName
278+
attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValue.databaseSystem
279+
attributes[self.configuration.tracing.attributeNames.networkPeerAddress] = channel.remoteAddress?.ipAddress
280+
attributes[self.configuration.tracing.attributeNames.networkPeerPort] = channel.remoteAddress?.port
281+
attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath
282+
attributes[self.configuration.tracing.attributeNames.serverPort] = address?.port == 6379 ? nil : address?.port
310283
}
311284

312285
@usableFromInline

Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import NIOSSL
16+
#if DistributedTracingSupport
17+
import Tracing
18+
#endif
1619

1720
/// A configuration object that defines how to connect to a Valkey server.
1821
///
@@ -117,6 +120,10 @@ public struct ValkeyConnectionConfiguration: Sendable {
117120
/// Default value is `nil` (no client name is set).
118121
public var clientName: String?
119122

123+
#if DistributedTracingSupport
124+
public var tracing: ValkeyTracingConfiguration = .init()
125+
#endif
126+
120127
/// Creates a new Valkey connection configuration.
121128
///
122129
/// Use this initializer to create a configuration object that can be used to establish
@@ -142,3 +149,26 @@ public struct ValkeyConnectionConfiguration: Sendable {
142149
self.clientName = clientName
143150
}
144151
}
152+
153+
#if DistributedTracingSupport
154+
public struct ValkeyTracingConfiguration: Sendable {
155+
156+
public var tracer: (any Tracer)? = InstrumentationSystem.tracer
157+
158+
public var attributeNames: AttributeNames = .init()
159+
public var attributeValue: AttributeValues = .init()
160+
161+
public struct AttributeNames: Sendable {
162+
public var databaseOperationName: String = "db.operation.name"
163+
public var databaseSystemName: String = "db.system.name"
164+
public var networkPeerAddress: String = "network.peer.address"
165+
public var networkPeerPort: String = "network.peer.port"
166+
public var serverAddress: String = "server.address"
167+
public var serverPort: String = "server.port"
168+
}
169+
170+
public struct AttributeValues: Sendable {
171+
public var databaseSystem: String = "valkey"
172+
}
173+
}
174+
#endif

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ public struct ValkeyClientConfiguration: Sendable {
124124
/// The TLS to use for the Valkey connection.
125125
public var tls: TLS
126126

127+
#if DistributedTracingSupport
128+
public var tracing: ValkeyTracingConfiguration = .init()
129+
#endif
130+
127131
/// Creates a Valkey client connection configuration.
128132
///
129133
/// - Parameters:

Sources/Valkey/ValkeyConnectionFactory.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ package final class ValkeyConnectionFactory: Sendable {
109109
try await .enable(self.cache!.getSSLContext(), tlsServerName: clientName)
110110
}
111111

112-
return ValkeyConnectionConfiguration(
112+
let newConfig = ValkeyConnectionConfiguration(
113113
authentication: self.configuration.authentication.flatMap {
114114
.init(username: $0.username, password: $0.password)
115115
},
@@ -118,5 +118,13 @@ package final class ValkeyConnectionFactory: Sendable {
118118
tls: tls,
119119
clientName: nil
120120
)
121+
122+
#if DistributedTracingSupport
123+
var mConfig = newConfig
124+
mConfig.tracing = self.configuration.tracing
125+
return mConfig
126+
#else
127+
return newConfig
128+
#endif
121129
}
122130
}

0 commit comments

Comments
 (0)