Skip to content

Commit 9129908

Browse files
committed
Improve tracing support
1 parent d09f0f2 commit 9129908

File tree

5 files changed

+109
-51
lines changed

5 files changed

+109
-51
lines changed

Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@ import Logging
1717
import NIOCore
1818
import NIOPosix
1919
import Synchronization
20+
#if DistributedTracingSupport
21+
import Tracing
22+
#endif
2023
import Valkey
2124

2225
@available(valkeySwift 1.0, *)
2326
func connectionBenchmarks() {
2427
makeConnectionGETBenchmark()
28+
#if DistributedTracingSupport
29+
makeConnectionGETNoOpTracerBenchmark()
30+
#endif
2531
makeConnectionPipelineBenchmark()
2632
}
2733

@@ -33,9 +39,45 @@ func makeConnectionGETBenchmark() -> Benchmark? {
3339
return Benchmark("Connection: GET benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
3440
let port = serverMutex.withLock { $0 }!.localAddress!.port!
3541
let logger = Logger(label: "test")
42+
#if DistributedTracingSupport
43+
// explicitly set tracer to nil, if trait is enabled
44+
var configuration = ValkeyConnectionConfiguration()
45+
configuration.tracing.tracer = nil
46+
#endif
3647
try await ValkeyConnection.withConnection(
3748
address: .hostname("127.0.0.1", port: port),
38-
configuration: .init(),
49+
configuration: configuration,
50+
logger: logger
51+
) { connection in
52+
benchmark.startMeasurement()
53+
for _ in benchmark.scaledIterations {
54+
let foo = try await connection.get("foo")
55+
precondition(foo.map { String(buffer: $0) } == "Bar")
56+
}
57+
benchmark.stopMeasurement()
58+
}
59+
} setup: {
60+
let server = try await makeLocalServer()
61+
serverMutex.withLock { $0 = server }
62+
} teardown: {
63+
try await serverMutex.withLock { $0 }?.close().get()
64+
}
65+
}
66+
67+
#if DistributedTracingSupport
68+
@available(valkeySwift 1.0, *)
69+
@discardableResult
70+
func makeConnectionGETNoOpTracerBenchmark() -> Benchmark? {
71+
let serverMutex = Mutex<(any Channel)?>(nil)
72+
73+
return Benchmark("Connection: GET benchmark – NoOpTracer", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in
74+
let port = serverMutex.withLock { $0 }!.localAddress!.port!
75+
let logger = Logger(label: "test")
76+
var configuration = ValkeyConnectionConfiguration()
77+
configuration.tracing.tracer = NoOpTracer()
78+
try await ValkeyConnection.withConnection(
79+
address: .hostname("127.0.0.1", port: port),
80+
configuration: configuration,
3981
logger: logger
4082
) { connection in
4183
benchmark.startMeasurement()
@@ -52,6 +94,7 @@ func makeConnectionGETBenchmark() -> Benchmark? {
5294
try await serverMutex.withLock { $0 }?.close().get()
5395
}
5496
}
97+
#endif
5598

5699
@available(valkeySwift 1.0, *)
57100
@discardableResult

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 22 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
3939
public let id: ID
4040
/// Logger used by Server
4141
let logger: Logger
42+
#if DistributedTracingSupport
43+
@usableFromInline
44+
let tracer: (any Tracer)?
45+
#endif
4246
@usableFromInline
4347
let channel: any Channel
4448
@usableFromInline
@@ -63,6 +67,9 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
6367
self.configuration = configuration
6468
self.id = connectionID
6569
self.logger = logger
70+
#if DistributedTracingSupport
71+
self.tracer = configuration.tracing.tracer
72+
#endif
6673
switch address?.value {
6774
case let .hostname(host, port):
6875
self.address = (host, port)
@@ -177,12 +184,11 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
177184
@inlinable
178185
func _execute<Command: ValkeyCommand>(command: Command) async throws -> RESPToken {
179186
#if DistributedTracingSupport
180-
let span = startSpan(Command.name, ofKind: .client)
181-
defer { span.end() }
187+
let span = self.tracer?.startSpan(Command.name, ofKind: .client)
188+
defer { span?.end() }
182189

183-
span.updateAttributes { attributes in
184-
attributes["db.operation.name"] = Command.name
185-
applyCommonAttributes(to: &attributes)
190+
span?.updateAttributes { attributes in
191+
self.applyCommonAttributes(to: &attributes, commandName: Command.name)
186192
}
187193
#endif
188194

@@ -201,21 +207,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
201207
}
202208
} catch let error as ValkeyClientError {
203209
#if DistributedTracingSupport
204-
span.recordError(error)
210+
span?.recordError(error)
205211
if let message = error.message {
206212
var prefixEndIndex = message.startIndex
207213
while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " {
208214
message.formIndex(after: &prefixEndIndex)
209215
}
210216
let prefix = message[message.startIndex..<prefixEndIndex]
211-
span.attributes["db.response.status_code"] = "\(prefix)"
212-
span.setStatus(SpanStatus(code: .error))
217+
span?.attributes["db.response.status_code"] = "\(prefix)"
218+
span?.setStatus(SpanStatus(code: .error))
213219
}
214220
#endif
215221
throw error
216222
} catch {
217223
#if DistributedTracingSupport
218-
span.recordError(error)
224+
span?.recordError(error)
219225
#endif
220226
throw error
221227
}
@@ -236,41 +242,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
236242
public func execute<each Command: ValkeyCommand>(
237243
_ commands: repeat each Command
238244
) async -> sending (repeat Result<(each Command).Response, Error>) {
239-
#if DistributedTracingSupport
240-
let span = startSpan("MULTI", ofKind: .client)
241-
defer { span.end() }
242-
243-
// We want to suffix the `db.operation.name` if all pipelined commands are of the same type.
244-
var commandName: String?
245-
var operationNameSuffix: String?
246-
var commandCount = 0
247-
248-
for command in repeat each commands {
249-
commandCount += 1
250-
if commandName == nil {
251-
commandName = Swift.type(of: command).name
252-
operationNameSuffix = commandName
253-
} else if commandName != Swift.type(of: command).name {
254-
// We should only add a suffix if all commands in the transaction are the same.
255-
operationNameSuffix = nil
256-
}
257-
}
258-
let operationName = operationNameSuffix.map { "MULTI \($0)" } ?? "MULTI"
259-
260-
span.updateAttributes { attributes in
261-
attributes["db.operation.name"] = operationName
262-
attributes["db.operation.batch.size"] = commandCount > 1 ? commandCount : nil
263-
applyCommonAttributes(to: &attributes)
264-
}
265-
#endif
266-
267245
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
268-
#if DistributedTracingSupport
269-
if case .failure(let error) = result {
270-
span.recordError(error)
271-
}
272-
#endif
273-
274246
return result.flatMap {
275247
do {
276248
return try .success(Response(fromRESP: $0))
@@ -307,12 +279,13 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
307279
}
308280

309281
@usableFromInline
310-
func applyCommonAttributes(to attributes: inout SpanAttributes) {
311-
attributes["db.system.name"] = "valkey"
312-
attributes["network.peer.address"] = channel.remoteAddress?.ipAddress
313-
attributes["network.peer.port"] = channel.remoteAddress?.port
314-
attributes["server.address"] = address?.hostOrSocketPath
315-
attributes["server.port"] = address?.port == 6379 ? nil : address?.port
282+
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {
283+
attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName
284+
attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValue.databaseSystem
285+
attributes[self.configuration.tracing.attributeNames.networkPeerAddress] = channel.remoteAddress?.ipAddress
286+
attributes[self.configuration.tracing.attributeNames.networkPeerPort] = channel.remoteAddress?.port
287+
attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath
288+
attributes[self.configuration.tracing.attributeNames.serverPort] = address?.port == 6379 ? nil : address?.port
316289
}
317290

318291
@usableFromInline

Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import NIOSSL
16+
#if DistributedTracingSupport
17+
import Tracing
18+
#endif
1619

1720
/// A configuration object that defines how to connect to a Valkey server.
1821
///
@@ -117,6 +120,10 @@ public struct ValkeyConnectionConfiguration: Sendable {
117120
/// Default value is `nil` (no client name is set).
118121
public var clientName: String?
119122

123+
#if DistributedTracingSupport
124+
public var tracing: ValkeyTracingConfiguration = .init()
125+
#endif
126+
120127
/// Creates a new Valkey connection configuration.
121128
///
122129
/// Use this initializer to create a configuration object that can be used to establish
@@ -142,3 +149,26 @@ public struct ValkeyConnectionConfiguration: Sendable {
142149
self.clientName = clientName
143150
}
144151
}
152+
153+
#if DistributedTracingSupport
154+
public struct ValkeyTracingConfiguration: Sendable {
155+
156+
public var tracer: (any Tracer)? = InstrumentationSystem.tracer
157+
158+
public var attributeNames: AttributeNames = .init()
159+
public var attributeValue: AttributeValues = .init()
160+
161+
public struct AttributeNames: Sendable {
162+
public var databaseOperationName: String = "db.operation.name"
163+
public var databaseSystemName: String = "db.system.name"
164+
public var networkPeerAddress: String = "network.peer.address"
165+
public var networkPeerPort: String = "network.peer.port"
166+
public var serverAddress: String = "server.address"
167+
public var serverPort: String = "server.port"
168+
}
169+
170+
public struct AttributeValues: Sendable {
171+
public var databaseSystem: String = "valkey"
172+
}
173+
}
174+
#endif

Sources/Valkey/ValkeyClientConfiguration.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ public struct ValkeyClientConfiguration: Sendable {
124124
/// The TLS to use for the Valkey connection.
125125
public var tls: TLS
126126

127+
#if DistributedTracingSupport
128+
public var tracing: ValkeyTracingConfiguration = .init()
129+
#endif
130+
127131
/// Creates a Valkey client connection configuration.
128132
///
129133
/// - Parameters:

Sources/Valkey/ValkeyConnectionFactory.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ package final class ValkeyConnectionFactory: Sendable {
109109
try await .enable(self.cache!.getSSLContext(), tlsServerName: clientName)
110110
}
111111

112-
return ValkeyConnectionConfiguration(
112+
let newConfig = ValkeyConnectionConfiguration(
113113
authentication: self.configuration.authentication.flatMap {
114114
.init(username: $0.username, password: $0.password)
115115
},
@@ -118,5 +118,13 @@ package final class ValkeyConnectionFactory: Sendable {
118118
tls: tls,
119119
clientName: nil
120120
)
121+
122+
#if DistributedTracingSupport
123+
var mConfig = newConfig
124+
mConfig.tracing = self.configuration.tracing
125+
return mConfig
126+
#else
127+
return newConfig
128+
#endif
121129
}
122130
}

0 commit comments

Comments
 (0)