forked from apple/swift-nio-http2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBench1Conn10kRequests.swift
More file actions
207 lines (180 loc) · 7.8 KB
/
Bench1Conn10kRequests.swift
File metadata and controls
207 lines (180 loc) · 7.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOHTTP1
import NIOHTTP2
import NIOPosix
/// Benchmark that issues 10,000 sequential requests over a single client connection.
///
/// `setUp()` creates a one-thread event loop group, a server (via `setupServer`) and a client
/// connected to that server's local address (via `setupClient`). `run()` sends the requests one
/// after another with `sendOneRequest` and reports the total number of body bytes received.
final class Bench1Conn10kRequests: Benchmark {
    var group: MultiThreadedEventLoopGroup!
    var server: Channel!
    var client: Channel!

    /// Creates the event loop group, the server channel and the client channel.
    ///
    /// The three values are only stored on `self` once all of them exist.
    ///
    /// - Throws: Any error raised while setting up the server or the client. The freshly created
    ///   group is shut down before the error is rethrown so that its thread is not leaked.
    func setUp() throws {
        let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        do {
            let server = try setupServer(group: group)
            // A successfully bound server is expected to report a local address; treat a missing
            // one as a programmer error, exactly as before.
            let client = try setupClient(group: group, address: server.localAddress!)
            self.group = group
            self.server = server
            self.client = client
        } catch {
            // Set-up failed half-way: do not leave the group's thread running.
            try? group.syncShutdownGracefully()
            throw error
        }
    }

    /// Closes the client, then the server, then shuts the group down, and drops all references.
    ///
    /// A failure at any step is considered a bug in the benchmark and traps.
    func tearDown() {
        try! self.client.close().wait()
        try! self.server.close().wait()
        try! self.group.syncShutdownGracefully()
        // Release everything, not just the group, so no closed channel is kept alive by this object.
        self.client = nil
        self.server = nil
        self.group = nil
    }

