Skip to content

Commit 11520d5

Browse files
committed
transports: Implement 'waiter' param for transport constructors
1 parent 44852da commit 11520d5

File tree

11 files changed

+113
-117
lines changed

11 files changed

+113
-117
lines changed

uvloop/handles/pipe.pxd

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,26 @@ cdef class UVPipeServer(UVStreamServer):
1010
cdef class UVPipeTransport(UVTransport):
1111

1212
@staticmethod
13-
cdef UVPipeTransport new(Loop loop, object protocol, Server server)
13+
cdef UVPipeTransport new(Loop loop, object protocol, Server server,
14+
object waiter)
1415

1516
cdef open(self, int sockfd)
16-
cdef connect(self, char* addr, object callback)
17+
cdef connect(self, char* addr)
1718

1819

1920
cdef class UVReadPipeTransport(UVReadTransport):
2021

2122
@staticmethod
22-
cdef UVReadPipeTransport new(Loop loop, object protocol, Server server)
23+
cdef UVReadPipeTransport new(Loop loop, object protocol, Server server,
24+
object waiter)
2325

2426
cdef open(self, int sockfd)
2527

2628

2729
cdef class UVWritePipeTransport(UVWriteTransport):
2830

2931
@staticmethod
30-
cdef UVWritePipeTransport new(Loop loop, object protocol, Server server)
32+
cdef UVWritePipeTransport new(Loop loop, object protocol, Server server,
33+
object waiter)
3134

3235
cdef open(self, int sockfd)

uvloop/handles/pipe.pyx

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,40 +57,43 @@ cdef class UVPipeServer(UVStreamServer):
5757

5858
self._mark_as_open()
5959

60-
cdef UVTransport _make_new_transport(self, object protocol):
60+
cdef UVTransport _make_new_transport(self, object protocol, object waiter):
6161
cdef UVPipeTransport tr
62-
tr = UVPipeTransport.new(self._loop, protocol, self._server)
62+
tr = UVPipeTransport.new(self._loop, protocol, self._server, waiter)
6363
return <UVTransport>tr
6464

6565

6666
@cython.no_gc_clear
6767
cdef class UVPipeTransport(UVTransport):
6868

6969
@staticmethod
70-
cdef UVPipeTransport new(Loop loop, object protocol, Server server):
70+
cdef UVPipeTransport new(Loop loop, object protocol, Server server,
71+
object waiter):
72+
7173
cdef UVPipeTransport handle
7274
handle = UVPipeTransport.__new__(UVPipeTransport)
73-
handle._init(loop, protocol, server)
75+
handle._init(loop, protocol, server, waiter)
7476
__pipe_init_uv_handle(<UVStream>handle, loop)
7577
return handle
7678

7779
cdef open(self, int sockfd):
7880
__pipe_open(<UVStream>self, sockfd)
7981

80-
cdef connect(self, char* addr, object callback):
82+
cdef connect(self, char* addr):
8183
cdef _PipeConnectRequest req
82-
req = _PipeConnectRequest(self._loop, self, callback)
84+
req = _PipeConnectRequest(self._loop, self)
8385
req.connect(addr)
8486

8587

8688
@cython.no_gc_clear
8789
cdef class UVReadPipeTransport(UVReadTransport):
8890

8991
@staticmethod
90-
cdef UVReadPipeTransport new(Loop loop, object protocol, Server server):
92+
cdef UVReadPipeTransport new(Loop loop, object protocol, Server server,
93+
object waiter):
9194
cdef UVReadPipeTransport handle
9295
handle = UVReadPipeTransport.__new__(UVReadPipeTransport)
93-
handle._init(loop, protocol, server)
96+
handle._init(loop, protocol, server, waiter)
9497
__pipe_init_uv_handle(<UVStream>handle, loop)
9598
return handle
9699

@@ -102,10 +105,11 @@ cdef class UVReadPipeTransport(UVReadTransport):
102105
cdef class UVWritePipeTransport(UVWriteTransport):
103106

104107
@staticmethod
105-
cdef UVWritePipeTransport new(Loop loop, object protocol, Server server):
108+
cdef UVWritePipeTransport new(Loop loop, object protocol, Server server,
109+
object waiter):
106110
cdef UVWritePipeTransport handle
107111
handle = UVWritePipeTransport.__new__(UVWritePipeTransport)
108-
handle._init(loop, protocol, server)
112+
handle._init(loop, protocol, server, waiter)
109113
__pipe_init_uv_handle(<UVStream>handle, loop)
110114
return handle
111115

@@ -115,18 +119,15 @@ cdef class UVWritePipeTransport(UVWriteTransport):
115119

