Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 151 additions & 91 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,38 @@ extension ChannelPipeline {
deprecated,
renamed: "Channel.configureHTTP2SecureUpgrade(h2ChannelConfigurator:http1ChannelConfigurator:)"
)
@preconcurrency
public func configureHTTP2SecureUpgrade(
h2PipelineConfigurator: @escaping (ChannelPipeline) -> EventLoopFuture<Void>,
http1PipelineConfigurator: @escaping (ChannelPipeline) -> EventLoopFuture<Void>
h2PipelineConfigurator: @escaping @Sendable (ChannelPipeline) -> EventLoopFuture<Void>,
http1PipelineConfigurator: @escaping @Sendable (ChannelPipeline) -> EventLoopFuture<Void>
) -> EventLoopFuture<Void> {
let alpnHandler = ApplicationProtocolNegotiationHandler { result in
switch result {
case .negotiated("h2"):
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
return h2PipelineConfigurator(self)
case .negotiated("http/1.1"), .fallback:
// Explicit or implicit HTTP/1.1 choice.
return http1PipelineConfigurator(self)
case .negotiated:
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
// of a user configuration error. We're going to close the connection directly.
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.InvalidALPNToken()) }
@Sendable
func makeALPNHandler() -> ApplicationProtocolNegotiationHandler {
ApplicationProtocolNegotiationHandler { result in
switch result {
case .negotiated("h2"):
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
return h2PipelineConfigurator(self)
case .negotiated("http/1.1"), .fallback:
// Explicit or implicit HTTP/1.1 choice.
return http1PipelineConfigurator(self)
case .negotiated:
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
// of a user configuration error. We're going to close the connection directly.
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.InvalidALPNToken()) }
}
}
}

return self.addHandler(alpnHandler)
if self.eventLoop.inEventLoop {
return self.eventLoop.assumeIsolatedUnsafeUnchecked().makeCompletedFuture {
try self.syncOperations.addHandler(makeALPNHandler())
}
} else {
return self.eventLoop.submit {
try self.syncOperations.addHandler(makeALPNHandler())
}
}
}
}