    /// Sends 10,000 requests sequentially over the client connection.
    ///
    /// - Returns: The sum of the body byte counts reported by `sendOneRequest`.
    /// - Throws: The first error thrown by `sendOneRequest`.
    func run() throws -> Int {
        var bodyByteCount = 0
        for _ in 0..<10_000 {
            bodyByteCount += try sendOneRequest(channel: self.client)
        }
        return bodyByteCount
    }
}
/// Binds an HTTP/2 server whose streams are served by `HTTP1TestServer`.
///
/// Every accepted connection gets an HTTP/2 pipeline in server mode followed by an `ErrorHandler`.
/// Every stream on such a connection gets an HTTP/2-to-HTTP/1 server codec, an `HTTP1TestServer`
/// and its own `ErrorHandler`.
///
/// - Parameters:
///   - group: The event loop group used for the server and the accepted channels.
///   - host: The address to bind to. Defaults to `"127.0.0.1"`.
///   - port: The TCP port to bind to. Defaults to `12345`. Passing `0` asks the operating system
///     for a free port, which can then be read from the returned channel's `localAddress`.
/// - Returns: The bound server channel.
/// - Throws: Any error produced while binding.
func setupServer(group: EventLoopGroup, host: String = "127.0.0.1", port: Int = 12345) throws -> Channel {
    let bootstrap = ServerBootstrap(group: group)
        // Specify backlog and enable SO_REUSEADDR for the server itself
        .serverChannelOption(ChannelOptions.backlog, value: 256)
        .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)

        // Set the handlers that are applied to the accepted Channels
        .childChannelInitializer { channel in
            channel.eventLoop.makeCompletedFuture {
                let sync = channel.pipeline.syncOperations
                _ = try sync.configureHTTP2Pipeline(
                    mode: .server,
                    connectionConfiguration: .init(),
                    streamConfiguration: .init()
                ) { streamChannel -> EventLoopFuture<Void> in
                    streamChannel.eventLoop.makeCompletedFuture {
                        let sync = streamChannel.pipeline.syncOperations
                        try sync.addHandler(HTTP2FramePayloadToHTTP1ServerCodec())
                        try sync.addHandler(HTTP1TestServer())
                        try sync.addHandler(ErrorHandler())
                    }
                }
                try sync.addHandler(ErrorHandler())
            }
        }

        // Enable TCP_NODELAY and SO_REUSEADDR for the accepted Channels
        .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
        .childChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
        .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 1)

    return try bootstrap.bind(host: host, port: port).wait()
}
/// Sends one `GET /` request on a new HTTP/2 stream and blocks until its response has ended.
///
/// The stream is created through the `HTTP2StreamMultiplexer` found in `channel`'s pipeline and is
/// configured with an HTTP/2-to-HTTP/1 client codec, a `SendRequestHandler` and an `ErrorHandler`.
///
/// - Parameter channel: The client connection channel.
/// - Returns: The value the response promise was succeeded with, i.e. the number of response body
///   bytes counted by `SendRequestHandler`.
/// - Throws: The error from looking up the multiplexer or from creating the stream channel, or any
///   error the response promise is failed with.
func sendOneRequest(channel: Channel) throws -> Int {
    let responseReceivedPromise = channel.eventLoop.makePromise(of: Int.self)

    // If the stream cannot be created, no handler exists that could ever complete the response
    // promise. Forward that failure so the `wait()` below throws instead of blocking forever.
    let streamCreatedPromise = channel.eventLoop.makePromise(of: Channel.self)
    streamCreatedPromise.futureResult.whenFailure { error in
        responseReceivedPromise.fail(error)
    }

    channel.pipeline.handler(type: HTTP2StreamMultiplexer.self).whenComplete { result in
        switch result {
        case .failure(let error):
            // No multiplexer available: report it through the same path as a stream creation failure.
            streamCreatedPromise.fail(error)

        case .success(let multiplexer):
            multiplexer.createStreamChannel(promise: streamCreatedPromise) { streamChannel in
                streamChannel.eventLoop.makeCompletedFuture {
                    let sync = streamChannel.pipeline.syncOperations
                    try sync.addHandler(HTTP2FramePayloadToHTTP1ClientCodec(httpProtocol: .https))

                    let requestHandler = SendRequestHandler(
                        host: "127.0.0.1",
                        request: .init(
                            version: .init(major: 2, minor: 0),
                            method: .GET,
                            uri: "/",
                            headers: ["host": "localhost"]
                        ),
                        responseReceivedPromise: responseReceivedPromise
                    )
                    try sync.addHandler(requestHandler)
                    try sync.addHandler(ErrorHandler())
                }
            }
        }
    }

    return try responseReceivedPromise.futureResult.wait()
}
/// Connects a client to `address` and turns the connection into an HTTP/2 client connection.
///
/// The connection starts out with a single `ErrorHandler`; once connected, the HTTP/2 handlers are
/// inserted at the front of the pipeline. Streams opened by the remote peer get no extra handlers.
///
/// - Parameters:
///   - group: The event loop group the client channel runs on.
///   - address: The server address to connect to.
/// - Returns: The connected and configured client channel.
/// - Throws: Any error produced while connecting or while configuring the pipeline.
func setupClient(
    group: EventLoopGroup,
    address: SocketAddress
) throws -> Channel {
    let bootstrap = ClientBootstrap(group: group)
        .channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
        .channelInitializer { connection in
            connection.eventLoop.makeCompletedFuture {
                try connection.pipeline.syncOperations.addHandler(ErrorHandler())
            }
        }

    // Step 1: establish the TCP connection.
    let connected = bootstrap.connect(to: address)

    // Step 2: install the HTTP/2 handlers ahead of the error handler and hand back the connection.
    let configured: EventLoopFuture<Channel> = connected.flatMap { connection in
        connection.configureHTTP2Pipeline(mode: .client, position: .first) { inboundStream in
            inboundStream.eventLoop.makeSucceededFuture(())
        }.map { _ in connection }
    }

    return try configured.wait()
}
/// Stream handler that answers every completed request with a fixed `200 OK` response whose body
/// is `hello`, then closes the stream.
final class HTTP1TestServer: ChannelInboundHandler {
    public typealias InboundIn = HTTPServerRequestPart
    public typealias OutboundOut = HTTPServerResponsePart

    /// Reacts to the end of a request; all other request parts are ignored.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let requestPart = self.unwrapInboundIn(data)
        guard case .end = requestPart else {
            return
        }

        let streamChannel = context.channel
        let boundContext = NIOLoopBound(context, eventLoop: context.eventLoop)

