Skip to content

Commit 58783dc

Browse files
committed
Ported more integration tests
- Fixed a bug where an asyncio Lock was tried to be acquired without a release - Store asyncio tasks in order not to lose them - Removed start/shutdown from AsyncioReactor
1 parent 2718478 commit 58783dc

File tree

5 files changed

+447
-28
lines changed

5 files changed

+447
-28
lines changed

hazelcast/asyncio/client.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ def _init_context(self):
162162
)
163163

164164
async def _start(self):
165-
self._reactor.start()
166165
try:
167166
self._internal_lifecycle_service.start()
168167
self._invocation_service.start()
@@ -250,7 +249,6 @@ async def shutdown(self) -> None:
250249
await self._connection_manager.shutdown()
251250
self._invocation_service.shutdown()
252251
self._statistics.shutdown()
253-
self._reactor.shutdown()
254252
self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTDOWN)
255253

256254
@property

hazelcast/internal/asyncio_connection.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ def __init__(
185185
self._use_public_ip = (
186186
isinstance(address_provider, DefaultAddressProvider) and config.use_public_ip
187187
)
188+
# asyncio tasks are weakly referenced
189+
# storing tasks here in order not to lose them midway
190+
self._tasks = set()
188191

189192
def add_listener(self, on_connection_opened=None, on_connection_closed=None):
190193
"""Registers a ConnectionListener.
@@ -315,22 +318,21 @@ async def on_connection_close(self, closed_connection):
315318
disconnected = False
316319
removed = False
317320
trigger_reconnection = False
318-
async with self._lock:
319-
connection = self.active_connections.get(remote_uuid, None)
320-
if connection == closed_connection:
321-
self.active_connections.pop(remote_uuid, None)
322-
removed = True
323-
_logger.info(
324-
"Removed connection to %s:%s, connection: %s",
325-
remote_address,
326-
remote_uuid,
327-
connection,
328-
)
321+
connection = self.active_connections.get(remote_uuid, None)
322+
if connection == closed_connection:
323+
self.active_connections.pop(remote_uuid, None)
324+
removed = True
325+
_logger.info(
326+
"Removed connection to %s:%s, connection: %s",
327+
remote_address,
328+
remote_uuid,
329+
connection,
330+
)
329331

330-
if not self.active_connections:
331-
trigger_reconnection = True
332-
if self._client_state == ClientState.INITIALIZED_ON_CLUSTER:
333-
disconnected = True
332+
if not self.active_connections:
333+
trigger_reconnection = True
334+
if self._client_state == ClientState.INITIALIZED_ON_CLUSTER:
335+
disconnected = True
334336

335337
if disconnected:
336338
self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED)
@@ -813,6 +815,9 @@ def __init__(self, connection_manager, client, config, reactor, invocation_servi
813815
self._heartbeat_timeout = config.heartbeat_timeout
814816
self._heartbeat_interval = config.heartbeat_interval
815817
self._heartbeat_task: asyncio.Task | None = None
818+
# asyncio tasks are weakly referenced
819+
# storing tasks here in order not to lose them midway
820+
self._tasks = set()
816821

817822
def start(self):
818823
"""Starts sending periodic HeartBeat operations."""
@@ -852,7 +857,9 @@ async def _check_connection(self, now, connection):
852857
if (now - connection.last_write_time) > self._heartbeat_interval:
853858
request = client_ping_codec.encode_request()
854859
invocation = Invocation(request, connection=connection, urgent=True)
855-
asyncio.create_task(self._invocation_service.ainvoke(invocation))
860+
task = asyncio.create_task(self._invocation_service.ainvoke(invocation))
861+
self._tasks.add(task)
862+
task.add_done_callback(self._tasks.discard)
856863

857864

858865
_frame_header = struct.Struct("<iH")

hazelcast/internal/asyncio_reactor.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,13 @@
1717

1818
class AsyncioReactor:
1919
def __init__(self, loop: AbstractEventLoop | None = None):
20-
self._is_live = False
2120
self._loop = loop or asyncio.get_running_loop()
2221
self._bytes_sent = 0
2322
self._bytes_received = 0
2423

2524
def add_timer(self, delay, callback):
2625
return self._loop.call_later(delay, callback)
2726

28-
def start(self):
29-
self._is_live = True
30-
31-
def shutdown(self):
32-
if not self._is_live:
33-
return
34-
# TODO: cancel tasks
35-
3627
async def connection_factory(
3728
self, connection_manager, connection_id, address: Address, network_config, message_callback
3829
):
@@ -174,6 +165,9 @@ def __init__(self, conn: AsyncioConnection):
174165
self._write_buf_size = 0
175166
self._recv_buf = None
176167
self._alive = True
168+
# asyncio tasks are weakly referenced
169+
# storing tasks here in order not to lose them midway
170+
self._tasks: set = set()
177171

178172
def connection_made(self, transport: transports.BaseTransport):
179173
self._transport = transport
@@ -184,7 +178,9 @@ def connection_made(self, transport: transports.BaseTransport):
184178

185179
def connection_lost(self, exc):
186180
self._alive = False
187-
self._conn._loop.create_task(self._conn.close_connection(str(exc), None))
181+
task = self._conn._loop.create_task(self._conn.close_connection(str(exc), None))
182+
self._tasks.add(task)
183+
task.add_done_callback(self._tasks.discard)
188184
return False
189185

190186
def close(self):

0 commit comments

Comments
 (0)