Skip to content

Commit 599cf75

Browse files
authored
Merge pull request scylladb#28 from riptano/python-1185
PYTHON-1185: asyncio message chunks can be processed discontinuously
2 parents 25994d1 + 401e40f commit 599cf75

File tree

5 files changed

+21
-8
lines changed

5 files changed

+21
-8
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Features
1818
Bug Fixes
1919
---------
2020
* re-raising the CQLEngineException will fail on Python 3 (PYTHON-1166)
21+
* asyncio message chunks can be processed discontinuously (PYTHON-1185)
2122

2223
Others
2324
------

build.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ schedules:
4646
EXCLUDE_LONG=1
4747
matrix:
4848
exclude:
49-
- python: [2.7, 3.4, 3.6, 3.7]
50-
- cassandra: ['2.0', '2.1', '2.2', '3.0', 'test-dse']
49+
- python: [2.7, 3.4, 3.7]
50+
- cassandra: ['2.0', '2.1', '2.2', '3.0', 'test-dse', dse-4.8', 'dse-5.0']
5151

5252
release_test:
5353
schedule: per_commit

cassandra/io/asyncioreactor.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class AsyncioConnection(Connection):
8383
_loop_thread = None
8484

8585
_write_queue = None
86+
_write_queue_lock = None
8687

8788
def __init__(self, *args, **kwargs):
8889
Connection.__init__(self, *args, **kwargs)
@@ -91,6 +92,7 @@ def __init__(self, *args, **kwargs):
9192
self._socket.setblocking(0)
9293

9394
self._write_queue = asyncio.Queue(loop=self._loop)
95+
self._write_queue_lock = asyncio.Lock(loop=self._loop)
9496

9597
# see initialize_reactor -- loop is running in a separate thread, so we
9698
# have to use a threadsafe call
@@ -157,20 +159,28 @@ def _close(self):
157159
def push(self, data):
158160
buff_size = self.out_buffer_size
159161
if len(data) > buff_size:
162+
chunks = []
160163
for i in range(0, len(data), buff_size):
161-
self._push_chunk(data[i:i + buff_size])
164+
chunks.append(data[i:i + buff_size])
162165
else:
163-
self._push_chunk(data)
166+
chunks = [data]
164167

165-
def _push_chunk(self, chunk):
166168
if self._loop_thread.ident != get_ident():
167169
asyncio.run_coroutine_threadsafe(
168-
self._write_queue.put(chunk),
170+
self._push_msg(chunks),
169171
loop=self._loop
170172
)
171173
else:
172174
# avoid races/hangs by just scheduling this, not using threadsafe
173-
self._loop.create_task(self._write_queue.put(chunk))
175+
self._loop.create_task(self._push_msg(chunks))
176+
177+
@asyncio.coroutine
178+
def _push_msg(self, chunks):
179+
# This lock ensures all chunks of a message are sequential in the Queue
180+
with (yield from self._write_queue_lock):
181+
for chunk in chunks:
182+
self._write_queue.put_nowait(chunk)
183+
174184

175185
@asyncio.coroutine
176186
def handle_write(self):

tests/integration/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ def _id_and_mark(f):
355355
requiredse = unittest.skipUnless(DSE_VERSION, "DSE required")
356356
requirescloudproxy = unittest.skipIf(CLOUD_PROXY_PATH is None, "Cloud Proxy path hasn't been specified")
357357

358+
libevtest = unittest.skipUnless(EVENT_LOOP_MANAGER=="libev", "Test timing designed for libev loop")
358359

359360
def wait_for_node_socket(node, timeout):
360361
binary_itf = node.network_interfaces['binary']

tests/integration/simulacron/test_connection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from cassandra.policies import HostStateListener, RoundRobinPolicy
2828

2929
from tests import connection_class, thread_pool_executor_class
30-
from tests.integration import requiressimulacron
30+
from tests.integration import requiressimulacron, libevtest
3131
from tests.integration.util import assert_quiescent_pool_state, late
3232
# important to import the patch PROTOCOL_VERSION from the simulacron module
3333
from tests.integration.simulacron import SimulacronBase, PROTOCOL_VERSION
@@ -194,6 +194,7 @@ def test_callbacks_and_pool_when_oto(self):
194194
callback.assert_not_called()
195195

196196
@cythontest
197+
@libevtest
197198
def test_heartbeat_defunct_deadlock(self):
198199
"""
199200
Ensure that there is no deadlock when request is in-flight and heartbeat defuncts connection

0 commit comments

Comments
 (0)