Skip to content

Commit 6329091

Browse files
committed
Optimize ReadBuffer.redirect_messages()
1 parent 2cbd1dd commit 6329091

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

buffer.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ cdef class ReadBuffer:
8787
ssize_t _length
8888

8989
char _current_message_type
90-
int _current_message_len
90+
int32_t _current_message_len
9191
ssize_t _current_message_len_unread
9292
bint _current_message_ready
9393

buffer.pyx

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ cdef class ReadBuffer:
338338
cdef:
339339
ssize_t nread
340340

341+
self._ensure_first_buf()
341342
while True:
342343
if self._pos0 + nbytes > self._len0:
343344
nread = self._len0 - self._pos0
@@ -579,6 +580,10 @@ cdef class ReadBuffer:
579580
cdef:
580581
const char* cbuf
581582
ssize_t cbuf_len
583+
int32_t msg_len
584+
ssize_t new_pos0
585+
ssize_t pos_delta
586+
int32_t done
582587

583588
while True:
584589
buf.write_byte(mtype)
@@ -590,10 +595,44 @@ cdef class ReadBuffer:
590595
else:
591596
buf.write_bytes(self.consume_message())
592597

598+
# Fast path: exhaust buf0 as efficiently as possible
599+
self._ensure_first_buf()
600+
if self._pos0 + 5 <= self._len0:
601+
cbuf = cpython.PyBytes_AS_STRING(self._buf0)
602+
new_pos0 = self._pos0
603+
604+
done = 0
605+
while new_pos0 + 5 <= self._len0:
606+
if (cbuf + new_pos0)[0] != mtype:
607+
done = 1
608+
break
609+
msg_len = hton.unpack_int32(cbuf + new_pos0 + 1) + 1
610+
if new_pos0 + msg_len > self._len0:
611+
break
612+
new_pos0 += msg_len
613+
614+
if new_pos0 != self._pos0:
615+
if PG_DEBUG:
616+
assert self._pos0 < new_pos0 <= self._len0
617+
618+
pos_delta = new_pos0 - self._pos0
619+
buf.write_cstr(
620+
cbuf + self._pos0,
621+
pos_delta)
622+
623+
self._pos0 = new_pos0
624+
self._length -= pos_delta
625+
626+
if PG_DEBUG:
627+
assert self._length >= 0
628+
629+
if done:
630+
# The next message is of a different type.
631+
return
632+
633+
# Back to slow path
593634
if not self.take_message_type(mtype):
594-
break
595-
596-
return buf
635+
return
597636

598637
cdef bytearray consume_messages(self, char mtype):
599638
"""Consume consecutive messages of the same type."""
@@ -613,6 +652,7 @@ cdef class ReadBuffer:
613652
buf = cpythonx.PyByteArray_AsString(result)
614653

615654
while self.take_message_type(mtype):
655+
self._ensure_first_buf()
616656
nbytes = self._current_message_len_unread
617657
self._read_into(buf, nbytes)
618658
buf += nbytes

0 commit comments

Comments
 (0)