Skip to content

Commit 7a78d6a

Browse files
authored
[HZ-5309] Improve Asyncio Client's Resiliency (#766)
Asyncio client exited early during connection problems. For example, in the following situation: 1. Hazelcast cluster has more than one member. Members are able to access each other with their private IPs, but they are accessible by the client only by their public IPs. 2. The client connects to one of the members using its public IP, without enabling client public address discovery (`use_public_ip=True`). In that case, the client must route all invocations to the member it could connect. This PR fixes that for the asyncio client, by retrying to connect instead of giving up with `TargetDisconnectedError`.
1 parent e06c30d commit 7a78d6a

File tree

2 files changed

+32
-5
lines changed

2 files changed

+32
-5
lines changed

hazelcast/internal/asyncio_connection.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,12 @@ async def connect_to_all_cluster_members(self, sync_start):
298298
return
299299

300300
if sync_start:
301-
async with asyncio.TaskGroup() as tg:
302-
for member in self._cluster_service.get_members():
303-
tg.create_task(self._get_or_connect_to_member(member))
301+
try:
302+
async with asyncio.TaskGroup() as tg:
303+
for member in self._cluster_service.get_members():
304+
tg.create_task(self._get_or_connect_to_member(member))
305+
except Exception as e:
306+
_logger.debug("Error during connection to all members: %s", str(e))
304307

305308
self._start_connect_all_members_timer()
306309

hazelcast/internal/asyncio_reactor.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,20 +120,42 @@ async def _create_connection(self, config, address):
120120
server_hostname=server_hostname,
121121
sock=sock,
122122
)
123+
self._connected = True
124+
123125
try:
124126
sock.getpeername()
125127
except OSError as err:
126128
if err.errno not in (errno.ENOTCONN, errno.EINVAL):
127129
raise
128130
self._connected = False
129-
else:
130-
self._connected = True
131131

132132
sock, self._proto = res
133133
sock = sock.get_extra_info("socket")
134134
sockname = sock.getsockname()
135135
host, port = sockname[0], sockname[1]
136136
self.local_address = Address(host, port)
137+
self._connect_timer_task = None
138+
if not self._connected:
139+
self._connect_timer_task = self._loop.create_task(
140+
self._connect_retry_cb(0.01, self._sock, (address.host, address.port))
141+
)
142+
143+
async def _connect_retry_cb(self, timeout, sock, address):
144+
await asyncio.sleep(timeout)
145+
if self._connected and self._close_task:
146+
self._close_task.cancel()
147+
return
148+
try:
149+
self.connect(sock, address)
150+
except Exception:
151+
# close task will handle closing the connection
152+
return
153+
if not self._connected:
154+
self._connect_timer_task = self._loop.create_task(
155+
self._connect_retry_cb(timeout, sock, address)
156+
)
157+
elif self._close_task:
158+
self._close_task.cancel()
137159

138160
def connect(self, sock, address):
139161
self._connected = False
@@ -171,6 +193,8 @@ def handle_connect(self):
171193
async def _close_timer_cb(self, timeout):
172194
await asyncio.sleep(timeout)
173195
if not self._connected:
196+
if self._connect_timer_task:
197+
self._connect_timer_task.cancel()
174198
await self.close_connection(None, IOError("Connection timed out"))
175199

176200
def _write(self, buf):

0 commit comments

Comments
 (0)