Expand Down Expand Up @@ -156,20 +168,29 @@ extension Channel {
targetWindowSize: Int,
inboundStreamStateInitializer: NIOChannelInitializerWithStreamID? = nil
) -> EventLoopFuture<HTTP2StreamMultiplexer> {
var handlers = [ChannelHandler]()
handlers.reserveCapacity(2) // Update this if we need to add more handlers, to avoid unnecessary reallocation.
handlers.append(NIOHTTP2Handler(mode: mode, initialSettings: initialLocalSettings))
handlers.append(
HTTP2StreamMultiplexer(
@Sendable
func configure() throws -> HTTP2StreamMultiplexer {
let http2 = NIOHTTP2Handler(mode: mode, initialSettings: initialLocalSettings)
let multiplexer = HTTP2StreamMultiplexer(
mode: mode,
channel: self,
targetWindowSize: targetWindowSize,
inboundStreamStateInitializer: inboundStreamStateInitializer
)
)

return self.pipeline.addHandlers(handlers, position: position).flatMap {
self.pipeline.handler(type: HTTP2StreamMultiplexer.self)
let handlers: [any ChannelHandler] = [http2, multiplexer]
try self.pipeline.syncOperations.addHandlers(handlers, position: position)
return multiplexer
}

if self.eventLoop.inEventLoop {
return self.eventLoop.assumeIsolatedUnsafeUnchecked().makeCompletedFuture {
try configure()
}
} else {
return self.eventLoop.submit {
try configure()
}
}
}

Expand All @@ -195,28 +216,25 @@ extension Channel {
targetWindowSize: Int = 65535,
inboundStreamInitializer: NIOChannelInitializer?
) -> EventLoopFuture<HTTP2StreamMultiplexer> {
@Sendable
func configure() throws -> HTTP2StreamMultiplexer {
try self.pipeline.syncOperations.configureHTTP2Pipeline(
mode: mode,
channel: self,
initialLocalSettings: initialLocalSettings,
position: ChannelPipeline.SynchronousOperations.Position(position),
targetWindowSize: targetWindowSize,
inboundStreamInitializer: inboundStreamInitializer
)
}

if self.eventLoop.inEventLoop {
return self.eventLoop.makeCompletedFuture {
try self.pipeline.syncOperations.configureHTTP2Pipeline(
mode: mode,
channel: self,
initialLocalSettings: initialLocalSettings,
position: position,
targetWindowSize: targetWindowSize,
inboundStreamInitializer: inboundStreamInitializer
)
return self.eventLoop.assumeIsolatedUnsafeUnchecked().makeCompletedFuture {
try configure()
}
} else {
return self.eventLoop.submit {
try self.pipeline.syncOperations.configureHTTP2Pipeline(
mode: mode,
channel: self,
initialLocalSettings: initialLocalSettings,
position: position,
targetWindowSize: targetWindowSize,
inboundStreamInitializer: inboundStreamInitializer
)
try configure()
}
}
}
Expand Down Expand Up @@ -298,22 +316,35 @@ extension Channel {
h2ChannelConfigurator: @escaping NIOChannelInitializer,
http1ChannelConfigurator: @escaping NIOChannelInitializer
) -> EventLoopFuture<Void> {
let alpnHandler = ApplicationProtocolNegotiationHandler { result in
switch result {
case .negotiated("h2"):
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
return h2ChannelConfigurator(self)
case .negotiated("http/1.1"), .fallback:
// Explicit or implicit HTTP/1.1 choice.
return http1ChannelConfigurator(self)
case .negotiated:
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
// of a user configuration error. We're going to close the connection directly.
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
@Sendable
func makeALPNHandler() -> ApplicationProtocolNegotiationHandler {
ApplicationProtocolNegotiationHandler { result in
switch result {
case .negotiated("h2"):
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
return h2ChannelConfigurator(self)
case .negotiated("http/1.1"), .fallback:
// Explicit or implicit HTTP/1.1 choice.
return http1ChannelConfigurator(self)
case .negotiated:
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
// of a user configuration error. We're going to close the connection directly.
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
}
}
}

return self.pipeline.addHandler(alpnHandler)
if self.eventLoop.inEventLoop {
let alpnHandler = makeALPNHandler()
return self.eventLoop.makeCompletedFuture {
try self.pipeline.syncOperations.addHandler(alpnHandler)
}
} else {
return self.eventLoop.submit {
let alpnHandler = makeALPNHandler()
try self.pipeline.syncOperations.addHandler(alpnHandler)
}
}
}

/// Configures a `ChannelPipeline` to speak either HTTP/1.1 or HTTP/2 according to what can be negotiated with the client.
Expand Down Expand Up @@ -369,12 +400,18 @@ extension Channel {
configurator: configurator,
h2ConnectionChannelConfigurator: h2ConnectionChannelConfigurator
) { channel in
channel.configureHTTP2Pipeline(mode: .server, targetWindowSize: targetWindowSize) {
streamChannel -> EventLoopFuture<Void> in
streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap {
() -> EventLoopFuture<Void> in
configurator(streamChannel)
channel.configureHTTP2Pipeline(
mode: .server,
targetWindowSize: targetWindowSize
) { streamChannel -> EventLoopFuture<Void> in
do {
let sync = streamChannel.pipeline.syncOperations
try sync.addHandlers(HTTP2FramePayloadToHTTP1ServerCodec())
} catch {
return streamChannel.eventLoop.makeFailedFuture(error)
}

return configurator(streamChannel)
}.map { _ in () }
}
}
Expand Down Expand Up @@ -416,10 +453,14 @@ extension Channel {
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate
) { streamChannel -> EventLoopFuture<Void> in
streamChannel.pipeline.addHandler(HTTP2FramePayloadToHTTP1ServerCodec()).flatMap {
() -> EventLoopFuture<Void> in
configurator(streamChannel)
do {
let sync = streamChannel.pipeline.syncOperations
try sync.addHandlers(HTTP2FramePayloadToHTTP1ServerCodec())
} catch {
return streamChannel.eventLoop.makeFailedFuture(error)
}

return configurator(streamChannel)
}.map { _ in () }
}
}
Expand Down Expand Up @@ -485,7 +526,7 @@ extension ChannelPipeline.SynchronousOperations {
inboundStreamInitializer: inboundStreamInitializer
)

try self.addHandler(handler, position: position)
try self.addHandler(handler, position: Position(position))

// `multiplexer` will always be non-nil when we are initializing with an `inboundStreamInitializer`
return try handler.syncMultiplexer()
Expand All @@ -511,11 +552,10 @@ extension ChannelPipeline.SynchronousOperations {
mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
initialLocalSettings: [HTTP2Setting] = nioDefaultSettings,
position: ChannelPipeline.Position = .last,
position: ChannelPipeline.SynchronousOperations.Position = .last,
targetWindowSize: Int = 65535,
inboundStreamInitializer: NIOChannelInitializer?
) throws -> HTTP2StreamMultiplexer {

let http2Handler = NIOHTTP2Handler(mode: mode, initialSettings: initialLocalSettings)
let multiplexer = HTTP2StreamMultiplexer(
mode: mode,
Expand Down Expand Up @@ -636,36 +676,24 @@ extension Channel {
public func configureHTTP2AsyncSecureUpgrade<HTTP1Output: Sendable, HTTP2Output: Sendable>(
http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP1Output>,
http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP2Output>

) -> EventLoopFuture<EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>>> {
let alpnHandler = NIOTypedApplicationProtocolNegotiationHandler<
NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>
> { result in
switch result {
case .negotiated("h2"):
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
return http2ConnectionInitializer(self).map { http2Output in .http2(http2Output) }
case .negotiated("http/1.1"), .fallback:
// Explicit or implicit HTTP/1.1 choice.
return http1ConnectionInitializer(self).map { http1Output in .http1_1(http1Output) }
case .negotiated:
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
// of a user configuration error. We're going to close the connection directly.
return self.close().flatMap { self.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
if self.eventLoop.inEventLoop {
return self.eventLoop.makeCompletedFuture {
self.pipeline.syncOperations.configureHTTP2AsyncSecureUpgrade(
on: self,
http1ConnectionInitializer: http1ConnectionInitializer,
http2ConnectionInitializer: http2ConnectionInitializer
)
}
}

return self.pipeline
.addHandler(alpnHandler)
.flatMap { _ in
self.pipeline.handler(
type: NIOTypedApplicationProtocolNegotiationHandler<
NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>
>.self
).map { alpnHandler in
alpnHandler.protocolNegotiationResult
}
} else {
return self.eventLoop.submit {
self.pipeline.syncOperations.configureHTTP2AsyncSecureUpgrade(
on: self,
http1ConnectionInitializer: http1ConnectionInitializer,
http2ConnectionInitializer: http2ConnectionInitializer
)
}
}
}

/// Configures a `ChannelPipeline` to speak either HTTP/1.1 or HTTP/2 according to what can be negotiated with the client.
Expand Down Expand Up @@ -861,6 +889,38 @@ extension ChannelPipeline.SynchronousOperations {
inboundStreamChannels: inboundStreamChannels
)
}

@inlinable
func configureHTTP2AsyncSecureUpgrade<HTTP1Output: Sendable, HTTP2Output: Sendable>(
on channel: any Channel,
http1ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP1Output>,
http2ConnectionInitializer: @escaping NIOChannelInitializerWithOutput<HTTP2Output>
) -> EventLoopFuture<NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>> {
let alpnHandler = NIOTypedApplicationProtocolNegotiationHandler<
NIONegotiatedHTTPVersion<HTTP1Output, HTTP2Output>
> { result in
switch result {
case .negotiated("h2"):
// Successful upgrade to HTTP/2. Let the user configure the pipeline.
return http2ConnectionInitializer(channel).map { http2Output in .http2(http2Output) }
case .negotiated("http/1.1"), .fallback:
// Explicit or implicit HTTP/1.1 choice.
return http1ConnectionInitializer(channel).map { http1Output in .http1_1(http1Output) }
case .negotiated:
// We negotiated something that isn't HTTP/1.1. This is a bad scene, and is a good indication
// of a user configuration error. We're going to close the connection directly.
return channel.close().flatMap { channel.eventLoop.makeFailedFuture(NIOHTTP2Errors.invalidALPNToken()) }
}
}

do {
try self.addHandler(alpnHandler)
} catch {
return channel.eventLoop.makeFailedFuture(error)
}

return alpnHandler.protocolNegotiationResult
}
}

/// `NIONegotiatedHTTPVersion` is a generic negotiation result holder for HTTP/1.1 and HTTP/2
Expand Down
35 changes: 35 additions & 0 deletions Sources/NIOHTTP2/NIOIsolatedEventLoop+MakeFuture.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

// TODO: remove this extension and bump the NIO dependency
// when https://github.com/apple/swift-nio/pull/3152 is released.
extension NIOIsolatedEventLoop {
@inlinable
@available(*, noasync)
func makeCompletedFuture<Success>(_ result: Result<Success, Error>) -> EventLoopFuture<Success> {
let promise = self.nonisolated().makePromise(of: Success.self)
promise.assumeIsolatedUnsafeUnchecked().completeWith(result)
return promise.futureResult
}

@inlinable
@available(*, noasync)
func makeCompletedFuture<Success>(
withResultOf body: () throws -> Success
) -> EventLoopFuture<Success> {
self.makeCompletedFuture(Result(catching: body))
}
}
Loading