        // Insert an event loop tick here. This more accurately represents real workloads in SwiftNIO, which will not
        // re-entrantly write their response frames.
        context.eventLoop.execute {
            let responseWritten = streamChannel.getOption(HTTP2StreamChannelOptions.streamID)
                .flatMap { (streamID) -> EventLoopFuture<Void> in
                    // Response head: fixed length plus the ID of the stream being answered.
                    var responseHeaders = HTTPHeaders()
                    responseHeaders.add(name: "content-length", value: "5")
                    responseHeaders.add(name: "x-stream-id", value: String(Int(streamID)))
                    let responseHead = HTTPResponseHead(
                        version: .init(major: 2, minor: 0),
                        status: .ok,
                        headers: responseHeaders
                    )
                    streamChannel.write(HTTPServerResponsePart.head(responseHead), promise: nil)

                    // Response body.
                    var payload = streamChannel.allocator.buffer(capacity: 12)
                    payload.writeStaticString("hello")
                    streamChannel.write(HTTPServerResponsePart.body(.byteBuffer(payload)), promise: nil)

                    // End of response; flushing here pushes out all three parts.
                    return streamChannel.writeAndFlush(HTTPServerResponsePart.end(nil))
                }

            // Close the stream whether or not the write succeeded.
            responseWritten.whenComplete { _ in
                boundContext.value.close(promise: nil)
            }
        }
    }
}
/// Inbound handler that prints any error reaching it and then closes the channel.
final class ErrorHandler: ChannelInboundHandler {
    // This handler only deals with errors; it never unwraps inbound messages.
    typealias InboundIn = Never

    /// Logs `error` to standard output and closes the channel without waiting for the result.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
        let errorDescription = String(describing: error)
        print("Server received error: " + errorDescription)
        context.close(promise: nil)
    }
}
/// Stream handler that sends one body-less request when the stream becomes active and completes a
/// promise once the outcome of that request is known.
///
/// The promise is succeeded with the number of response body bytes when the end of the response is
/// read. If an error is caught or the stream becomes inactive before that point, the promise is
/// failed instead, so that anyone waiting on it is not left blocked forever.
final class SendRequestHandler: ChannelInboundHandler {
    typealias InboundIn = HTTPClientResponsePart
    typealias OutboundOut = HTTPClientRequestPart

    private let responseReceivedPromise: EventLoopPromise<Int>
    private let host: String
    private let request: HTTPRequestHead
    private var bytesReceived: Int = 0
    // Guards against completing `responseReceivedPromise` more than once.
    private var responseCompleted = false

    /// - Parameters:
    ///   - host: Stored with the handler; not otherwise used by it.
    ///   - request: The request head to send when the stream becomes active.
    ///   - responseReceivedPromise: Completed with the body byte count, or failed if the stream
    ///     errors or closes before the response has ended.
    init(host: String, request: HTTPRequestHead, responseReceivedPromise: EventLoopPromise<Int>) {
        self.responseReceivedPromise = responseReceivedPromise
        self.host = host
        self.request = request
    }

    /// Writes the request head immediately followed by the end of the request, then flushes.
    func channelActive(context: ChannelHandlerContext) {
        assert(context.channel.parent!.isActive)
        context.write(self.wrapOutboundOut(.head(self.request)), promise: nil)
        context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
    }

    /// Accumulates body sizes and reports the total once the response has ended.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        switch self.unwrapInboundIn(data) {
        case .head:
            break
        case .body(let buffer):
            self.bytesReceived += buffer.readableBytes
        case .end:
            self.completeResponse(.success(self.bytesReceived))
        }
    }

    /// Fails the pending response and passes the error on to the next handler.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
        self.completeResponse(.failure(error))
        context.fireErrorCaught(error)
    }

    /// Fails the pending response if the stream went away before the response ended.
    func channelInactive(context: ChannelHandlerContext) {
        self.completeResponse(.failure(ChannelError.eof))
        context.fireChannelInactive()
    }

    /// Completes the response promise with `result` the first time it is called; later calls are ignored.
    private func completeResponse(_ result: Result<Int, Error>) {
        guard !self.responseCompleted else {
            return
        }
        self.responseCompleted = true

        switch result {
        case .success(let byteCount):
            self.responseReceivedPromise.succeed(byteCount)
        case .failure(let error):
            self.responseReceivedPromise.fail(error)
        }
    }
}