Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions hazelcast/internal/asyncio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,12 @@ async def connect_to_all_cluster_members(self, sync_start):
return

if sync_start:
async with asyncio.TaskGroup() as tg:
for member in self._cluster_service.get_members():
tg.create_task(self._get_or_connect_to_member(member))
try:
async with asyncio.TaskGroup() as tg:
for member in self._cluster_service.get_members():
tg.create_task(self._get_or_connect_to_member(member))
except Exception as e:
_logger.debug("Error during connection to all members: %s", str(e))

self._start_connect_all_members_timer()

Expand Down
24 changes: 22 additions & 2 deletions hazelcast/internal/asyncio_reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,38 @@ async def _create_connection(self, config, address):
server_hostname=server_hostname,
sock=sock,
)
self._connected = True

try:
sock.getpeername()
except OSError as err:
if err.errno not in (errno.ENOTCONN, errno.EINVAL):
raise
self._connected = False
else:
self._connected = True

sock, self._proto = res
sock = sock.get_extra_info("socket")
sockname = sock.getsockname()
host, port = sockname[0], sockname[1]
self.local_address = Address(host, port)
self._connect_timer_task = None
if not self._connected:
self._connect_timer_task = self._loop.create_task(
self._connect_retry_cb(0.01, self._sock, (address.host, address.port))
)

async def _connect_retry_cb(self, timeout, sock, address):
await asyncio.sleep(timeout)
if self._connected and self._close_task:
self._close_task.cancel()
return
self.connect(sock, address)
if not self._connected:
self._connect_timer_task = self._loop.create_task(
self._connect_retry_cb(timeout, sock, address)
)
elif self._close_task:
self._close_task.cancel()

def connect(self, sock, address):
self._connected = False
Expand Down Expand Up @@ -171,6 +189,8 @@ def handle_connect(self):
async def _close_timer_cb(self, timeout):
await asyncio.sleep(timeout)
if not self._connected:
if self._connect_timer_task:
self._connect_timer_task.cancel()
await self.close_connection(None, IOError("Connection timed out"))

def _write(self, buf):
Expand Down