Skip to content

Commit eee6984

Browse files
committed
Rewrite UDP transport from scratch.
The new implementation follows asyncio pretty closely. We no longer use uv_udp_* functions from libuv. Instead, we use uv_poll_* and inherit asyncio logic from selector_events.py. When `loop.create_datagram_endpoint()` is called with a `remote_addr` argument, asyncio attempts to *connect* the socket. The old uvloop implementation didn't do that, as there's no 'uv_udp_connect()' function in libuv (at least yet). For UDP it doesn't really matter if the socket is connected or not. The main difference is that when a UDP socket is connected, we can use `socket.send(data)` method, instead of `socket.sendto(data, addr)`. The subtle difference appears in two scenarios: * using getsockname(); * raising an exception from `create_datagram_endpoint()` when `remote_addr` is unreachable. It doesn't seem practical to wait until we have `uv_udp_connect()` function in libuv, hence the rewrite. In the future, when `uv_udp_connect()` is available, it should be easy to rollback to the current implementation. Addresses issues #109, #108, #85.
1 parent ce654bd commit eee6984

File tree

11 files changed

+262
-510
lines changed

11 files changed

+262
-510
lines changed

tests/test_udp.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,19 @@ def datagram_received(self, data, addr):
4646
super().datagram_received(data, addr)
4747
self.transport.sendto(b'resp:' + data, addr)
4848

49-
for lc in (('127.0.0.1', 0), None):
50-
if lc is None and not isinstance(self.loop, uvloop.Loop):
51-
# TODO This looks like a bug in asyncio -- if no local_addr
52-
# and no remote_addr are specified, the connection
53-
# that asyncio creates is not bound anywhere.
54-
return
49+
for family, lc_host, lc_port in ((socket.AF_INET, '127.0.0.1', 0),
50+
(socket.AF_INET6, '::1', 0)):
51+
52+
lc = (lc_host, lc_port)
5553

5654
with self.subTest(local_addr=lc):
5755
coro = self.loop.create_datagram_endpoint(
58-
TestMyDatagramProto, local_addr=lc, family=socket.AF_INET)
56+
TestMyDatagramProto,
57+
local_addr=lc,
58+
family=family)
59+
5960
s_transport, server = self.loop.run_until_complete(coro)
60-
host, port = s_transport.get_extra_info('sockname')
61+
host, port, *_ = s_transport.get_extra_info('sockname')
6162

6263
self.assertIsInstance(server, TestMyDatagramProto)
6364
self.assertEqual('INITIALIZED', server.state)
@@ -70,16 +71,16 @@ def datagram_received(self, data, addr):
7071

7172
coro = self.loop.create_datagram_endpoint(
7273
lambda: MyDatagramProto(loop=self.loop),
73-
family=socket.AF_INET,
74-
remote_addr=None if lc is None else (host, port),
74+
family=family,
75+
remote_addr=(host, port),
7576
**extra)
7677
transport, client = self.loop.run_until_complete(coro)
7778

7879
self.assertIsInstance(client, MyDatagramProto)
7980
self.assertEqual('INITIALIZED', client.state)
8081
self.assertIs(client.transport, transport)
8182

82-
transport.sendto(b'xxx', (host, port) if lc is None else None)
83+
transport.sendto(b'xxx')
8384
test_utils.run_until(self.loop, lambda: server.nbytes)
8485
self.assertEqual(3, server.nbytes)
8586
test_utils.run_until(self.loop, lambda: client.nbytes)

uvloop/handles/handle.pxd

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
cdef class UVHandle:
22
cdef:
33
uv.uv_handle_t *_handle
4-
bint _closed
5-
bint _inited
64
Loop _loop
75
readonly _source_traceback
6+
bint _closed
7+
bint _inited
8+
9+
# Added to enable current UDPTransport implementation,
10+
# which doesn't use libuv handles.
11+
bint _has_handle
812

913
# All "inline" methods are final
1014

@@ -20,7 +24,8 @@ cdef class UVHandle:
2024

2125
cdef _warn_unclosed(self)
2226

23-
cdef inline _free(self)
27+
cdef _dealloc_impl(self)
28+
cdef _free(self)
2429
cdef _close(self)
2530
cdef _after_close(self)
2631

@@ -35,7 +40,7 @@ cdef class UVSocketHandle(UVHandle):
3540

3641
# All "inline" methods are final
3742

38-
cdef inline _fileno(self)
43+
cdef _fileno(self)
3944

4045
cdef _new_socket(self)
4146
cdef inline _get_socket(self)

uvloop/handles/handle.pyx

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ cdef class UVHandle:
1515
def __cinit__(self):
1616
self._closed = 0
1717
self._inited = 0
18+
self._has_handle = 1
1819
self._handle = NULL
1920
self._loop = None
2021
self._source_traceback = None
@@ -25,6 +26,9 @@ cdef class UVHandle:
2526
self.__class__.__name__))
2627

