Skip to content

Commit 1a9f7e2

Browse files
authored
Merge pull request #201 from MrMage/wait-timeout
Add a `timeout` option to blocking `send()` and `receive()` calls
2 parents 328c05c + ca74075 commit 1a9f7e2

15 files changed

+140
-37
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ test-echo: all
2424
kill -9 `cat echo.pid`
2525
diff -u test.out Sources/Examples/Echo/test.gold
2626

27-
test-plugin: all
27+
test-plugin:
28+
swift build -v $(CFLAGS) --product protoc-gen-swiftgrpc
2829
protoc Sources/Examples/Echo/echo.proto --proto_path=Sources/Examples/Echo --plugin=.build/debug/protoc-gen-swift --plugin=.build/debug/protoc-gen-swiftgrpc --swiftgrpc_out=/tmp --swiftgrpc_opt=TestStubs=true
2930
diff -u /tmp/echo.grpc.swift Sources/Examples/Echo/Generated/echo.grpc.swift
3031

Sources/Examples/Echo/Generated/echo.grpc.swift

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,17 @@ fileprivate final class Echo_EchoGetCallBase: ClientCallUnaryBase<Echo_EchoReque
3232
}
3333

3434
internal protocol Echo_EchoExpandCall: ClientCallServerStreaming {
35-
/// Call this to wait for a result. Blocking.
36-
func receive() throws -> Echo_EchoResponse?
35+
/// Do not call this directly, call `receive()` in the protocol extension below instead.
36+
func _receive(timeout: DispatchTime) throws -> Echo_EchoResponse?
3737
/// Call this to wait for a result. Nonblocking.
3838
func receive(completion: @escaping (ResultOrRPCError<Echo_EchoResponse?>) -> Void) throws
3939
}
4040

41+
internal extension Echo_EchoExpandCall {
42+
/// Call this to wait for a result. Blocking.
43+
func receive(timeout: DispatchTime = .distantFuture) throws -> Echo_EchoResponse? { return try self._receive(timeout: timeout) }
44+
}
45+
4146
fileprivate final class Echo_EchoExpandCallBase: ClientCallServerStreamingBase<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoExpandCall {
4247
override class var method: String { return "/echo.Echo/Expand" }
4348
}
@@ -49,15 +54,20 @@ class Echo_EchoExpandCallTestStub: ClientCallServerStreamingTestStub<Echo_EchoRe
4954
internal protocol Echo_EchoCollectCall: ClientCallClientStreaming {
5055
/// Send a message to the stream. Nonblocking.
5156
func send(_ message: Echo_EchoRequest, completion: @escaping (Error?) -> Void) throws
52-
/// Send a message to the stream and wait for the send operation to finish. Blocking.
53-
func send(_ message: Echo_EchoRequest) throws
57+
/// Do not call this directly, call `send()` in the protocol extension below instead.
58+
func _send(_ message: Echo_EchoRequest, timeout: DispatchTime) throws
5459

5560
/// Call this to close the connection and wait for a response. Blocking.
5661
func closeAndReceive() throws -> Echo_EchoResponse
5762
/// Call this to close the connection and wait for a response. Nonblocking.
5863
func closeAndReceive(completion: @escaping (ResultOrRPCError<Echo_EchoResponse>) -> Void) throws
5964
}
6065

66+
internal extension Echo_EchoCollectCall {
67+
/// Send a message to the stream and wait for the send operation to finish. Blocking.
68+
func send(_ message: Echo_EchoRequest, timeout: DispatchTime = .distantFuture) throws { try self._send(message, timeout: timeout) }
69+
}
70+
6171
fileprivate final class Echo_EchoCollectCallBase: ClientCallClientStreamingBase<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoCollectCall {
6272
override class var method: String { return "/echo.Echo/Collect" }
6373
}
@@ -69,22 +79,32 @@ class Echo_EchoCollectCallTestStub: ClientCallClientStreamingTestStub<Echo_EchoR
6979
}
7080

