@@ -60,11 +60,13 @@ public final class TCPSocketSink: Async.InputStream {
6060 switch event {
6161 case . next( let input, let ready) :
6262 guard inputBuffer == nil else {
63- fatalError ( " SocketSink upstream is illegally overproducing input buffers. " )
63+ ERROR ( " SocketSink upstream is illegally overproducing input buffers. " )
64+ return
6465 }
6566 inputBuffer = input
6667 guard currentReadyPromise == nil else {
67- fatalError ( " SocketSink currentReadyPromise illegally not nil during input. " )
68+ ERROR ( " SocketSink currentReadyPromise illegally not nil during input. " )
69+ return
6870 }
6971 currentReadyPromise = ready
7072 resumeIfSuspended ( )
@@ -81,7 +83,8 @@ public final class TCPSocketSink: Async.InputStream {
8183 return
8284 }
8385 guard let writeSource = self . writeSource else {
84- fatalError ( " SocketSink writeSource illegally nil during close. " )
86+ ERROR ( " SocketSink writeSource illegally nil during close. " )
87+ return
8588 }
8689 writeSource. cancel ( )
8790 socket. close ( )
@@ -93,7 +96,8 @@ public final class TCPSocketSink: Async.InputStream {
9396 private func writeData( ready: Promise < Void > ) {
9497 do {
9598 guard let buffer = self . inputBuffer else {
96- fatalError ( " Unexpected nil SocketSink inputBuffer during writeData " )
99+ ERROR ( " Unexpected nil SocketSink inputBuffer during writeData " )
100+ return
97101 }
98102
99103 let write = try socket. write ( from: buffer)
@@ -102,7 +106,7 @@ public final class TCPSocketSink: Async.InputStream {
102106 switch count {
103107 case buffer. count:
104108 self . inputBuffer = nil
105- ready. complete ( )
109+ ready. complete ( onNextTick : eventLoop )
106110 default :
107111 inputBuffer = ByteBuffer (
108112 start: buffer. baseAddress? . advanced ( by: count) ,
@@ -113,13 +117,14 @@ public final class TCPSocketSink: Async.InputStream {
113117 case . wouldBlock:
114118 resumeIfSuspended ( )
115119 guard currentReadyPromise == nil else {
116- fatalError ( " SocketSink currentReadyPromise illegally not nil during wouldBlock. " )
120+ ERROR ( " SocketSink currentReadyPromise illegally not nil during wouldBlock. " )
121+ return
117122 }
118123 currentReadyPromise = ready
119124 }
120125 } catch {
121126 self . error ( error)
122- ready. complete ( )
127+ ready. complete ( onNextTick : eventLoop )
123128 }
124129 }
125130
@@ -136,7 +141,8 @@ public final class TCPSocketSink: Async.InputStream {
136141 excessSignalCount = excessSignalCount &+ 1
137142 if excessSignalCount >= maxExcessSignalCount {
138143 guard let writeSource = self . writeSource else {
139- fatalError ( " SocketSink writeSource illegally nil during signal. " )
144+ ERROR ( " SocketSink writeSource illegally nil during signal. " )
145+ return
140146 }
141147 writeSource. suspend ( )
142148 sourceIsSuspended = true
@@ -145,7 +151,8 @@ public final class TCPSocketSink: Async.InputStream {
145151 }
146152
147153 guard let ready = currentReadyPromise else {
148- fatalError ( " SocketSink currentReadyPromise illegaly nil during signal. " )
154+ ERROR ( " SocketSink currentReadyPromise illegaly nil during signal. " )
155+ return
149156 }
150157 currentReadyPromise = nil
151158 writeData ( ready: ready)
@@ -157,7 +164,8 @@ public final class TCPSocketSink: Async.InputStream {
157164 }
158165
159166 guard let writeSource = self . writeSource else {
160- fatalError ( " SocketSink writeSource illegally nil during resumeIfSuspended. " )
167+ ERROR ( " SocketSink writeSource illegally nil during resumeIfSuspended. " )
168+ return
161169 }
162170 sourceIsSuspended = false
163171 // start listening for ready notifications
@@ -177,7 +185,8 @@ extension TCPSocket {
177185 @available ( * , deprecated)
178186 public func sink( on eventLoop: Worker ) -> TCPSocketSink {
179187 return . init( socket: self , on: eventLoop) { _, error in
180- fatalError ( " Uncaught error in TCPSocketSink: \( error) . " )
188+ ERROR ( " Uncaught error in TCPSocketSink: \( error) . " )
189+ return
181190 }
182191 }
183192}
0 commit comments