Skip to content

Commit d2a1480

Browse files
committed
Restructure in-process transport
Motivation: To create a connected pair of `ServerTransport` and `ClientTransport`, users need to call `InProcessTransport.makePair(serviceConfig:)` which returns a tuple. The spelling of this API can be more intuitive and simple. Modifications: - Restructure `InProcessTransport` to include two properties: `server` (the `ServerTransport`) and `client` (the `ClientTransport`), and an `init(serviceConfig:)` that pairs these properties. - Add missing GRPCInProcessTransport dependency to GRPCCore test target. Result: The spelling of the `InProcessTransport` API will be better.
1 parent cf29048 commit d2a1480

14 files changed

+477
-478
lines changed

Package.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ let targets: [Target] = [
6565
name: "GRPCCoreTests",
6666
dependencies: [
6767
.target(name: "GRPCCore"),
68+
.target(name: "GRPCInProcessTransport"),
6869
.product(name: "SwiftProtobuf", package: "swift-protobuf")
6970
],
7071
resources: [

Sources/GRPCCore/GRPCClient.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ private import Synchronization
6565
/// )
6666
///
6767
/// // Finally create a transport and instantiate the client, adding an interceptor.
68-
/// let inProcessServerTransport = InProcessServerTransport()
69-
/// let inProcessClientTransport = InProcessClientTransport(serverTransport: inProcessServerTransport)
68+
/// let inProcessServerTransport = InProcessTransport.Server()
69+
/// let inProcessClientTransport = InProcessTransport.Client(serverTransport: inProcessServerTransport)
7070
///
7171
/// let client = GRPCClient(
7272
/// transport: inProcessClientTransport,

Sources/GRPCCore/GRPCServer.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private import Synchronization
3535
///
3636
/// ```swift
3737
/// // Create and an in-process transport.
38-
/// let inProcessTransport = InProcessServerTransport()
38+
/// let inProcessTransport = InProcessTransport.Server()
3939
///
4040
/// // Create the 'Greeter' and 'Echo' services.
4141
/// let greeter = GreeterService()
Lines changed: 356 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,356 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
public import GRPCCore
18+
private import Synchronization
19+
20+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
21+
extension InProcessTransport {
22+
/// An in-process implementation of a ``ClientTransport``.
23+
///
24+
/// This is useful when you're interested in testing your application without any actual networking layers
25+
/// involved, as the client and server will communicate directly with each other via in-process streams.
26+
///
27+
/// To use this client, you'll have to provide a ``ServerTransport`` upon creation, as well
28+
/// as a ``ServiceConfig``.
29+
///
30+
/// Once you have a client, you must keep a long-running task executing ``connect()``, which
31+
/// will return only once all streams have been finished and ``beginGracefulShutdown()`` has been called on this client; or
32+
/// when the containing task is cancelled.
33+
///
34+
/// To execute requests using this client, use ``withStream(descriptor:options:_:)``. If this function is
35+
/// called before ``connect()`` is called, then any streams will remain pending and the call will
36+
/// block until ``connect()`` is called or the task is cancelled.
37+
///
38+
/// - SeeAlso: ``ClientTransport``
39+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
40+
public final class Client: ClientTransport {
41+
private enum State: Sendable {
42+
struct UnconnectedState {
43+
var serverTransport: InProcessTransport.Server
44+
var pendingStreams: [AsyncStream<Void>.Continuation]
45+
46+
init(serverTransport: InProcessTransport.Server) {
47+
self.serverTransport = serverTransport
48+
self.pendingStreams = []
49+
}
50+
}
51+
52+
struct ConnectedState {
53+
var serverTransport: InProcessTransport.Server
54+
var nextStreamID: Int
55+
var openStreams:
56+
[Int: (
57+
RPCStream<Inbound, Outbound>,
58+
RPCStream<
59+
RPCAsyncSequence<RPCRequestPart, any Error>, RPCWriter<RPCResponsePart>.Closable
60+
>
61+
)]
62+
var signalEndContinuation: AsyncStream<Void>.Continuation
63+
64+
init(
65+
fromUnconnected state: UnconnectedState,
66+
signalEndContinuation: AsyncStream<Void>.Continuation
67+
) {
68+
self.serverTransport = state.serverTransport
69+
self.nextStreamID = 0
70+
self.openStreams = [:]
71+
self.signalEndContinuation = signalEndContinuation
72+
}
73+
}
74+
75+
struct ClosedState {
76+
var openStreams:
77+
[Int: (
78+
RPCStream<Inbound, Outbound>,
79+
RPCStream<
80+
RPCAsyncSequence<RPCRequestPart, any Error>, RPCWriter<RPCResponsePart>.Closable
81+
>
82+
)]
83+
var signalEndContinuation: AsyncStream<Void>.Continuation?
84+
85+
init() {
86+
self.openStreams = [:]
87+
self.signalEndContinuation = nil
88+
}
89+
90+
init(fromConnected state: ConnectedState) {
91+
self.openStreams = state.openStreams
92+
self.signalEndContinuation = state.signalEndContinuation
93+
}
94+
}
95+
96+
case unconnected(UnconnectedState)
97+
case connected(ConnectedState)
98+
case closed(ClosedState)
99+
}
100+
101+
public typealias Inbound = RPCAsyncSequence<RPCResponsePart, any Error>
102+
public typealias Outbound = RPCWriter<RPCRequestPart>.Closable
103+
104+
public let retryThrottle: RetryThrottle?
105+
106+
private let methodConfig: MethodConfigs
107+
private let state: Mutex<State>
108+
109+
/// Creates a new in-process client transport.
110+
///
111+
/// - Parameters:
112+
/// - server: The in-process server transport to connect to.
113+
/// - serviceConfig: Service configuration.
114+
public init(
115+
server: InProcessTransport.Server,
116+
serviceConfig: ServiceConfig = ServiceConfig()
117+
) {
118+
self.retryThrottle = serviceConfig.retryThrottling.map { RetryThrottle(policy: $0) }
119+
self.methodConfig = MethodConfigs(serviceConfig: serviceConfig)
120+
self.state = Mutex(.unconnected(.init(serverTransport: server)))
121+
}
122+
123+
/// Establish and maintain a connection to the remote destination.
124+
///
125+
/// Maintains a long-lived connection, or set of connections, to a remote destination.
126+
/// Connections may be added or removed over time as required by the implementation and the
127+
/// demand for streams by the client.
128+
///
129+
/// Implementations of this function will typically create a long-lived task group which
130+
/// maintains connections. The function exits when all open streams have been closed and new connections
131+
/// are no longer required by the caller who signals this by calling ``beginGracefulShutdown()``, or by cancelling the
132+
/// task this function runs in.
133+
public func connect() async throws {
134+
let (stream, continuation) = AsyncStream<Void>.makeStream()
135+
try self.state.withLock { state in
136+
switch state {
137+
case .unconnected(let unconnectedState):
138+
state = .connected(
139+
.init(
140+
fromUnconnected: unconnectedState,
141+
signalEndContinuation: continuation
142+
)
143+
)
144+
for pendingStream in unconnectedState.pendingStreams {
145+
pendingStream.finish()
146+
}
147+
case .connected:
148+
throw RPCError(
149+
code: .failedPrecondition,
150+
message: "Already connected to server."
151+
)
152+
case .closed:
153+
throw RPCError(
154+
code: .failedPrecondition,
155+
message: "Can't connect to server, transport is closed."
156+
)
157+
}
158+
}
159+
160+
for await _ in stream {
161+
// This for-await loop will exit (and thus `connect()` will return)
162+
// only when the task is cancelled, or when the stream's continuation is
163+
// finished - whichever happens first.
164+
// The continuation will be finished when `close()` is called and there
165+
// are no more open streams.
166+
}
167+
168+
// If at this point there are any open streams, it's because Cancellation
169+
// occurred and all open streams must now be closed.
170+
let openStreams = self.state.withLock { state in
171+
switch state {
172+
case .unconnected:
173+
// We have transitioned to connected, and we can't transition back.
174+
fatalError("Invalid state")
175+
case .connected(let connectedState):
176+
state = .closed(.init())
177+
return connectedState.openStreams.values
178+
case .closed(let closedState):
179+
return closedState.openStreams.values
180+
}
181+
}
182+
183+
for (clientStream, serverStream) in openStreams {
184+
await clientStream.outbound.finish(throwing: CancellationError())
185+
await serverStream.outbound.finish(throwing: CancellationError())
186+
}
187+
}
188+
189+
/// Signal to the transport that no new streams may be created.
190+
///
191+
/// Existing streams may run to completion naturally but calling ``withStream(descriptor:options:_:)``
192+
/// will result in an ``RPCError`` with code ``RPCError/Code/failedPrecondition`` being thrown.
193+
///
194+
/// If you want to forcefully cancel all active streams then cancel the task running ``connect()``.
195+
public func beginGracefulShutdown() {
196+
let maybeContinuation: AsyncStream<Void>.Continuation? = self.state.withLock { state in
197+
switch state {
198+
case .unconnected:
199+
state = .closed(.init())
200+
return nil
201+
case .connected(let connectedState):
202+
if connectedState.openStreams.count == 0 {
203+
state = .closed(.init())
204+
return connectedState.signalEndContinuation
205+
} else {
206+
state = .closed(.init(fromConnected: connectedState))
207+
return nil
208+
}
209+
case .closed:
210+
return nil
211+
}
212+
}
213+
maybeContinuation?.finish()
214+
}
215+
216+
/// Opens a stream using the transport, and uses it as input into a user-provided closure.
217+
///
218+
/// - Important: The opened stream is closed after the closure is finished.
219+
///
220+
/// This transport implementation throws ``RPCError/Code/failedPrecondition`` if the transport
221+
/// is closing or has been closed.
222+
///
223+
/// This implementation will queue any streams (and thus block this call) if this function is called before
224+
/// ``connect()``, until a connection is established - at which point all streams will be
225+
/// created.
226+
///
227+
/// - Parameters:
228+
/// - descriptor: A description of the method to open a stream for.
229+
/// - options: Options specific to the stream.
230+
/// - closure: A closure that takes the opened stream as parameter.
231+
/// - Returns: Whatever value was returned from `closure`.
232+
public func withStream<T>(
233+
descriptor: MethodDescriptor,
234+
options: CallOptions,
235+
_ closure: (RPCStream<Inbound, Outbound>) async throws -> T
236+
) async throws -> T {
237+
let request = GRPCAsyncThrowingStream.makeStream(of: RPCRequestPart.self)
238+
let response = GRPCAsyncThrowingStream.makeStream(of: RPCResponsePart.self)
239+
240+
let clientStream = RPCStream(
241+
descriptor: descriptor,
242+
inbound: RPCAsyncSequence(wrapping: response.stream),
243+
outbound: RPCWriter.Closable(wrapping: request.continuation)
244+
)
245+
246+
let serverStream = RPCStream(
247+
descriptor: descriptor,
248+
inbound: RPCAsyncSequence(wrapping: request.stream),
249+
outbound: RPCWriter.Closable(wrapping: response.continuation)
250+
)
251+
252+
let waitForConnectionStream: AsyncStream<Void>? = self.state.withLock { state in
253+
if case .unconnected(var unconnectedState) = state {
254+
let (stream, continuation) = AsyncStream<Void>.makeStream()
255+
unconnectedState.pendingStreams.append(continuation)
256+
state = .unconnected(unconnectedState)
257+
return stream
258+
}
259+
return nil
260+
}
261+
262+
if let waitForConnectionStream {
263+
for await _ in waitForConnectionStream {
264+
// This loop will exit either when the task is cancelled or when the
265+
// client connects and this stream can be opened.
266+
}
267+
try Task.checkCancellation()
268+
}
269+
270+
let acceptStream: Result<Int, RPCError> = self.state.withLock { state in
271+
switch state {
272+
case .unconnected:
273+
// The state cannot be unconnected because if it was, then the above
274+
// for-await loop on `pendingStream` would have not returned.
275+
// The only other option is for the task to have been cancelled,
276+
// and that's why we check for cancellation right after the loop.
277+
fatalError("Invalid state.")
278+
279+
case .connected(var connectedState):
280+
let streamID = connectedState.nextStreamID
281+
do {
282+
try connectedState.serverTransport.acceptStream(serverStream)
283+
connectedState.openStreams[streamID] = (clientStream, serverStream)
284+
connectedState.nextStreamID += 1
285+
state = .connected(connectedState)
286+
return .success(streamID)
287+
} catch let acceptStreamError as RPCError {
288+
return .failure(acceptStreamError)
289+
} catch {
290+
return .failure(RPCError(code: .unknown, message: "Unknown error: \(error)."))
291+
}
292+
293+
case .closed:
294+
let error = RPCError(code: .failedPrecondition, message: "The client transport is closed.")
295+
return .failure(error)
296+
}
297+
}
298+
299+
switch acceptStream {
300+
case .success(let streamID):
301+
let streamHandlingResult: Result<T, any Error>
302+
do {
303+
let result = try await closure(clientStream)
304+
streamHandlingResult = .success(result)
305+
} catch {
306+
streamHandlingResult = .failure(error)
307+
}
308+
309+
await clientStream.outbound.finish()
310+
self.removeStream(id: streamID)
311+
312+
return try streamHandlingResult.get()
313+
314+
case .failure(let error):
315+
await serverStream.outbound.finish(throwing: error)
316+
await clientStream.outbound.finish(throwing: error)
317+
throw error
318+
}
319+
}
320+
321+
private func removeStream(id streamID: Int) {
322+
let maybeEndContinuation = self.state.withLock { state in
323+
switch state {
324+
case .unconnected:
325+
// The state cannot be unconnected at this point, because if we made
326+
// it this far, it's because the transport was connected.
327+
// Once connected, it's impossible to transition back to unconnected,
328+
// so this is an invalid state.
329+
fatalError("Invalid state")
330+
case .connected(var connectedState):
331+
connectedState.openStreams.removeValue(forKey: streamID)
332+
state = .connected(connectedState)
333+
case .closed(var closedState):
334+
closedState.openStreams.removeValue(forKey: streamID)
335+
state = .closed(closedState)
336+
if closedState.openStreams.isEmpty {
337+
// This was the last open stream: signal the closure of the client.
338+
return closedState.signalEndContinuation
339+
}
340+
}
341+
return nil
342+
}
343+
maybeEndContinuation?.finish()
344+
}
345+
346+
/// Returns the execution configuration for a given method.
347+
///
348+
/// - Parameter descriptor: The method to lookup configuration for.
349+
/// - Returns: Execution configuration for the method, if it exists.
350+
public func config(
351+
forMethod descriptor: MethodDescriptor
352+
) -> MethodConfig? {
353+
self.methodConfig[descriptor]
354+
}
355+
}
356+
}

0 commit comments

Comments
 (0)