Skip to content

Commit 48530eb

Browse files
committed
Handle response failures + do not record custom events
1 parent 2929500 commit 48530eb

File tree

4 files changed

+146
-132
lines changed

4 files changed

+146
-132
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2025, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
internal struct HookedRPCAsyncSequence<Wrapped: AsyncSequence & Sendable>: AsyncSequence, Sendable where Wrapped.Element: Sendable {
18+
private let wrapped: Wrapped
19+
20+
private let forEachElement: @Sendable (Wrapped.Element) -> Void
21+
private let onFinish: @Sendable () -> Void
22+
private let onFailure: @Sendable (any Error) -> Void
23+
24+
init(
25+
wrapping sequence: Wrapped,
26+
forEachElement: @escaping @Sendable (Wrapped.Element) -> Void,
27+
onFinish: @escaping @Sendable () -> Void,
28+
onFailure: @escaping @Sendable (any Error) -> Void
29+
) {
30+
self.wrapped = sequence
31+
self.forEachElement = forEachElement
32+
self.onFinish = onFinish
33+
self.onFailure = onFailure
34+
}
35+
36+
func makeAsyncIterator() -> HookedAsyncIterator {
37+
HookedAsyncIterator(
38+
self.wrapped,
39+
forEachElement: self.forEachElement,
40+
onFinish: self.onFinish,
41+
onFailure: self.onFailure
42+
)
43+
}
44+
45+
struct HookedAsyncIterator: AsyncIteratorProtocol {
46+
typealias Element = Wrapped.Element
47+
48+
private var wrapped: Wrapped.AsyncIterator
49+
private let forEachElement: @Sendable (Wrapped.Element) -> Void
50+
private let onFinish: @Sendable () -> Void
51+
private let onFailure: @Sendable (any Error) -> Void
52+
53+
init(
54+
_ sequence: Wrapped,
55+
forEachElement: @escaping @Sendable (Wrapped.Element) -> Void,
56+
onFinish: @escaping @Sendable () -> Void,
57+
onFailure: @escaping @Sendable (any Error) -> Void
58+
) {
59+
self.wrapped = sequence.makeAsyncIterator()
60+
self.forEachElement = forEachElement
61+
self.onFinish = onFinish
62+
self.onFailure = onFailure
63+
}
64+
65+
mutating func next(isolation actor: isolated (any Actor)?) async throws(Wrapped.Failure) -> Wrapped.Element? {
66+
do {
67+
if let element = try await self.wrapped.next(isolation: actor) {
68+
self.forEachElement(element)
69+
return element
70+
}
71+
72+
self.onFinish()
73+
return nil
74+
} catch {
75+
self.onFailure(error)
76+
throw error
77+
}
78+
}
79+
80+
mutating func next() async throws -> Wrapped.Element? {
81+
do {
82+
if let element = try await self.wrapped.next() {
83+
self.forEachElement(element)
84+
return element
85+
}
86+
87+
self.onFinish()
88+
return nil
89+
} catch {
90+
self.onFailure(error)
91+
throw error
92+
}
93+
}
94+
}
95+
}

Sources/GRPCInterceptors/OnFinishAsyncSequence.swift

Lines changed: 0 additions & 56 deletions
This file was deleted.

