
Commit ed32e37

Authored by john-westcott-iv, jshimkus-rh, and Alex-Izquierdo

Redis cluster error handling (#1066)

Co-authored-by: Joe Shimkus <[email protected]>
Co-authored-by: Alex <[email protected]>
1 parent e1912cf · commit ed32e37

File tree: 3 files changed, +166 −2 lines

src/aap_eda/core/management/commands/scheduler.py

Lines changed: 74 additions & 1 deletion
@@ -70,9 +70,13 @@
     https://github.com/rq/rq-scheduler/blob/master/README.rst
 """
 import logging
+import re
 from datetime import datetime
+from time import sleep
 
 import django_rq
+import redis
+from ansible_base.lib.redis.client import DABRedisCluster
 from django.conf import settings
 from django_rq.management.commands import rqscheduler
 from rq_scheduler import Scheduler
@@ -149,4 +153,73 @@ def handle(self, *args, **options) -> None:
         add_startup_jobs(scheduler)
         add_periodic_jobs(scheduler)
         add_cron_jobs(scheduler)
-        super().handle(*args, **options)
+        # We are going to start our own loop here to catch exceptions which
+        # might be coming from a redis cluster and retry things.
+        while True:
+            try:
+                super().handle(*args, **options)
+            except (
+                redis.exceptions.TimeoutError,
+                redis.exceptions.ClusterDownError,
+                redis.exceptions.ConnectionError,
+            ) as e:
+                # If we got one of these exceptions but are not on a Cluster,
+                # go ahead and raise it normally.
+                if not isinstance(scheduler.connection, DABRedisCluster):
+                    raise
+
+                # There are a lot of different exceptions that inherit from
+                # ConnectionError, so we need to make sure that what we caught
+                # is an actual ConnectionError. If not, go ahead and raise it.
+                # Note: ClusterDownError and TimeoutError are not subclasses
+                # of ConnectionError.
+                if (
+                    isinstance(e, redis.exceptions.ConnectionError)
+                    and type(e) is not redis.exceptions.ConnectionError
+                ):
+                    raise
+
+                downed_node_ip = re.findall(
+                    r"[0-9]+(?:\.[0-9]+){3}:[0-9]+", str(e)
+                )
+
+                # If we got a cluster issue we will loop here until we can
+                # ping the server again.
+                max_backoff = 60
+                current_backoff = 1
+                while True:
+                    if current_backoff > max_backoff:
+                        # Maybe we just got a network glitch and are waiting
+                        # for a cluster member to fail when it's not going to.
+                        # At this point we've waited for 60 secs, so let's go
+                        # ahead and let the scheduler try to restart.
+                        logger.error(
+                            "Connection to redis is still down, "
+                            "going to attempt to restart the scheduler"
+                        )
+                        break
+
+                    backoff = min(current_backoff, max_backoff)
+                    logger.error(
+                        f"Connection to redis cluster failed. Attempting to "
+                        f"reconnect in {backoff}"
+                    )
+                    sleep(backoff)
+                    current_backoff = 2 * current_backoff
+                    try:
+                        if downed_node_ip:
+                            cluster_nodes = (
+                                scheduler.connection.cluster_nodes()
+                            )
+                            for ip in downed_node_ip:
+                                if "fail" not in cluster_nodes[ip]["flags"]:
+                                    raise Exception(
+                                        "Failed node is not yet in a failed "
+                                        "state"
+                                    )
+                        else:
+                            scheduler.connection.ping()
+                        break
+                    # We could tighten this exception up
+                    except Exception:
+                        pass
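The hunk above does two things: it backs off exponentially, and, when the error message names a specific node, it waits for that node to be flagged as failed before letting the scheduler restart. A minimal standalone sketch of that wait strategy follows; the function name wait_for_cluster and its signature are illustrative and not part of this commit, and it assumes a redis-py style cluster client whose cluster_nodes() returns a dict keyed by "ip:port" with a "flags" field, which is what the committed code relies on.

import re
from time import sleep


def wait_for_cluster(connection, error, max_backoff=60):
    """Back off and poll until the cluster looks usable again (illustrative)."""
    # Pull any "ip:port" the error message names; those are the suspect nodes.
    downed_nodes = re.findall(r"[0-9]+(?:\.[0-9]+){3}:[0-9]+", str(error))
    current_backoff = 1
    while current_backoff <= max_backoff:
        sleep(min(current_backoff, max_backoff))
        current_backoff *= 2
        try:
            if downed_nodes:
                nodes = connection.cluster_nodes()
                # Wait until every suspect node is flagged "fail" so the
                # cluster has had a chance to fail over to a replica.
                if any("fail" not in nodes[ip]["flags"] for ip in downed_nodes):
                    continue
            else:
                connection.ping()
            return True  # reachable again; the caller can resume its work
        except Exception:  # deliberately broad, mirroring the commit
            continue
    return False  # still down after ~60s of backoff; let the caller restart anyway

Waiting for the suspect node to reach the "fail" state before resuming gives the cluster time to promote a replica, rather than reconnecting while the topology is still settling.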

src/aap_eda/core/tasking/__init__.py

Lines changed: 87 additions & 0 deletions
@@ -3,6 +3,7 @@
 
 import logging
 from datetime import datetime, timedelta
+from time import sleep
 from types import MethodType
 from typing import (
     Any,
@@ -330,6 +331,7 @@ def __init__(
             prepare_for_work=prepare_for_work,
             serializer=JSONSerializer,
         )
+        self.is_shutting_down = False
 
     def _set_connection(
         self,
@@ -411,6 +413,91 @@ def handle_job_success(
 
         pipeline.execute()
 
+    def handle_warm_shutdown_request(self):
+        self.is_shutting_down = True
+        super().handle_warm_shutdown_request()
+
+    # We are going to override the work function to create our own loop.
+    # This will allow us to catch exceptions that the default work method
+    # will not handle and restart our worker process if we hit them.
+    def work(
+        self,
+        burst: bool = False,
+        logging_level: str = "INFO",
+        date_format: str = rq.defaults.DEFAULT_LOGGING_DATE_FORMAT,
+        log_format: str = rq.defaults.DEFAULT_LOGGING_FORMAT,
+        max_jobs: Optional[int] = None,
+        with_scheduler: bool = False,
+    ) -> bool:
+        while True:
+            # super().work() returns a value that we want to return on a
+            # normal exit.
+            return_value = None
+            try:
+                return_value = super().work(
+                    burst,
+                    logging_level,
+                    date_format,
+                    log_format,
+                    max_jobs,
+                    with_scheduler,
+                )
+            except (
+                redis.exceptions.TimeoutError,
+                redis.exceptions.ClusterDownError,
+                redis.exceptions.ConnectionError,
+            ) as e:
+                # If we got one of these exceptions but are not on a Cluster,
+                # go ahead and raise it normally.
+                if not isinstance(self.connection, DABRedisCluster):
+                    raise
+
+                # There are a lot of different exceptions that inherit from
+                # ConnectionError, so we need to make sure that what we caught
+                # is an actual ConnectionError. If not, go ahead and raise it.
+                # Note: ClusterDownError and TimeoutError are not subclasses
+                # of ConnectionError.
+                if (
+                    isinstance(e, redis.exceptions.ConnectionError)
+                    and type(e) is not redis.exceptions.ConnectionError
+                ):
+                    raise
+
+                # If we got a cluster issue we will loop here until we can
+                # ping the server again.
+                max_backoff = 60
+                current_backoff = 1
+                while True:
+                    backoff = min(current_backoff, max_backoff)
+                    logger.error(
+                        f"Connection to redis cluster failed. Attempting to "
+                        f"reconnect in {backoff}"
+                    )
+                    sleep(backoff)
+                    current_backoff = 2 * current_backoff
+                    try:
+                        self.connection.ping()
+                        break
+                    # We could tighten this exception up.
+                    except Exception:
+                        pass
+                # At this point return_value is None, so we are going to go
+                # ahead and fall through to the loop to restart.
+
+            # We are outside of the work function with either:
+            #   a normal exit, or
+            #   an exit that did not raise an exception
+            if return_value:
+                logger.debug(f"Work exited normally with {return_value}")
+                return return_value
+            elif self.is_shutting_down:
+                # We got a warm shutdown request, let's respect it
+                return return_value
+            else:
+                logger.error(
+                    "Work exited with no return value, going to restart the worker"
+                )
+
 
 class DefaultWorker(Worker):
     """Custom default worker class used for non-activation tasks.

src/aap_eda/settings/default.py

Lines changed: 5 additions & 1 deletion
@@ -453,10 +453,14 @@ def rq_redis_client_instantiation_parameters():
     params["socket_connect_timeout"] = settings.get(
         "MQ_SOCKET_CONNECT_TIMEOUT", 10
     )
-    params["socket_timeout"] = settings.get("MQ_SOCKET_TIMEOUT", 10)
+    params["socket_timeout"] = settings.get("MQ_SOCKET_TIMEOUT", 150)
     params["cluster_error_retry_attempts"] = settings.get(
         "MQ_CLUSTER_ERROR_RETRY_ATTEMPTS", 3
     )
+    from redis.backoff import ConstantBackoff
+    from redis.retry import Retry
+
+    params["retry"] = Retry(backoff=ConstantBackoff(3), retries=20)
     return params
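For context, the parameters assembled here end up on the Redis client roughly as in the sketch below; host and port are placeholders, and it assumes DABRedisCluster forwards these kwargs to redis-py's RedisCluster.

from redis.backoff import ConstantBackoff
from redis.cluster import RedisCluster
from redis.retry import Retry

client = RedisCluster(
    host="localhost",                # placeholder
    port=6379,                       # placeholder
    socket_connect_timeout=10,       # MQ_SOCKET_CONNECT_TIMEOUT default
    socket_timeout=150,              # MQ_SOCKET_TIMEOUT default raised by this commit
    cluster_error_retry_attempts=3,  # MQ_CLUSTER_ERROR_RETRY_ATTEMPTS default
    # Ask redis-py to retry a failing command up to 20 times, 3 seconds apart,
    # before the exception propagates to the new loops above.
    retry=Retry(backoff=ConstantBackoff(3), retries=20),
)
client.ping()

With ConstantBackoff(3) and retries=20, the client itself absorbs roughly a minute of transient failure before the exception ever reaches the scheduler and worker loops added in this commit.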