Skip to content

Commit 0046afa

Browse files
authored
Bring HealthService API in line with ReflectionService (#24)
Motivation: The reflection service is a single type called `ReflectionService`. The Health service is a single type (called `Health`) holding an instance of the `HealthService` and a type to provide values to the service. This indirection isn't necessary, the health service can just hold it instead. This makes the API more like the reflection service and removes indirection. Modifications: - Remove indirection from health service and rename from 'Health' to 'HealthService'. - Make the reflection service a struct (it was a class for no good reason). Result: More consistent API
1 parent 60fc75f commit 0046afa

File tree

5 files changed

+222
-228
lines changed

5 files changed

+222
-228
lines changed

Sources/GRPCHealthService/Health.swift

Lines changed: 0 additions & 125 deletions
This file was deleted.
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2024, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
internal import GRPCCore
18+
private import Synchronization
19+
20+
extension HealthService {
21+
internal struct Service: Grpc_Health_V1_Health.ServiceProtocol {
22+
private let state = Self.State()
23+
}
24+
}
25+
26+
extension HealthService.Service {
27+
func check(
28+
request: ServerRequest<Grpc_Health_V1_HealthCheckRequest>,
29+
context: ServerContext
30+
) async throws -> ServerResponse<Grpc_Health_V1_HealthCheckResponse> {
31+
let service = request.message.service
32+
33+
guard let status = self.state.currentStatus(ofService: service) else {
34+
throw RPCError(code: .notFound, message: "Requested service unknown.")
35+
}
36+
37+
var response = Grpc_Health_V1_HealthCheckResponse()
38+
response.status = status
39+
40+
return ServerResponse(message: response)
41+
}
42+
43+
func watch(
44+
request: ServerRequest<Grpc_Health_V1_HealthCheckRequest>,
45+
context: ServerContext
46+
) async -> StreamingServerResponse<Grpc_Health_V1_HealthCheckResponse> {
47+
let service = request.message.service
48+
let statuses = AsyncStream.makeStream(of: Grpc_Health_V1_HealthCheckResponse.ServingStatus.self)
49+
50+
self.state.addContinuation(statuses.continuation, forService: service)
51+
52+
return StreamingServerResponse(of: Grpc_Health_V1_HealthCheckResponse.self) { writer in
53+
var response = Grpc_Health_V1_HealthCheckResponse()
54+
55+
for await status in statuses.stream {
56+
response.status = status
57+
try await writer.write(response)
58+
}
59+
60+
return [:]
61+
}
62+
}
63+
64+
func updateStatus(
65+
_ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus,
66+
forService service: String
67+
) {
68+
self.state.updateStatus(status, forService: service)
69+
}
70+
}
71+
72+
extension HealthService.Service {
73+
private final class State: Sendable {
74+
// The state of each service keyed by the fully qualified service name.
75+
private let lockedStorage = Mutex([String: ServiceState]())
76+
77+
fileprivate func currentStatus(
78+
ofService service: String
79+
) -> Grpc_Health_V1_HealthCheckResponse.ServingStatus? {
80+
return self.lockedStorage.withLock { $0[service]?.currentStatus }
81+
}
82+
83+
fileprivate func updateStatus(
84+
_ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus,
85+
forService service: String
86+
) {
87+
self.lockedStorage.withLock { storage in
88+
storage[service, default: ServiceState(status: status)].updateStatus(status)
89+
}
90+
}
91+
92+
fileprivate func addContinuation(
93+
_ continuation: AsyncStream<Grpc_Health_V1_HealthCheckResponse.ServingStatus>.Continuation,
94+
forService service: String
95+
) {
96+
self.lockedStorage.withLock { storage in
97+
storage[service, default: ServiceState(status: .serviceUnknown)]
98+
.addContinuation(continuation)
99+
}
100+
}
101+
}
102+
103+
// Encapsulates the current status of a service and the continuations of its watch streams.
104+
private struct ServiceState: Sendable {
105+
private(set) var currentStatus: Grpc_Health_V1_HealthCheckResponse.ServingStatus
106+
private var continuations:
107+
[AsyncStream<Grpc_Health_V1_HealthCheckResponse.ServingStatus>.Continuation]
108+
109+
fileprivate mutating func updateStatus(
110+
_ status: Grpc_Health_V1_HealthCheckResponse.ServingStatus
111+
) {
112+
guard status != self.currentStatus else {
113+
return
114+
}
115+
116+
self.currentStatus = status
117+
118+
for continuation in self.continuations {
119+
continuation.yield(status)
120+
}
121+
}
122+
123+
fileprivate mutating func addContinuation(
124+
_ continuation: AsyncStream<Grpc_Health_V1_HealthCheckResponse.ServingStatus>.Continuation
125+
) {
126+
self.continuations.append(continuation)
127+
continuation.yield(self.currentStatus)
128+
}
129+
130+
fileprivate init(status: Grpc_Health_V1_HealthCheckResponse.ServingStatus = .unknown) {
131+
self.currentStatus = status
132+
self.continuations = []
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)