@@ -45,6 +45,7 @@ public final class TCPSocketSource: Async.OutputStream {
4545
4646 /// Creates a new `SocketSource`
4747 internal init ( socket: TCPSocket , on worker: Worker , bufferSize: Int ) {
48+ DEBUG ( " TCPSocketSource.init(bufferSize: \( bufferSize) ) " )
4849 self . socket = socket
4950 self . eventLoop = worker. eventLoop
5051 self . isClosed = false
@@ -59,12 +60,14 @@ public final class TCPSocketSource: Async.OutputStream {
5960
6061 /// See OutputStream.output
6162 public func output< S> ( to inputStream: S ) where S: Async . InputStream , S. Input == Output {
63+ DEBUG ( " TCPSocketSource.output< \( S . self) >(to: \( inputStream) ) " )
6264 downstream = AnyInputStream ( inputStream)
6365 resumeIfSuspended ( )
6466 }
6567
6668 /// Cancels reading
6769 public func close( ) {
70+ DEBUG ( " TCPSocketSource.close() " )
6871 guard !isClosed else {
6972 return
7073 }
@@ -84,12 +87,14 @@ public final class TCPSocketSource: Async.OutputStream {
8487 /// important: the socket _must_ be ready to read data
8588 /// as indicated by a read source.
8689 private func readData( ) {
90+ DEBUG ( " TCPSocketSource.readData() " )
8791 guard let downstream = self . downstream else {
8892 ERROR ( " Unexpected nil downstream on SocketSource during readData. " )
8993 return
9094 }
9195 do {
9296 let read = try socket. read ( into: buffer)
97+ DEBUG ( " TCPSocketSource.socket.read() -> \( read) " )
9398 switch read {
9499 case . success( let count) :
95100 guard count > 0 else {
@@ -98,10 +103,12 @@ public final class TCPSocketSource: Async.OutputStream {
98103 }
99104
100105 let view = ByteBuffer ( start: buffer. baseAddress, count: count)
106+ DEBUG ( " TCPSocketSource.view = \( String ( bytes: view, encoding: . ascii) ?? " nil " ) " )
101107 downstreamIsReady = false
102108 let promise = Promise ( Void . self)
103109 downstream. input ( . next( view, promise) )
104110 promise. future. addAwaiter { result in
111+ DEBUG ( " TCPSocketSource.downstream.input.future.complete( \( result) ) [cancelIsPending: \( self . cancelIsPending) ] " )
105112 switch result {
106113 case . error( let e) : downstream. error ( e)
107114 case . expectation:
@@ -129,6 +136,7 @@ public final class TCPSocketSource: Async.OutputStream {
129136
130137 /// Called when the read source signals.
131138 private func readSourceSignal( isCancelled: Bool ) {
139+ DEBUG ( " TCPSocketSource.readSourceSignal( \( isCancelled) ) " )
132140 guard !isCancelled else {
133141 // source is cancelled, we will never receive signals again
134142 cancelIsPending = true
@@ -146,6 +154,7 @@ public final class TCPSocketSource: Async.OutputStream {
146154 ERROR ( " SocketSource readSource illegally nil during signal. " )
147155 return
148156 }
157+ DEBUG ( " TCPSocketSource.resumeIfSuspended() [sourceIsSuspended: \( sourceIsSuspended) ] " )
149158 readSource. suspend ( )
150159 sourceIsSuspended = true
151160 }
@@ -159,6 +168,7 @@ public final class TCPSocketSource: Async.OutputStream {
159168
160169 /// Resumes the readSource if it was currently suspended.
161170 private func resumeIfSuspended( ) {
171+ DEBUG ( " TCPSocketSource.resumeIfSuspended() [sourceIsSuspended: \( sourceIsSuspended) ] " )
162172 guard sourceIsSuspended else {
163173 return
164174 }
0 commit comments