Skip to content

Commit a417a4a

Browse files
committed
Updates
1 parent 5406bc6 commit a417a4a

File tree

2 files changed

+14
-26
lines changed

2 files changed

+14
-26
lines changed

hazelcast/internal/asyncio_connection.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def __init__(
173173
self, self._client, config, reactor, invocation_service
174174
)
175175
self._connection_listeners = []
176-
self._connect_all_members_timer = None
176+
self._connect_all_members_task: asyncio.Task | None = None
177177
self._async_start = config.async_start
178178
self._connect_to_cluster_thread_running = False
179179
self._shuffle_member_list = config.shuffle_member_list
@@ -274,8 +274,8 @@ async def shutdown(self):
274274
return
275275

276276
self.live = False
277-
if self._connect_all_members_timer:
278-
self._connect_all_members_timer.cancel()
277+
if self._connect_all_members_task:
278+
self._connect_all_members_task.cancel()
279279

280280
self._heartbeat_manager.shutdown()
281281

@@ -458,6 +458,7 @@ def _start_connect_all_members_timer(self):
458458
connecting_uuids = set()
459459

460460
async def run():
461+
await asyncio.sleep(1)
461462
if not self._lifecycle_service.running:
462463
return
463464

@@ -479,13 +480,9 @@ async def run():
479480
for item in member_uuids:
480481
connecting_uuids.discard(item)
481482

482-
self._connect_all_members_timer = self._reactor.add_timer(
483-
1, lambda: asyncio.create_task(run())
484-
)
483+
self._connect_all_members_task = asyncio.create_task(run())
485484

486-
self._connect_all_members_timer = self._reactor.add_timer(
487-
1, lambda: asyncio.create_task(run())
488-
)
485+
self._connect_all_members_task = asyncio.create_task(run())
489486

490487
async def _connect_to_cluster(self):
491488
await self._sync_connect_to_cluster()
@@ -816,12 +813,14 @@ def __init__(self, connection_manager, client, config, reactor, invocation_servi
816813
self._invocation_service = invocation_service
817814
self._heartbeat_timeout = config.heartbeat_timeout
818815
self._heartbeat_interval = config.heartbeat_interval
819-
self._heartbeat_timer = None
816+
self._heartbeat_task: asyncio.Task | None = None
820817

821818
def start(self):
822819
"""Starts sending periodic HeartBeat operations."""
823820

824821
async def _heartbeat():
822+
await asyncio.sleep(self._heartbeat_interval)
823+
_logger.debug("heartbeat")
825824
conn_manager = self._connection_manager
826825
if not conn_manager.live:
827826
return
@@ -830,18 +829,14 @@ async def _heartbeat():
830829
async with asyncio.TaskGroup() as tg:
831830
for connection in list(conn_manager.active_connections.values()):
832831
tg.create_task(self._check_connection(now, connection))
833-
self._heartbeat_timer = self._reactor.add_timer(
834-
self._heartbeat_interval, lambda: asyncio.create_task(_heartbeat())
835-
)
832+
self._heartbeat_task = asyncio.create_task(_heartbeat())
836833

837-
self._heartbeat_timer = self._reactor.add_timer(
838-
self._heartbeat_interval, lambda: asyncio.create_task(_heartbeat())
839-
)
834+
self._heartbeat_task = asyncio.create_task(_heartbeat())
840835

841836
def shutdown(self):
842837
"""Stops HeartBeat operations."""
843-
if self._heartbeat_timer:
844-
self._heartbeat_timer.cancel()
838+
if self._heartbeat_task:
839+
self._heartbeat_task.cancel()
845840

846841
async def _check_connection(self, now, connection):
847842
if not connection.live:
@@ -858,7 +853,7 @@ async def _check_connection(self, now, connection):
858853
if (now - connection.last_write_time) > self._heartbeat_interval:
859854
request = client_ping_codec.encode_request()
860855
invocation = Invocation(request, connection=connection, urgent=True)
861-
self._invocation_service.invoke(invocation)
856+
asyncio.create_task(self._invocation_service.ainvoke(invocation))
862857

863858

864859
_frame_header = struct.Struct("<iH")

hazelcast/internal/asyncio_reactor.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ class AsyncioReactor:
1717
def __init__(self, loop: AbstractEventLoop | None = None):
1818
self._is_live = False
1919
self._loop = loop or asyncio.get_running_loop()
20-
self._bytes_lock = asyncio.Lock()
2120
self._bytes_sent = 0
2221
self._bytes_received = 0
2322

@@ -46,17 +45,11 @@ async def connection_factory(
4645
)
4746

4847
def update_bytes_sent(self, sent: int):
49-
# with self._bytes_lock:
5048
self._bytes_sent += sent
5149

5250
def update_bytes_received(self, received: int):
53-
# with self._bytes_lock:
5451
self._bytes_received += received
5552

56-
# def _asyncio_loop(self):
57-
# asyncio.set_event_loop(self._loop)
58-
# self._loop.run_forever()
59-
6053

6154
class AsyncioConnection(Connection):
6255
def __init__(

0 commit comments

Comments
 (0)