7181
internal protocol Echo_EchoUpdateCall: ClientCallBidirectionalStreaming {
72-
/// Call this to wait for a result. Blocking.
73-
func receive() throws -> Echo_EchoResponse?
82+
/// Do not call this directly, call `receive()` in the protocol extension below instead.
83+
func _receive(timeout: DispatchTime) throws -> Echo_EchoResponse?
7484
/// Call this to wait for a result. Nonblocking.
7585
func receive(completion: @escaping (ResultOrRPCError<Echo_EchoResponse?>) -> Void) throws
7686

7787
/// Send a message to the stream. Nonblocking.
7888
func send(_ message: Echo_EchoRequest, completion: @escaping (Error?) -> Void) throws
79-
/// Send a message to the stream and wait for the send operation to finish. Blocking.
80-
func send(_ message: Echo_EchoRequest) throws
89+
/// Do not call this directly, call `send()` in the protocol extension below instead.
90+
func _send(_ message: Echo_EchoRequest, timeout: DispatchTime) throws
8191

8292
/// Call this to close the sending connection. Blocking.
8393
func closeSend() throws
8494
/// Call this to close the sending connection. Nonblocking.
8595
func closeSend(completion: (() -> Void)?) throws
8696
}
8797

98+
internal extension Echo_EchoUpdateCall {
99+
/// Call this to wait for a result. Blocking.
100+
func receive(timeout: DispatchTime = .distantFuture) throws -> Echo_EchoResponse? { return try self._receive(timeout: timeout) }
101+
}
102+
103+
internal extension Echo_EchoUpdateCall {
104+
/// Send a message to the stream and wait for the send operation to finish. Blocking.
105+
func send(_ message: Echo_EchoRequest, timeout: DispatchTime = .distantFuture) throws { try self._send(message, timeout: timeout) }
106+
}
107+
88108
fileprivate final class Echo_EchoUpdateCallBase: ClientCallBidirectionalStreamingBase<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoUpdateCall {
89109
override class var method: String { return "/echo.Echo/Update" }
90110
}
@@ -207,21 +227,26 @@ class Echo_EchoGetSessionTestStub: ServerSessionUnaryTestStub, Echo_EchoGetSessi
207227
internal protocol Echo_EchoExpandSession: ServerSessionServerStreaming {
208228
/// Send a message to the stream. Nonblocking.
209229
func send(_ message: Echo_EchoResponse, completion: @escaping (Error?) -> Void) throws
210-
/// Send a message to the stream and wait for the send operation to finish. Blocking.
211-
func send(_ message: Echo_EchoResponse) throws
230+
/// Do not call this directly, call `send()` in the protocol extension below instead.
231+
func _send(_ message: Echo_EchoResponse, timeout: DispatchTime) throws
212232

213233
/// Close the connection and send the status. Non-blocking.
214234
/// You MUST call this method once you are done processing the request.
215235
func close(withStatus status: ServerStatus, completion: (() -> Void)?) throws
216236
}
217237

238+
internal extension Echo_EchoExpandSession {
239+
/// Send a message to the stream and wait for the send operation to finish. Blocking.
240+
func send(_ message: Echo_EchoResponse, timeout: DispatchTime = .distantFuture) throws { try self._send(message, timeout: timeout) }
241+
}
242+
218243
fileprivate final class Echo_EchoExpandSessionBase: ServerSessionServerStreamingBase<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoExpandSession {}
219244

220245
class Echo_EchoExpandSessionTestStub: ServerSessionServerStreamingTestStub<Echo_EchoResponse>, Echo_EchoExpandSession {}
221246