2728
def __dealloc__(self):
29+
self._dealloc_impl()
30+
31+
cdef _dealloc_impl(self):
2832
if UVLOOP_DEBUG:
2933
if self._loop is not None:
3034
self._loop._debug_handles_current.subtract([
@@ -36,6 +40,9 @@ cdef class UVHandle:
3640
.format(self.__class__.__name__))
3741

3842
if self._handle is NULL:
43+
if UVLOOP_DEBUG:
44+
if self._has_handle == 0:
45+
self._loop._debug_uv_handles_freed += 1
3946
return
4047

4148
# -> When we're at this point, something is wrong <-
@@ -67,7 +74,7 @@ cdef class UVHandle:
6774
self._closed = 1
6875
self._free()
6976

70-
cdef inline _free(self):
77+
cdef _free(self):
7178
if self._handle == NULL:
7279
return
7380

@@ -98,6 +105,8 @@ cdef class UVHandle:
98105
if self._handle is not NULL:
99106
self._free()
100107

108+
self._closed = 1
109+
101110
if UVLOOP_DEBUG:
102111
name = self.__class__.__name__
103112
if self._inited:
@@ -107,11 +116,10 @@ cdef class UVHandle:
107116
raise RuntimeError(
108117
'_abort_init: {}._closed is set'.format(name))
109118

110-
self._closed = 1
111-
112119
cdef inline _finish_init(self):
113120
self._inited = 1
114-
self._handle.data = <void*>self
121+
if self._has_handle == 1:
122+
self._handle.data = <void*>self
115123
if self._loop._debug:
116124
self._source_traceback = extract_stack()
117125
if UVLOOP_DEBUG:
@@ -134,7 +142,7 @@ cdef class UVHandle:
134142
cdef bint res
135143
res = self._closed != 1 and self._inited == 1
136144
if UVLOOP_DEBUG:
137-
if res:
145+
if res and self._has_handle == 1:
138146
name = self.__class__.__name__
139147
if self._handle is NULL:
140148
raise RuntimeError(
@@ -228,7 +236,7 @@ cdef class UVSocketHandle(UVHandle):
228236
self._fileobj = None
229237
self.__cached_socket = None
230238

231-
cdef inline _fileno(self):
239+
cdef _fileno(self):
232240
cdef:
233241
int fd
234242
int err
@@ -349,6 +357,9 @@ cdef void __uv_close_handle_cb(uv.uv_handle_t* handle) with gil:
349357
h = <UVHandle>handle.data
350358
try:
351359
if UVLOOP_DEBUG:
360+
if not h._has_handle:
361+
raise RuntimeError(
362+
'has_handle=0 in __uv_close_handle_cb')
352363
h._loop._debug_handles_closed.update([
353364
h.__class__.__name__])
354365
h._free()

uvloop/handles/poll.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ cdef class UVPoll(UVHandle):
1212

1313
cdef int is_active(self)
1414

15+
cdef is_reading(self)
1516
cdef start_reading(self, Handle callback)
1617
cdef start_writing(self, Handle callback)
1718
cdef stop_reading(self)

uvloop/handles/poll.pyx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ cdef class UVPoll(UVHandle):
8484
self.fd,
8585
&dummy_event) # ignore errors
8686

87+
cdef is_reading(self):
88+
return self._is_alive() and self.reading_handle is not None
89+
8790
cdef start_reading(self, Handle callback):
8891
cdef:
8992
int mask = 0

uvloop/handles/udp.pxd

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,14 @@
11
cdef class UDPTransport(UVBaseTransport):
22
cdef:
3-
bint __receiving
4-
int _family
3+
object sock
4+
UVPoll poll
5+
object address
6+
object buffer
57

6-
bint _address_set
7-
system.sockaddr_storage _address
8-
object _cached_py_address
8+
cdef _init(self, Loop loop, object sock, object r_addr)
99

10-
cdef _init(self, Loop loop, unsigned int family)
10+
cdef _on_read_ready(self)
11+
cdef _on_write_ready(self)
1112

12-
cdef _set_remote_address(self, system.sockaddr* addr,
13-
size_t addr_len)
14-
15-
cdef _bind(self, system.sockaddr* addr, bint reuse_addr)
16-
cdef open(self, int family, int sockfd)
17-
cdef _set_broadcast(self, bint on)
18-
19-
cdef inline __receiving_started(self)
20-
cdef inline __receiving_stopped(self)
21-
22-
cdef _send(self, object data, object addr)
23-
24-
cdef _on_receive(self, bytes data, object exc, object addr)
25-
cdef _on_sent(self, object exc)
13+
@staticmethod
14+
cdef UDPTransport new(Loop loop, object sock, object r_addr)

0 commit comments

Comments
 (0)