diff --git a/hazelcast/internal/asyncio_connection.py b/hazelcast/internal/asyncio_connection.py
index d1d4aa9a90..78919b68b7 100644
--- a/hazelcast/internal/asyncio_connection.py
+++ b/hazelcast/internal/asyncio_connection.py
@@ -298,9 +298,12 @@ async def connect_to_all_cluster_members(self, sync_start):
             return
 
         if sync_start:
-            async with asyncio.TaskGroup() as tg:
-                for member in self._cluster_service.get_members():
-                    tg.create_task(self._get_or_connect_to_member(member))
+            try:
+                async with asyncio.TaskGroup() as tg:
+                    for member in self._cluster_service.get_members():
+                        tg.create_task(self._get_or_connect_to_member(member))
+            except Exception as e:
+                _logger.debug("Error during connection to all members: %s", e)
 
         self._start_connect_all_members_timer()
 
@@ -393,15 +396,27 @@ async def _get_or_connect_to_address(self, address):
         return connection
 
     async def _get_or_connect_to_member(self, member):
-        connection = self.active_connections.get(member.uuid, None)
-        if connection:
-            return connection
+        """Return a connection to ``member``, or ``None`` if connecting fails.
+
+        A connection already registered in ``active_connections`` under the
+        member's UUID is reused. Otherwise a new one is created and
+        authenticated. Any ``Exception`` raised along the way is logged with
+        its traceback and swallowed.
+        """
+        try:
+            connection = self.active_connections.get(member.uuid, None)
+            if connection:
+                return connection
+
+            translated = await self._translate_member_address(member)
+            connection = self._create_connection(translated)
+            await connection._create_task
+            response = self._authenticate(connection)
+            await self._on_auth(response, connection)
+        except Exception:
+            _logger.exception("Connecting to member %(member)s", {"member": member})
+            return
 
-        translated = await self._translate_member_address(member)
-        connection = self._create_connection(translated)
-        await connection._create_task
-        response = self._authenticate(connection)
-        await self._on_auth(response, connection)
         return connection
 
     def _create_connection(self, address):
diff --git a/hazelcast/internal/asyncio_reactor.py b/hazelcast/internal/asyncio_reactor.py
index 7907e7961c..f08e1bde55 100644
--- a/hazelcast/internal/asyncio_reactor.py
+++ b/hazelcast/internal/asyncio_reactor.py
@@ -120,20 +120,52 @@ async def _create_connection(self, config, address):
             server_hostname=server_hostname,
             sock=sock,
         )
+        self._connected = True
+
         try:
             sock.getpeername()
         except OSError as err:
             if err.errno not in (errno.ENOTCONN, errno.EINVAL):
                 raise
             self._connected = False
-        else:
-            self._connected = True
 
         sock, self._proto = res
         sock = sock.get_extra_info("socket")
         sockname = sock.getsockname()
         host, port = sockname[0], sockname[1]
         self.local_address = Address(host, port)
+        self._connect_timer_task = None
+        if not self._connected:
+            self._connect_timer_task = self._loop.create_task(
+                self._connect_retry_cb(0.01, self._sock, (address.host, address.port))
+            )
+
+    async def _connect_retry_cb(self, timeout, sock, address):
+        """Retry ``connect`` every ``timeout`` seconds until connected.
+
+        Each run sleeps for ``timeout`` first. While ``_connected`` stays
+        false another run is scheduled and stored in ``_connect_timer_task``;
+        once it is true the pending ``_close_task``, if any, is cancelled.
+        Retrying stops if ``connect`` raises.
+        """
+        await asyncio.sleep(timeout)
+        if self._connected:
+            # Already connected: never call connect() again, since it resets
+            # _connected to False, even when there is no close task to cancel.
+            if self._close_task:
+                self._close_task.cancel()
+            return
+        try:
+            self.connect(sock, address)
+        except Exception:
+            # close task will handle closing the connection
+            return
+        if not self._connected:
+            self._connect_timer_task = self._loop.create_task(
+                self._connect_retry_cb(timeout, sock, address)
+            )
+        elif self._close_task:
+            self._close_task.cancel()
 
     def connect(self, sock, address):
         self._connected = False
@@ -171,6 +203,9 @@ def handle_connect(self):
     async def _close_timer_cb(self, timeout):
         await asyncio.sleep(timeout)
         if not self._connected:
+            # stop the connect retry task before closing the timed-out connection
+            if self._connect_timer_task:
+                self._connect_timer_task.cancel()
             await self.close_connection(None, IOError("Connection timed out"))
 
     def _write(self, buf):