Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import GRPCCore
import GRPCProtobuf
import SwiftProtobuf

// MARK: - grpc.testing.BenchmarkService

Expand Down Expand Up @@ -437,7 +436,7 @@ extension Grpc_Testing_BenchmarkService {

// Default implementation of 'registerMethods(with:)'.
extension Grpc_Testing_BenchmarkService.StreamingServiceProtocol {
internal func registerMethods(with router: inout GRPCCore.RPCRouter) {
internal func registerMethods<Transport>(with router: inout GRPCCore.RPCRouter<Transport>) where Transport: GRPCCore.ServerTransport {
router.registerHandler(
forMethod: Grpc_Testing_BenchmarkService.Method.UnaryCall.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Grpc_Testing_SimpleRequest>(),
Expand Down Expand Up @@ -747,14 +746,14 @@ extension Grpc_Testing_BenchmarkService {
/// The ``Client`` provides an implementation of ``ClientProtocol`` which wraps
/// a `GRPCCore.GRPCCClient`. The underlying `GRPCClient` provides the long-lived
/// means of communication with the remote peer.
internal struct Client: ClientProtocol {
private let client: GRPCCore.GRPCClient
internal struct Client<Transport>: ClientProtocol where Transport: GRPCCore.ClientTransport {
private let client: GRPCCore.GRPCClient<Transport>

/// Creates a new client wrapping the provided `GRPCCore.GRPCClient`.
///
/// - Parameters:
/// - client: A `GRPCCore.GRPCClient` providing a communication channel to the service.
internal init(wrapping client: GRPCCore.GRPCClient) {
internal init(wrapping client: GRPCCore.GRPCClient<Transport>) {
self.client = client
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the gRPC Swift generator plugin for the protocol buffer compiler.
// Source: grpc/testing/control.proto
//
// For information on using the generated types, please see the documentation:
// https://github.com/grpc/grpc-swift

// This file contained no services.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Message definitions to be used by integration test service definitions.

// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the gRPC Swift generator plugin for the protocol buffer compiler.
// Source: grpc/testing/messages.proto
//
// For information on using the generated types, please see the documentation:
// https://github.com/grpc/grpc-swift

// This file contained no services.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the gRPC Swift generator plugin for the protocol buffer compiler.
// Source: grpc/testing/payloads.proto
//
// For information on using the generated types, please see the documentation:
// https://github.com/grpc/grpc-swift

// This file contained no services.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// DO NOT EDIT.
// swift-format-ignore-file
//
// Generated by the gRPC Swift generator plugin for the protocol buffer compiler.
// Source: grpc/testing/stats.proto
//
// For information on using the generated types, please see the documentation:
// https://github.com/grpc/grpc-swift

// This file contained no services.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import GRPCCore
import GRPCProtobuf
import SwiftProtobuf

// MARK: - grpc.testing.WorkerService

Expand Down Expand Up @@ -381,7 +380,7 @@ extension Grpc_Testing_WorkerService {

// Default implementation of 'registerMethods(with:)'.
extension Grpc_Testing_WorkerService.StreamingServiceProtocol {
internal func registerMethods(with router: inout GRPCCore.RPCRouter) {
internal func registerMethods<Transport>(with router: inout GRPCCore.RPCRouter<Transport>) where Transport: GRPCCore.ServerTransport {
router.registerHandler(
forMethod: Grpc_Testing_WorkerService.Method.RunServer.descriptor,
deserializer: GRPCProtobuf.ProtobufDeserializer<Grpc_Testing_ServerArgs>(),
Expand Down
31 changes: 20 additions & 11 deletions Sources/GRPCNIOTransportCore/Client/Connection/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ package final class Connection: Sendable {
wrappingChannelSynchronously: channel,
configuration: NIOAsyncChannel.Configuration(
isOutboundHalfClosureEnabled: true,
inboundType: RPCResponsePart.self,
outboundType: RPCRequestPart.self
inboundType: RPCResponsePart<GRPCNIOTransportBytes>.self,
outboundType: RPCRequestPart<GRPCNIOTransportBytes>.self
)
)
}
Expand Down Expand Up @@ -389,23 +389,32 @@ package final class Connection: Sendable {

extension Connection {
package struct Stream {
package typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart>
package typealias Inbound = NIOAsyncChannelInboundStream<RPCResponsePart<GRPCNIOTransportBytes>>

typealias RequestWriter = NIOAsyncChannelOutboundWriter<
RPCRequestPart<GRPCNIOTransportBytes>
>

typealias HTTP2Stream = NIOAsyncChannel<
RPCResponsePart<GRPCNIOTransportBytes>,
RPCRequestPart<GRPCNIOTransportBytes>
>

package struct Outbound: ClosableRPCWriterProtocol {
package typealias Element = RPCRequestPart
package typealias Element = RPCRequestPart<GRPCNIOTransportBytes>

private let requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>
private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
private let requestWriter: RequestWriter
private let http2Stream: HTTP2Stream

fileprivate init(
requestWriter: NIOAsyncChannelOutboundWriter<RPCRequestPart>,
http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
requestWriter: RequestWriter,
http2Stream: HTTP2Stream
) {
self.requestWriter = requestWriter
self.http2Stream = http2Stream
}

package func write(_ element: RPCRequestPart) async throws {
package func write(_ element: RPCRequestPart<GRPCNIOTransportBytes>) async throws {
try await self.requestWriter.write(element)
}

Expand All @@ -425,10 +434,10 @@ extension Connection {

let context: ClientContext

private let http2Stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>
private let http2Stream: HTTP2Stream

init(
wrapping stream: NIOAsyncChannel<RPCResponsePart, RPCRequestPart>,
wrapping stream: HTTP2Stream,
context: ClientContext
) {
self.http2Stream = stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package import GRPCCore
private import Synchronization

package final class GRPCChannel: ClientTransport {
package typealias Bytes = GRPCNIOTransportBytes

private enum Input: Sendable {
/// Close the channel, if possible.
case close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ internal import NIOHTTP2

final class GRPCClientStreamHandler: ChannelDuplexHandler {
typealias InboundIn = HTTP2Frame.FramePayload
typealias InboundOut = RPCResponsePart
typealias InboundOut = RPCResponsePart<GRPCNIOTransportBytes>

typealias OutboundIn = RPCRequestPart
typealias OutboundIn = RPCRequestPart<GRPCNIOTransportBytes>
typealias OutboundOut = HTTP2Frame.FramePayload

private var stateMachine: GRPCStreamStateMachine
Expand Down Expand Up @@ -80,7 +80,8 @@ extension GRPCClientStreamHandler {
loop: while true {
switch self.stateMachine.nextInboundMessage() {
case .receiveMessage(let message):
context.fireChannelRead(self.wrapInboundOut(.message(message)))
let wrapped = GRPCNIOTransportBytes(message)
context.fireChannelRead(self.wrapInboundOut(.message(wrapped)))
case .awaitMoreMessages:
break loop
case .noMoreMessages:
Expand Down Expand Up @@ -193,7 +194,7 @@ extension GRPCClientStreamHandler {

case .message(let message):
do {
try self.stateMachine.send(message: message, promise: promise)
try self.stateMachine.send(message: message.buffer, promise: promise)
} catch let invalidState {
let error = RPCError(invalidState)
promise?.fail(error)
Expand Down
46 changes: 26 additions & 20 deletions Sources/GRPCNIOTransportCore/Compression/Zlib.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ extension Zlib {
/// - Parameter output: The `ByteBuffer` into which the compressed message should be written.
/// - Returns: The number of bytes written into the `output` buffer.
@discardableResult
func compress(_ input: [UInt8], into output: inout ByteBuffer) throws(ZlibError) -> Int {
func compress(_ input: ByteBuffer, into output: inout ByteBuffer) throws(ZlibError) -> Int {
defer { self.reset() }
let upperBound = self.stream.deflateBound(inputBytes: input.count)
let upperBound = self.stream.deflateBound(inputBytes: input.readableBytes)
return try self.stream.deflate(input, into: &output, upperBound: upperBound)
}

Expand Down Expand Up @@ -110,7 +110,7 @@ extension Zlib {
/// - Parameters:
/// - input: The buffer read compressed bytes from.
/// - limit: The largest size a decompressed payload may be.
func decompress(_ input: inout ByteBuffer, limit: Int) throws -> [UInt8] {
func decompress(_ input: inout ByteBuffer, limit: Int) throws -> ByteBuffer {
defer { self.reset() }
return try self.stream.inflate(input: &input, limit: limit)
}
Expand Down Expand Up @@ -302,24 +302,30 @@ extension UnsafeMutablePointer<z_stream> {
self.pointee.msg.map { String(cString: $0) }
}

func inflate(input: inout ByteBuffer, limit: Int) throws -> [UInt8] {
return try input.readWithUnsafeMutableReadableBytes { inputPointer in
func inflate(input: inout ByteBuffer, limit: Int) throws -> ByteBuffer {
return try input.readWithUnsafeMutableReadableBytes { inputPointer -> (Int, ByteBuffer) in
self.setNextInputBuffer(inputPointer)
defer {
self.setNextInputBuffer(nil)
self.setNextOutputBuffer(nil)
}

// Assume the output will be twice as large as the input.
var output = [UInt8](repeating: 0, count: min(inputPointer.count * 2, limit))
var offset = 0
var output = ByteBuffer()
var outputSize = min(inputPointer.count * 2, limit)
var finished = false
var totalBytesWritten = 0

while true {
let (finished, written) = try output[offset...].withUnsafeMutableBytes { outPointer in
let written = try output.writeWithUnsafeMutableBytes(
minimumWritableBytes: outputSize
) { pointer in
let outPointer = UnsafeMutableRawBufferPointer(
start: pointer.baseAddress,
count: min(pointer.count, outputSize)
)
self.setNextOutputBuffer(outPointer)

let finished: Bool

// Possible return codes:
// - Z_OK: some progress has been made
// - Z_STREAM_END: the end of the compressed data has been reached and all uncompressed
Expand Down Expand Up @@ -347,29 +353,29 @@ extension UnsafeMutablePointer<z_stream> {
)
}

let size = outPointer.count - self.availableOutputBytes
return (finished, size)
return outPointer.count - self.availableOutputBytes
}

if finished {
output.removeLast(output.count - self.totalOutputBytes)
let bytesRead = inputPointer.count - self.availableInputBytes
return (bytesRead, output)
} else {
offset += written
let newSize = min(output.count * 2, limit)
if newSize == output.count {
// There are still more bytes to decompress. Increase the size of the extra space in the
// buffer we're writing into.
totalBytesWritten += written
let newSize = min(outputSize * 2, limit - totalBytesWritten)
if newSize <= 0 {
assert(newSize == 0)
throw RPCError(code: .resourceExhausted, message: "Message is too large to decompress.")
} else {
output.append(contentsOf: repeatElement(0, count: newSize - output.count))
}
outputSize = newSize
}
}
}
}

func deflate(
_ input: [UInt8],
_ input: ByteBuffer,
into output: inout ByteBuffer,
upperBound: Int
) throws(ZlibError) -> Int {
Expand All @@ -380,7 +386,7 @@ extension UnsafeMutablePointer<z_stream> {

do {
var input = input
return try input.withUnsafeMutableBytes { input in
return try input.withUnsafeMutableReadableBytes { input in
self.setNextInputBuffer(input)

return try output.writeWithUnsafeMutableBytes(minimumWritableBytes: upperBound) { output in
Expand Down
Loading
Loading