Sources/GRPCInterceptors/Tracing/ClientOTelTracingInterceptor.swift

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal import Tracing
2626
/// For more information, refer to the documentation for `swift-distributed-tracing`.
2727
public struct ClientOTelTracingInterceptor: ClientInterceptor {
2828
private let injector: ClientRequestInjector
29-
private let emitEventOnEachWrite: Bool
29+
private let traceEachMessage: Bool
3030
private var serverHostname: String
3131
private var networkTransportMethod: String
3232

@@ -36,17 +36,17 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
3636
/// - severHostname: The hostname of the RPC server. This will be the value for the `server.address` attribute in spans.
3737
/// - networkTransportMethod: The transport in use (e.g. "tcp", "udp"). This will be the value for the
3838
/// `network.transport` attribute in spans.
39-
/// - emitEventOnEachWrite: If `true`, each request part sent and response part received will be recorded as a separate
39+
/// - traceEachMessage: If `true`, each request part sent and response part received will be recorded as a separate
4040
/// event in a tracing span. Otherwise, only the request/response start and end will be recorded as events.
4141
public init(
4242
serverHostname: String,
4343
networkTransportMethod: String,
44-
emitEventOnEachWrite: Bool = false
44+
traceEachMessage: Bool = true
4545
) {
4646
self.injector = ClientRequestInjector()
4747
self.serverHostname = serverHostname
4848
self.networkTransportMethod = networkTransportMethod
49-
self.emitEventOnEachWrite = emitEventOnEachWrite
49+
self.traceEachMessage = traceEachMessage
5050
}
5151

5252
/// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs
@@ -84,10 +84,8 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
8484
) { span in
8585
self.setOTelSpanAttributes(into: span, context: context)
8686

87-
span.addEvent("Request started")
88-
89-
let wrappedProducer = request.producer
90-
if self.emitEventOnEachWrite {
87+
if self.traceEachMessage {
88+
let wrappedProducer = request.producer
9189
request.producer = { writer in
9290
let messageSentCounter = Atomic(1)
9391
let eventEmittingWriter = HookedWriter(
@@ -104,54 +102,53 @@ public struct ClientOTelTracingInterceptor: ClientInterceptor {
104102
}
105103
)
106104
try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter))
107-
span.addEvent("Request ended")
108-
}
109-
} else {
110-
request.producer = { writer in
111-
try await wrappedProducer(RPCWriter(wrapping: writer))
112-
span.addEvent("Request ended")
113105
}
114106
}
115107

116108
var response = try await next(request, context)
117109
switch response.accepted {
118110
case .success(var success):
119-
span.addEvent("Received response start")
120-
span.attributes.rpc.grpcStatusCode = 0
121-
if self.emitEventOnEachWrite {
111+
let hookedSequence: HookedRPCAsyncSequence<
112+
RPCAsyncSequence<StreamingClientResponse<Output>.Contents.BodyPart, any Error>
113+
>
114+
if self.traceEachMessage {
122115
let messageReceivedCounter = Atomic(1)
123-
let onEachPartRecordingSequence = success.bodyParts.map { element in
116+
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in
124117
var event = SpanEvent(name: "rpc.message")
125118
event.attributes.rpc.messageType = "RECEIVED"
126-
event.attributes.rpc.messageID =
127-
messageReceivedCounter
119+
event.attributes.rpc.messageID = messageReceivedCounter
128120
.wrappingAdd(1, ordering: .sequentiallyConsistent)
129121
.oldValue
130122
span.addEvent(event)
131-
return element
132-
}
133-
134-
let onFinishRecordingSequence = OnFinishAsyncSequence(
135-
wrapping: onEachPartRecordingSequence
136-
) {
137-
span.addEvent("Received response end")
123+
} onFinish: {
124+
span.attributes.rpc.grpcStatusCode = 0
125+
} onFailure: { error in
126+
if let rpcError = error as? RPCError {
127+
span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue
128+
}
129+
span.setStatus(SpanStatus(code: .error))
130+
span.recordError(error)
138131
}
139-
140-
success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence)
141-
response.accepted = .success(success)
142132
} else {
143-
let onFinishRecordingSequence = OnFinishAsyncSequence(wrapping: success.bodyParts) {
144-
span.addEvent("Received response end")
133+
hookedSequence = HookedRPCAsyncSequence(wrapping: success.bodyParts) { _ in
134+
// Nothing to do if traceEachMessage is false
135+
} onFinish: {
136+
span.attributes.rpc.grpcStatusCode = 0
137+
} onFailure: { error in
138+
if let rpcError = error as? RPCError {
139+
span.attributes.rpc.grpcStatusCode = rpcError.code.rawValue
140+
}
141+
span.setStatus(SpanStatus(code: .error))
142+
span.recordError(error)
145143
}
146-
147-
success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence)
148-
response.accepted = .success(success)
149144
}
150145

146+
success.bodyParts = RPCAsyncSequence(wrapping: hookedSequence)
147+
response.accepted = .success(success)
148+
151149
case .failure(let error):
152150
span.attributes.rpc.grpcStatusCode = error.code.rawValue
153151
span.setStatus(SpanStatus(code: .error))
154-
span.addEvent("Received error response")
155152
span.recordError(error)
156153
}
157154

0 commit comments

Comments
 (0)