|
14 | 14 | * limitations under the License.
|
15 | 15 | */
|
16 | 16 | #if compiler(>=5.6)
|
| 17 | +import NIOCore |
17 | 18 |
|
18 | 19 | @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
19 | 20 | extension Call where Request: Sendable, Response: Sendable {
|
20 |
| - internal func makeRequestStreamWriter() -> GRPCAsyncRequestStreamWriter<Request> { |
21 |
| - let delegate = GRPCAsyncRequestStreamWriter<Request>.Delegate( |
22 |
| - compressionEnabled: self.options.messageEncoding.enabledForRequests |
23 |
| - ) { request, metadata in |
24 |
| - self.send(.message(request, metadata), promise: nil) |
25 |
| - } finish: { |
26 |
| - self.send(.end, promise: nil) |
27 |
| - } |
| 21 | + typealias AsyncWriter = NIOAsyncWriter< |
| 22 | + (Request, Compression), |
| 23 | + GRPCAsyncWriterSinkDelegate<(Request, Compression)> |
| 24 | + > |
| 25 | + internal func makeRequestStreamWriter() |
| 26 | + -> (GRPCAsyncRequestStreamWriter<Request>, AsyncWriter.Sink) { |
| 27 | + let delegate = GRPCAsyncWriterSinkDelegate<(Request, Compression)>( |
| 28 | + didYield: { requests in |
| 29 | + for (request, compression) in requests { |
| 30 | + let compress = compression |
| 31 | + .isEnabled(callDefault: self.options.messageEncoding.enabledForRequests) |
| 32 | + |
| 33 | + // TODO: be smarter about inserting flushes. |
| 34 | + // We currently always flush after every write which may trigger more syscalls than necessary. |
| 35 | + let metadata = MessageMetadata(compress: compress, flush: true) |
| 36 | + self.send(.message(request, metadata), promise: nil) |
| 37 | + } |
| 38 | + }, |
| 39 | + didTerminate: { _ in self.send(.end, promise: nil) } |
| 40 | + ) |
| 41 | + |
| 42 | + let writer = NIOAsyncWriter.makeWriter(isWritable: false, delegate: delegate) |
28 | 43 |
|
29 | 44 | // Start as not-writable; writability will be toggled when the stream comes up.
|
30 |
| - return GRPCAsyncRequestStreamWriter(asyncWriter: .init(isWritable: false, delegate: delegate)) |
| 45 | + return (GRPCAsyncRequestStreamWriter<Request>(asyncWriter: writer.writer), writer.sink) |
31 | 46 | }
|
32 | 47 | }
|
33 | 48 |
|
|
0 commit comments