222247
internal protocol Echo_EchoCollectSession: ServerSessionClientStreaming {
223-
/// Call this to wait for a result. Blocking.
224-
func receive() throws -> Echo_EchoRequest?
248+
/// Do not call this directly, call `receive()` in the protocol extension below instead.
249+
func _receive(timeout: DispatchTime) throws -> Echo_EchoRequest?
225250
/// Call this to wait for a result. Nonblocking.
226251
func receive(completion: @escaping (ResultOrRPCError<Echo_EchoRequest?>) -> Void) throws
227252

@@ -234,26 +259,41 @@ internal protocol Echo_EchoCollectSession: ServerSessionClientStreaming {
234259
func sendErrorAndClose(status: ServerStatus, completion: (() -> Void)?) throws
235260
}
236261

262+
internal extension Echo_EchoCollectSession {
263+
/// Call this to wait for a result. Blocking.
264+
func receive(timeout: DispatchTime = .distantFuture) throws -> Echo_EchoRequest? { return try self._receive(timeout: timeout) }
265+
}
266+
237267
fileprivate final class Echo_EchoCollectSessionBase: ServerSessionClientStreamingBase<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoCollectSession {}
238268

239269
class Echo_EchoCollectSessionTestStub: ServerSessionClientStreamingTestStub<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoCollectSession {}
240270

241271
internal protocol Echo_EchoUpdateSession: ServerSessionBidirectionalStreaming {
242-
/// Call this to wait for a result. Blocking.
243-
func receive() throws -> Echo_EchoRequest?
272+
/// Do not call this directly, call `receive()` in the protocol extension below instead.
273+
func _receive(timeout: DispatchTime) throws -> Echo_EchoRequest?
244274
/// Call this to wait for a result. Nonblocking.
245275
func receive(completion: @escaping (ResultOrRPCError<Echo_EchoRequest?>) -> Void) throws
246276

247277
/// Send a message to the stream. Nonblocking.
248278
func send(_ message: Echo_EchoResponse, completion: @escaping (Error?) -> Void) throws
249-
/// Send a message to the stream and wait for the send operation to finish. Blocking.
250-
func send(_ message: Echo_EchoResponse) throws
279+
/// Do not call this directly, call `send()` in the protocol extension below instead.
280+
func _send(_ message: Echo_EchoResponse, timeout: DispatchTime) throws
251281

252282
/// Close the connection and send the status. Non-blocking.
253283
/// You MUST call this method once you are done processing the request.
254284
func close(withStatus status: ServerStatus, completion: (() -> Void)?) throws
255285
}
256286

287+
internal extension Echo_EchoUpdateSession {
288+
/// Call this to wait for a result. Blocking.
289+
func receive(timeout: DispatchTime = .distantFuture) throws -> Echo_EchoRequest? { return try self._receive(timeout: timeout) }
290+
}
291+
292+
internal extension Echo_EchoUpdateSession {
293+
/// Send a message to the stream and wait for the send operation to finish. Blocking.
294+
func send(_ message: Echo_EchoResponse, timeout: DispatchTime = .distantFuture) throws { try self._send(message, timeout: timeout) }
295+
}
296+
257297
fileprivate final class Echo_EchoUpdateSessionBase: ServerSessionBidirectionalStreamingBase<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoUpdateSession {}
258298

259299
class Echo_EchoUpdateSessionTestStub: ServerSessionBidirectionalStreamingTestStub<Echo_EchoRequest, Echo_EchoResponse>, Echo_EchoUpdateSession {}

Sources/SwiftGRPC/Runtime/ClientCallBidirectionalStreaming.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,20 @@ open class ClientCallBidirectionalStreamingTestStub<InputType: Message, OutputTy
5959

6060
public init() {}
6161

62-
open func receive() throws -> OutputType? {
62+
open func _receive(timeout: DispatchTime) throws -> OutputType? {
6363
defer { if !outputs.isEmpty { outputs.removeFirst() } }
6464
return outputs.first
6565
}
6666

6767
open func receive(completion: @escaping (ResultOrRPCError<OutputType?>) -> Void) throws {
68-
completion(.result(try self.receive()))
68+
completion(.result(try self._receive(timeout: .distantFuture)))
6969
}
7070

7171
open func send(_ message: InputType, completion _: @escaping (Error?) -> Void) throws {
7272
inputs.append(message)
7373
}
7474

75-
open func send(_ message: InputType) throws {
75+
open func _send(_ message: InputType, timeout: DispatchTime) throws {
7676
inputs.append(message)
7777
}
7878

Sources/SwiftGRPC/Runtime/ClientCallClientStreaming.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ open class ClientCallClientStreamingTestStub<InputType: Message, OutputType: Mes
7676
inputs.append(message)
7777
}
7878

79-
open func send(_ message: InputType) throws {
79+
open func _send(_ message: InputType, timeout: DispatchTime) throws {
8080
inputs.append(message)
8181
}
8282

Sources/SwiftGRPC/Runtime/ClientCallServerStreaming.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ open class ClientCallServerStreamingTestStub<OutputType: Message>: ClientCallSer
4545

4646
public init() {}
4747

48-
open func receive() throws -> OutputType? {
48+
open func _receive(timeout: DispatchTime) throws -> OutputType? {
4949
defer { if !outputs.isEmpty { outputs.removeFirst() } }
5050
return outputs.first
5151
}
5252

5353
open func receive(completion: @escaping (ResultOrRPCError<OutputType?>) -> Void) throws {
54-
completion(.result(try self.receive()))
54+
completion(.result(try self._receive(timeout: .distantFuture)))
5555
}
5656

5757
open func cancel() {}

Sources/SwiftGRPC/Runtime/RPCError.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ import Foundation
1919
/// Type for errors thrown from generated client code.
2020
public enum RPCError: Error {
2121
case invalidMessageReceived
22+
case timedOut
2223
case callError(CallResult)
2324
}
2425

2526
public extension RPCError {
2627
var callResult: CallResult? {
2728
switch self {
28-
case .invalidMessageReceived: return nil
29+
case .invalidMessageReceived, .timedOut: return nil
2930
case .callError(let callResult): return callResult
3031
}
3132
}

Sources/SwiftGRPC/Runtime/ServerSessionBidirectionalStreaming.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,20 +69,20 @@ open class ServerSessionBidirectionalStreamingTestStub<InputType: Message, Outpu
6969
open var outputs: [OutputType] = []
7070
open var status: ServerStatus?
7171

72-
open func receive() throws -> InputType? {
72+
open func _receive(timeout: DispatchTime) throws -> InputType? {
7373
defer { if !inputs.isEmpty { inputs.removeFirst() } }
7474
return inputs.first
7575
}
7676

7777
open func receive(completion: @escaping (ResultOrRPCError<InputType?>) -> Void) throws {
78-
completion(.result(try self.receive()))
78+
completion(.result(try self._receive(timeout: .distantFuture)))
7979
}
8080

8181
open func send(_ message: OutputType, completion _: @escaping (Error?) -> Void) throws {
8282
outputs.append(message)
8383
}
8484

85-
open func send(_ message: OutputType) throws {
85+
open func _send(_ message: OutputType, timeout: DispatchTime) throws {
8686
outputs.append(message)
8787
}
8888

Sources/SwiftGRPC/Runtime/ServerSessionClientStreaming.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ open class ServerSessionClientStreamingTestStub<InputType: Message, OutputType:
7575
open var output: OutputType?
7676
open var status: ServerStatus?
7777

78-
open func receive() throws -> InputType? {
78+
open func _receive(timeout: DispatchTime) throws -> InputType? {
7979
defer { if !inputs.isEmpty { inputs.removeFirst() } }
8080
return inputs.first
8181
}
8282

8383
open func receive(completion: @escaping (ResultOrRPCError<InputType?>) -> Void) throws {
84-
completion(.result(try self.receive()))
84+
completion(.result(try self._receive(timeout: .distantFuture)))
8585
}
8686

8787
open func sendAndClose(response: OutputType, status: ServerStatus, completion: (() -> Void)?) throws {

Sources/SwiftGRPC/Runtime/ServerSessionServerStreaming.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ open class ServerSessionServerStreamingTestStub<OutputType: Message>: ServerSess
7272
outputs.append(message)
7373
}
7474

75-
open func send(_ message: OutputType) throws {
75+
open func _send(_ message: OutputType, timeout: DispatchTime) throws {
7676
outputs.append(message)
7777
}
7878

Sources/SwiftGRPC/Runtime/StreamReceiving.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,16 @@ extension StreamReceiving {
4343
}
4444
}
4545

46-
public func receive() throws -> ReceivedType? {
46+
public func _receive(timeout: DispatchTime) throws -> ReceivedType? {
4747
var result: ResultOrRPCError<ReceivedType?>?
4848
let sem = DispatchSemaphore(value: 0)
4949
try receive {
5050
result = $0
5151
sem.signal()
5252
}
53-
_ = sem.wait()
53+
if sem.wait(timeout: timeout) == .timedOut {
54+
throw RPCError.timedOut
55+
}
5456
switch result! {
5557
case .result(let response): return response
5658
case .error(let error): throw error

0 commit comments

Comments
 (0)