116120
cdef class _PipeConnectRequest(UVRequest):
117121
cdef:
118-
object callback
119122
UVPipeTransport transport
120123

121-
def __cinit__(self, loop, transport, callback):
124+
def __cinit__(self, loop, transport):
122125
self.request = <uv.uv_req_t*> PyMem_Malloc(sizeof(uv.uv_connect_t))
123126
if self.request is NULL:
124127
self.on_done()
125128
raise MemoryError()
126129
self.request.data = <void*>self
127-
128130
self.transport = transport
129-
self.callback = callback
130131

131132
cdef connect(self, char* addr):
132133
# uv_pipe_connect returns void
@@ -137,19 +138,19 @@ cdef class _PipeConnectRequest(UVRequest):
137138

138139
cdef void __pipe_connect_callback(uv.uv_connect_t* req, int status) with gil:
139140
cdef:
140-
_TCPConnectRequest wrapper
141-
object callback
141+
_PipeConnectRequest wrapper
142+
UVPipeTransport transport
142143

143-
wrapper = <_TCPConnectRequest> req.data
144-
callback = wrapper.callback
144+
wrapper = <_PipeConnectRequest> req.data
145+
transport = wrapper.transport
145146

146147
if status < 0:
147148
exc = convert_error(status)
148149
else:
149150
exc = None
150151

151152
try:
152-
callback(exc)
153+
transport._on_connect(exc)
153154
except BaseException as ex:
154155
wrapper.transport._error(ex, False)
155156
finally:

uvloop/handles/process.pyx

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ cdef class UVProcessTransport(UVProcess):
246246
if stdin is not None:
247247
if stdin == subprocess_PIPE:
248248
proto = WriteSubprocessPipeProto(self, 0)
249-
self.stdin = UVWritePipeTransport.new(self._loop, proto, None)
249+
self.stdin = UVWritePipeTransport.new(
250+
self._loop, proto, None, None)
250251

251252
iocnt = &self.iocnt[0]
252253
iocnt.flags = <uv.uv_stdio_flags>(uv.UV_CREATE_PIPE |
@@ -275,7 +276,8 @@ cdef class UVProcessTransport(UVProcess):
275276
io[1] = w
276277

277278
proto = ReadSubprocessPipeProto(self, 1)
278-
self.stdout = UVReadPipeTransport.new(self._loop, proto, None)
279+
self.stdout = UVReadPipeTransport.new(
280+
self._loop, proto, None, None)
279281
self.stdout.open(r)
280282
elif stdout == subprocess_DEVNULL:
281283
io[1] = self._file_devnull()
@@ -294,7 +296,8 @@ cdef class UVProcessTransport(UVProcess):
294296
io[2] = w
295297

296298
proto = ReadSubprocessPipeProto(self, 2)
297-
self.stderr = UVReadPipeTransport.new(self._loop, proto, None)
299+
self.stderr = UVReadPipeTransport.new(
300+
self._loop, proto, None, None)
298301
self.stderr.open(r)
299302
elif stderr == subprocess_STDOUT:
300303
if io[1] is None:
@@ -336,12 +339,14 @@ cdef class UVProcessTransport(UVProcess):
336339
pass_fds)
337340

338341
if handle.stdin is not None:
339-
handle.stdin._init_protocol(None)
342+
handle.stdin._init_protocol()
340343
if handle.stdout is not None:
341-
handle.stdout._init_protocol(None)
344+
handle.stdout._init_protocol()
342345
if handle.stderr is not None:
343-
handle.stderr._init_protocol(None)
346+
handle.stderr._init_protocol()
344347

348+
# By the time `protocol.connection_made` is called,
349+
# all three pipes should be done with initializing.
345350
loop.call_soon(protocol.connection_made, handle)
346351
if waiter is not None:
347352
loop.call_soon(_set_result_unless_cancelled, waiter, True)

uvloop/handles/stream.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,4 @@ cdef class UVStreamServer(UVStream):
5555

5656
cdef listen(self, int backlog=?)
5757
cdef _on_listen(self)
58-
cdef UVTransport _make_new_transport(self, object protocol)
58+
cdef UVTransport _make_new_transport(self, object protocol, object waiter)

uvloop/handles/stream.pyx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,13 +312,13 @@ cdef class UVStreamServer(UVStream):
312312
cdef _on_listen(self):
313313
# Implementation for UVStream._on_listen
314314
protocol = self.protocol_factory()
315-
client = self._make_new_transport(protocol)
315+
client = self._make_new_transport(protocol, None)
316316
client._accept(<UVStream>self)
317317

318318
cdef inline _mark_as_open(self):
319319
self.opened = 1
320320

