Skip to content

Commit 3835e14

Browse files
authored
Add new Server object (#1721)
Motivation: The server is the main entry point for running gRPC services. It combines a set of transports, services, and interceptors. Modifications: - Add the 'Server' and 'ServerError' Result: Can piece together transports, interceptors and services
1 parent 688287c commit 3835e14

File tree

12 files changed

+1369
-6
lines changed

12 files changed

+1369
-6
lines changed

Sources/GRPCCore/Call/Server/RPCRouter.swift

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,27 @@ public struct RPCRouter: Sendable {
133133
return self.handlers.removeValue(forKey: descriptor) != nil
134134
}
135135
}
136+
137+
@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *)
138+
extension RPCRouter {
139+
internal func handle(
140+
stream: RPCStream<RPCAsyncSequence<RPCRequestPart>, RPCWriter<RPCResponsePart>.Closable>,
141+
interceptors: [any ServerInterceptor]
142+
) async {
143+
if let handler = self.handlers[stream.descriptor] {
144+
await handler.handle(stream: stream, interceptors: interceptors)
145+
} else {
146+
// If this throws then the stream must be closed which we can't do anything about, so ignore
147+
// any error.
148+
try? await stream.outbound.write(.status(.rpcNotImplemented, [:]))
149+
stream.outbound.finish()
150+
}
151+
}
152+
}
153+
154+
extension Status {
155+
fileprivate static let rpcNotImplemented = Status(
156+
code: .unimplemented,
157+
message: "Requested RPC isn't implemented by this server."
158+
)
159+
}

Sources/GRPCCore/Call/Server/ServerRequest.swift

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,24 @@ extension ServerRequest {
7272

7373
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
7474
extension ServerRequest.Stream {
75-
@_spi(Testing)
7675
public init(single request: ServerRequest.Single<Message>) {
7776
self.init(metadata: request.metadata, messages: .one(request.message))
7877
}
7978
}
79+
80+
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
81+
extension ServerRequest.Single {
82+
public init(stream request: ServerRequest.Stream<Message>) async throws {
83+
var iterator = request.messages.makeAsyncIterator()
84+
85+
guard let message = try await iterator.next() else {
86+
throw RPCError(code: .internalError, message: "Empty stream.")
87+
}
88+
89+
guard try await iterator.next() == nil else {
90+
throw RPCError(code: .internalError, message: "Too many messages.")
91+
}
92+
93+
self = ServerRequest.Single(metadata: request.metadata, message: message)
94+
}
95+
}

Sources/GRPCCore/Call/Server/ServerResponse.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,6 @@ extension ServerResponse.Stream {
327327

328328
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
329329
extension ServerResponse.Stream {
330-
@_spi(Testing)
331330
public init(single response: ServerResponse.Single<Message>) {
332331
switch response.accepted {
333332
case .success(let contents):

0 commit comments

Comments
 (0)