@@ -38,36 +38,27 @@ final class MockRequestExecutor {
3838 }
3939 }
4040
41- let eventLoop : EventLoop
42- let _blockingQueue = BlockingQueue < RequestParts > ( )
43- let pauseRequestBodyPartStreamAfterASingleWrite : Bool
44-
4541 var isCancelled : Bool {
46- if self . eventLoop. inEventLoop {
47- return self . _isCancelled
48- } else {
49- return try ! self . eventLoop. submit { self . _isCancelled } . wait ( )
50- }
42+ self . cancellationLock. value
5143 }
5244
5345 var signalledDemandForResponseBody : Bool {
54- if self . eventLoop. inEventLoop {
55- return self . _signaledDemandForResponseBody
56- } else {
57- return try ! self . eventLoop. submit { self . _signaledDemandForResponseBody } . wait ( )
58- }
46+ self . responseBodyDemandLock. value
5947 }
6048
6149 var requestBodyPartsCount : Int {
62- return self . _blockingQueue . count
50+ return self . blockingQueue . count
6351 }
6452
53+ let eventLoop : EventLoop
54+ let pauseRequestBodyPartStreamAfterASingleWrite : Bool
55+
56+ private let blockingQueue = BlockingQueue < RequestParts > ( )
57+ private let responseBodyDemandLock = ConditionLock ( value: false )
58+ private let cancellationLock = ConditionLock ( value: false )
59+
6560 private var request : HTTPExecutableRequest ?
66- private var _requestBodyParts = CircularBuffer < RequestParts > ( )
6761 private var _signaledDemandForRequestBody : Bool = false
68- private var _signaledDemandForResponseBody : Bool = false
69- private var _whenWritable : EventLoopPromise < RequestParts > ?
70- private var _isCancelled : Bool = false
7162
7263 init ( pauseRequestBodyPartStreamAfterASingleWrite: Bool = false , eventLoop: EventLoop ) {
7364 self . pauseRequestBodyPartStreamAfterASingleWrite = pauseRequestBodyPartStreamAfterASingleWrite
@@ -91,13 +82,13 @@ final class MockRequestExecutor {
9182 request. requestHeadSent ( )
9283 }
9384
94- func receiveRequestBody( deadline: NIODeadline = . now( ) + . seconds( 60 ) , _ verify: ( ByteBuffer ) throws -> Void ) throws {
85+ func receiveRequestBody( deadline: NIODeadline = . now( ) + . seconds( 5 ) , _ verify: ( ByteBuffer ) throws -> Void ) throws {
9586 enum ReceiveAction {
9687 case value( RequestParts )
9788 case future( EventLoopFuture < RequestParts > )
9889 }
9990
100- switch try self . _blockingQueue . popFirst ( deadline: deadline) {
91+ switch try self . blockingQueue . popFirst ( deadline: deadline) {
10192 case . body( . byteBuffer( let buffer) ) :
10293 try verify ( buffer)
10394 case . body( . fileRegion) :
@@ -107,13 +98,13 @@ final class MockRequestExecutor {
10798 }
10899 }
109100
110- func receiveEndOfStream( deadline: NIODeadline = . now( ) + . seconds( 60 ) ) throws {
101+ func receiveEndOfStream( deadline: NIODeadline = . now( ) + . seconds( 5 ) ) throws {
111102 enum ReceiveAction {
112103 case value( RequestParts )
113104 case future( EventLoopFuture < RequestParts > )
114105 }
115106
116- switch try self . _blockingQueue . popFirst ( deadline: deadline) {
107+ switch try self . blockingQueue . popFirst ( deadline: deadline) {
117108 case . body( . byteBuffer) :
118109 throw Errors . unexpectedByteBuffer
119110 case . body( . fileRegion) :
@@ -158,17 +149,34 @@ final class MockRequestExecutor {
158149 }
159150
160151 func resetResponseStreamDemandSignal( ) {
161- if self . eventLoop. inEventLoop {
162- self . resetResponseStreamDemandSignal0 ( )
163- } else {
164- self . eventLoop. execute {
165- self . resetResponseStreamDemandSignal0 ( )
166- }
152+ self . responseBodyDemandLock. lock ( )
153+ self . responseBodyDemandLock. unlock ( withValue: false )
154+ }
155+
156+ func receiveResponseDemand( deadline: NIODeadline = . now( ) + . seconds( 5 ) ) throws {
157+ let secondsUntilDeath = deadline - NIODeadline. now ( )
158+ guard self . responseBodyDemandLock. lock (
159+ whenValue: true ,
160+ timeoutSeconds: . init( secondsUntilDeath. nanoseconds / 1_000_000_000 )
161+ )
162+ else {
163+ throw TimeoutError ( )
167164 }
165+
166+ self . responseBodyDemandLock. unlock ( )
168167 }
169168
170- private func resetResponseStreamDemandSignal0( ) {
171- self . _signaledDemandForResponseBody = false
169+ func receiveCancellation( deadline: NIODeadline = . now( ) + . seconds( 5 ) ) throws {
170+ let secondsUntilDeath = deadline - NIODeadline. now ( )
171+ guard self . cancellationLock. lock (
172+ whenValue: true ,
173+ timeoutSeconds: . init( secondsUntilDeath. nanoseconds / 1_000_000_000 )
174+ )
175+ else {
176+ throw TimeoutError ( )
177+ }
178+
179+ self . cancellationLock. unlock ( )
172180 }
173181}
174182
@@ -192,12 +200,12 @@ extension MockRequestExecutor: HTTPRequestExecutor {
192200
193201 let stateChange = { ( ) -> WriteAction in
194202 var pause = false
195- if self . _blockingQueue . isEmpty && self . pauseRequestBodyPartStreamAfterASingleWrite && part. isBody {
203+ if self . blockingQueue . isEmpty && self . pauseRequestBodyPartStreamAfterASingleWrite && part. isBody {
196204 pause = true
197205 self . _signaledDemandForRequestBody = false
198206 }
199207
200- self . _blockingQueue . append ( . success( part) )
208+ self . blockingQueue . append ( . success( part) )
201209
202210 return pause ? . pauseBodyStream : . none
203211 }
@@ -218,29 +226,23 @@ extension MockRequestExecutor: HTTPRequestExecutor {
218226 }
219227
220228 func demandResponseBodyStream( _: HTTPExecutableRequest ) {
221- if self . eventLoop. inEventLoop {
222- self . _signaledDemandForResponseBody = true
223- } else {
224- self . eventLoop. execute { self . _signaledDemandForResponseBody = true }
225- }
229+ self . responseBodyDemandLock. lock ( )
230+ self . responseBodyDemandLock. unlock ( withValue: true )
226231 }
227232
228233 func cancelRequest( _: HTTPExecutableRequest ) {
229- if self . eventLoop. inEventLoop {
230- self . _isCancelled = true
231- } else {
232- self . eventLoop. execute { self . _isCancelled = true }
233- }
234+ self . cancellationLock. lock ( )
235+ self . cancellationLock. unlock ( withValue: true )
234236 }
235237}
236238
237239extension MockRequestExecutor {
240+ public struct TimeoutError : Error { }
241+
238242 final class BlockingQueue < Element> {
239243 private let condition = ConditionLock ( value: false )
240244 private var buffer = CircularBuffer < Result < Element , Error > > ( )
241245
242- public struct TimeoutError : Error { }
243-
244246 internal func append( _ element: Result < Element , Error > ) {
245247 self . condition. lock ( )
246248 self . buffer. append ( element)
0 commit comments