321-
cdef UVTransport _make_new_transport(self, object protocol):
321+
cdef UVTransport _make_new_transport(self, object protocol, object waiter):
322322
raise NotImplementedError
323323

324324

uvloop/handles/tcp.pxd

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ cdef class UVTCPServer(UVStreamServer):
99
cdef class UVTCPTransport(UVTransport):
1010
cdef bind(self, system.sockaddr* addr, unsigned int flags=*)
1111
cdef open(self, int sockfd)
12-
cdef connect(self, system.sockaddr* addr, object callback)
12+
cdef connect(self, system.sockaddr* addr)
1313

1414
@staticmethod
15-
cdef UVTCPTransport new(Loop loop, object protocol, Server server)
15+
cdef UVTCPTransport new(Loop loop, object protocol, Server server,
16+
object waiter)

uvloop/handles/tcp.pyx

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,20 +62,22 @@ cdef class UVTCPServer(UVStreamServer):
6262
else:
6363
self._mark_as_open()
6464

65-
cdef UVTransport _make_new_transport(self, object protocol):
65+
cdef UVTransport _make_new_transport(self, object protocol, object waiter):
6666
cdef UVTCPTransport tr
67-
tr = UVTCPTransport.new(self._loop, protocol, self._server)
67+
tr = UVTCPTransport.new(self._loop, protocol, self._server, waiter)
6868
return <UVTransport>tr
6969

7070

7171
@cython.no_gc_clear
7272
cdef class UVTCPTransport(UVTransport):
7373

7474
@staticmethod
75-
cdef UVTCPTransport new(Loop loop, object protocol, Server server):
75+
cdef UVTCPTransport new(Loop loop, object protocol, Server server,
76+
object waiter):
77+
7678
cdef UVTCPTransport handle
7779
handle = UVTCPTransport.__new__(UVTCPTransport)
78-
handle._init(loop, protocol, server)
80+
handle._init(loop, protocol, server, waiter)
7981
__tcp_init_uv_handle(<UVStream>handle, loop)
8082
return handle
8183

@@ -87,26 +89,23 @@ cdef class UVTCPTransport(UVTransport):
8789
self._ensure_alive()
8890
__tcp_open(<UVStream>self, sockfd)
8991

90-
cdef connect(self, system.sockaddr* addr, object callback):
92+
cdef connect(self, system.sockaddr* addr):
9193
cdef _TCPConnectRequest req
92-
req = _TCPConnectRequest(self._loop, self, callback)
94+
req = _TCPConnectRequest(self._loop, self)
9395
req.connect(addr)
9496

9597

9698
cdef class _TCPConnectRequest(UVRequest):
9799
cdef:
98-
object callback
99100
UVTCPTransport transport
100101

101-
def __cinit__(self, loop, transport, callback):
102+
def __cinit__(self, loop, transport):
102103
self.request = <uv.uv_req_t*> PyMem_Malloc(sizeof(uv.uv_connect_t))
103104
if self.request is NULL:
104105
self.on_done()
105106
raise MemoryError()
106107
self.request.data = <void*>self
107-
108108
self.transport = transport
109-
self.callback = callback
110109

111110
cdef connect(self, system.sockaddr* addr):
112111
cdef int err
@@ -123,18 +122,18 @@ cdef class _TCPConnectRequest(UVRequest):
123122
cdef void __tcp_connect_callback(uv.uv_connect_t* req, int status) with gil:
124123
cdef:
125124
_TCPConnectRequest wrapper
126-
object callback
125+
UVTCPTransport transport
127126

128127
wrapper = <_TCPConnectRequest> req.data
129-
callback = wrapper.callback
128+
transport = wrapper.transport
130129

131130
if status < 0:
132131
exc = convert_error(status)
133132
else:
134133
exc = None
135134

136135
try:
137-
callback(exc)
136+
transport._on_connect(exc)
138137
except BaseException as ex:
139138
wrapper.transport._error(ex, False)
140139
finally:

uvloop/handles/transport.pxd

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@ cdef class UVTransport(UVStream):
1515
object _protocol_data_received
1616

1717
Server _server
18+
object _waiter
1819

19-
cdef _init(self, Loop loop, object protocol, Server server)
20+
cdef _init(self, Loop loop, object protocol, Server server, object waiter)
2021

21-
cdef _set_protocol(self, object protocol)
22-
cdef _set_server(self, Server server)
23-
24-
cdef _init_protocol(self, waiter)
22+
cdef _init_protocol(self)
23+
cdef _on_connect(self, object exc)
2524

2625
cdef _set_write_buffer_limits(self, int high=*, int low=*)
2726
cdef _maybe_pause_protocol(self)

0 commit comments

Comments
 (0)