Skip to content

Commit 46dd8f3

Browse files
committed
Introduce new way of scheduling transport writes (3x faster!)
Instead of making a `write` syscall on each `transport.write` call, we now have an internal buffer of queued write calls. After all callbacks are done, or after the selector poll is complete, we flush the buffers using as low number of syscalls as possible. We now use zero-copy aggressively. For instance, writelines doesn't concat all buffers into one bytes object; instead each buffer passed to writelines will be repackaged and passed to the writev syscall (along with all other buffered data). This commit (+ 2 before this one) completes the refactoring of transport writes. Sequential writes of small buffers are now ~3x faster. For instance: transport.write 15360 buffers by 0.01 MB: - before: 0.32 seconds (466.87 MB/sec) - after: 0.09 seconds (1657.38 MB/sec)
1 parent 83ae4ef commit 46dd8f3

File tree

4 files changed

+256
-114
lines changed

4 files changed

+256
-114
lines changed

tests/test_pipes.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ def connect():
135135

136136
def test_write_pipe(self):
137137
rpipe, wpipe = os.pipe()
138+
os.set_blocking(rpipe, False)
138139
pipeobj = io.open(wpipe, 'wb', 1024)
139140

140141
proto = MyWritePipeProto(loop=self.loop)
@@ -149,7 +150,10 @@ def test_write_pipe(self):
149150
data = bytearray()
150151

151152
def reader(data):
152-
chunk = os.read(rpipe, 1024)
153+
try:
154+
chunk = os.read(rpipe, 1024)
155+
except BlockingIOError:
156+
return len(data)
153157
data += chunk
154158
return len(data)
155159

@@ -174,6 +178,7 @@ def reader(data):
174178
def test_write_pipe_disconnect_on_close(self):
175179
rsock, wsock = test_utils.socketpair()
176180
rsock.setblocking(False)
181+
177182
pipeobj = io.open(wsock.detach(), 'wb', 1024)
178183

179184
proto = MyWritePipeProto(loop=self.loop)
@@ -194,6 +199,8 @@ def test_write_pipe_disconnect_on_close(self):
194199

195200
def test_write_pty(self):
196201
master, slave = os.openpty()
202+
os.set_blocking(master, False)
203+
197204
slave_write_obj = io.open(slave, 'wb', 0)
198205

199206
proto = MyWritePipeProto(loop=self.loop)
@@ -208,7 +215,10 @@ def test_write_pty(self):
208215
data = bytearray()
209216

210217
def reader(data):
211-
chunk = os.read(master, 1024)
218+
try:
219+
chunk = os.read(master, 1024)
220+
except BlockingIOError:
221+
return len(data)
212222
data += chunk
213223
return len(data)
214224

0 commit comments

Comments
 (0)