Skip to content

Commit ca548bc

Browse files
committed
Buffer sequential transport.write calls into one uv_write call
This increases throughput ~100% when many small writes are issued.
1 parent 7d78e50 commit ca548bc

File tree

8 files changed

+325
-54
lines changed

8 files changed

+325
-54
lines changed

tests/test_tcp.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,57 @@ async def start_server():
497497

498498

499499
class Test_UV_TCP(_TestTCP, tb.UVTestCase):
500-
pass
500+
501+
def test_many_small_writes(self):
502+
N = 10000
503+
TOTAL = 0
504+
505+
fut = self.loop.create_future()
506+
507+
async def server(reader, writer):
508+
nonlocal TOTAL
509+
while True:
510+
d = await reader.read(10000)
511+
if not d:
512+
break
513+
TOTAL += len(d)
514+
fut.set_result(True)
515+
writer.close()
516+
517+
async def run():
518+
srv = await asyncio.start_server(
519+
server,
520+
'127.0.0.1', 0,
521+
family=socket.AF_INET,
522+
loop=self.loop)
523+
524+
addr = srv.sockets[0].getsockname()
525+
r, w = await asyncio.open_connection(*addr, loop=self.loop)
526+
527+
DATA = b'x' * 102400
528+
529+
for _ in range(N):
530+
w.write(DATA)
531+
532+
try:
533+
w.write('a')
534+
except TypeError:
535+
pass
536+
537+
await w.drain()
538+
for _ in range(N):
539+
w.write(DATA)
540+
await w.drain()
541+
542+
w.close()
543+
await fut
544+
545+
srv.close()
546+
await srv.wait_closed()
547+
548+
self.assertEqual(TOTAL, N * 2 * len(DATA))
549+
550+
self.loop.run_until_complete(run())
501551

502552

503553
class Test_AIO_TCP(_TestTCP, tb.AIOTestCase):

uvloop/handles/check.pxd

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
cdef class UVCheck(UVHandle):
2+
cdef:
3+
Handle h
4+
bint running
5+
6+
# All "inline" methods are final
7+
8+
cdef _init(self, Loop loop, Handle h)
9+
10+
cdef inline stop(self)
11+
cdef inline start(self)
12+
13+
@staticmethod
14+
cdef UVCheck new(Loop loop, Handle h)

uvloop/handles/check.pyx

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
@cython.no_gc_clear
2+
cdef class UVCheck(UVHandle):
3+
cdef _init(self, Loop loop, Handle h):
4+
cdef int err
5+
6+
self._start_init(loop)
7+
8+
self._handle = <uv.uv_handle_t*> \
9+
PyMem_Malloc(sizeof(uv.uv_check_t))
10+
if self._handle is NULL:
11+
self._abort_init()
12+
raise MemoryError()
13+
14+
err = uv.uv_check_init(self._loop.uvloop, <uv.uv_check_t*>self._handle)
15+
if err < 0:
16+
self._abort_init()
17+
raise convert_error(err)
18+
19+
self._finish_init()
20+
21+
self.h = h
22+
self.running = 0
23+
24+
cdef inline stop(self):
25+
cdef int err
26+
27+
if not self._is_alive():
28+
self.running = 0
29+
return
30+
31+
if self.running == 1:
32+
err = uv.uv_check_stop(<uv.uv_check_t*>self._handle)
33+
self.running = 0
34+
if err < 0:
35+
exc = convert_error(err)
36+
self._fatal_error(exc, True)
37+
return
38+
39+
cdef inline start(self):
40+
cdef int err
41+
42+
self._ensure_alive()
43+
44+
if self.running == 0:
45+
err = uv.uv_check_start(<uv.uv_check_t*>self._handle,
46+
cb_check_callback)
47+
if err < 0:
48+
exc = convert_error(err)
49+
self._fatal_error(exc, True)
50+
return
51+
self.running = 1
52+
53+
@staticmethod
54+
cdef UVCheck new(Loop loop, Handle h):
55+
cdef UVCheck handle
56+
handle = UVCheck.__new__(UVCheck)
57+
handle._init(loop, h)
58+
return handle
59+
60+
61+
cdef void cb_check_callback(uv.uv_check_t* handle) with gil:
62+
if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVCheck callback") == 0:
63+
return
64+
65+
cdef:
66+
UVCheck check = <UVCheck> handle.data
67+
Handle h = check.h
68+
try:
69+
h._run()
70+
except BaseException as ex:
71+
check._error(ex, False)

uvloop/handles/stream.pxd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@ cdef class UVStream(UVBaseTransport):
55
bint __reading
66
bint __read_error_close
77
bint _eof
8+
list _buffer
9+
size_t _buffer_size
810

911
# All "inline" methods are final
1012

1113
cdef inline _init(self, Loop loop, object protocol, Server server,
1214
object waiter)
1315

16+
cdef inline _exec_write(self)
17+
1418
cdef inline _shutdown(self)
1519
cdef inline _accept(self, UVStream server)
1620

0 commit comments

Comments
 (0)