Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions hazelcast/internal/asyncio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,12 @@ async def connect_to_all_cluster_members(self, sync_start):
return

if sync_start:
async with asyncio.TaskGroup() as tg:
for member in self._cluster_service.get_members():
tg.create_task(self._get_or_connect_to_member(member))
try:
async with asyncio.TaskGroup() as tg:
for member in self._cluster_service.get_members():
tg.create_task(self._get_or_connect_to_member(member))
except Exception as e:
_logger.debug("Error during connection to all members: %s", str(e))

self._start_connect_all_members_timer()

Expand Down
24 changes: 22 additions & 2 deletions hazelcast/internal/asyncio_reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,38 @@ async def _create_connection(self, config, address):
server_hostname=server_hostname,
sock=sock,
)
self._connected = True

try:
sock.getpeername()
except OSError as err:
if err.errno not in (errno.ENOTCONN, errno.EINVAL):
raise
self._connected = False
else:
self._connected = True

sock, self._proto = res
sock = sock.get_extra_info("socket")
sockname = sock.getsockname()
host, port = sockname[0], sockname[1]
self.local_address = Address(host, port)
self._connect_timer_task = None
if not self._connected:
self._connect_timer_task = self._loop.create_task(
self._connect_retry_cb(0.01, self._sock, (address.host, address.port))
)

async def _connect_retry_cb(self, timeout, sock, address):
await asyncio.sleep(timeout)
if self._connected and self._close_task:
self._close_task.cancel()
return
self.connect(sock, address)
if not self._connected:
self._connect_timer_task = self._loop.create_task(
self._connect_retry_cb(timeout, sock, address)
)
elif self._close_task:
self._close_task.cancel()

def connect(self, sock, address):
self._connected = False
Expand Down Expand Up @@ -171,6 +189,8 @@ def handle_connect(self):
async def _close_timer_cb(self, timeout):
await asyncio.sleep(timeout)
if not self._connected:
if self._connect_timer_task:
self._connect_timer_task.cancel()
await self.close_connection(None, IOError("Connection timed out"))

def _write(self, buf):
Expand Down