Skip to content

Commit 41cfb02

Browse files
authored
Merge pull request #182 from DrDroidLab/prateek/fix/redis_reliability
fix: Improve Redis connection reliability and Kubernetes deployment s…
2 parents d1c95a7 + b818cc5 commit 41cfb02

File tree

8 files changed

+219
-17
lines changed

8 files changed

+219
-17
lines changed

agent/settings.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,20 @@ def load_yaml(filepath, native_k8s_connector_mode=False):
114114
# Celery Configuration Options
115115
CELERY_BROKER_URL = env.str('CELERY_BROKER_URL', default='redis://localhost:6379/0')
116116
CELERY_RESULT_BACKEND = env.str('CELERY_RESULT_BACKEND', default='redis://localhost:6379/0')
117+
118+
# Broker connection reliability settings.
# Every knob is overridable through the environment so deployments (e.g. the
# Helm charts) can tune retry behaviour without a code change.
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = env.bool('CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP', default=True)
CELERY_BROKER_CONNECTION_MAX_RETRIES = env.int('CELERY_BROKER_CONNECTION_MAX_RETRIES', default=10)
CELERY_BROKER_CONNECTION_RETRY = env.bool('CELERY_BROKER_CONNECTION_RETRY', default=True)
CELERY_BROKER_POOL_LIMIT = env.int('CELERY_BROKER_POOL_LIMIT', default=10)

# Options handed to the result backend transport.
# NOTE(review): Celery documents dedicated settings for the Redis result
# backend (redis_socket_timeout, redis_socket_connect_timeout,
# redis_retry_on_timeout, redis_socket_keepalive,
# redis_backend_health_check_interval) -- confirm the socket/health-check keys
# below are actually honoured when the backend URL is a plain redis:// URL.
# NOTE(review): 'master_name' looks like a Redis Sentinel option -- confirm it
# is needed with the default redis://localhost:6379/0 backend.
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = {
    'master_name': 'mymaster',
    'socket_keepalive': True,
    'socket_connect_timeout': 5,
    'socket_timeout': 5,
    'retry_on_timeout': True,
    'health_check_interval': 30,
}
117131
CELERY_ACCEPT_CONTENT = ['application/json']
118132
CELERY_RESULT_SERIALIZER = 'json'
119133
CELERY_TASK_SERIALIZER = 'json'

agent/tasks.py

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import threading
23

34
import requests
45
from celery import shared_task
@@ -13,19 +14,59 @@
1314
K8S_API_TIMEOUT = 30
1415
HTTP_REQUEST_TIMEOUT = 30
1516

17+
# Kubernetes client cache (singleton pattern)
18+
_k8s_clients_cache = {}
19+
_k8s_config_lock = threading.Lock()
20+
_k8s_config_loaded = False
1621

17-
def get_pod_details(namespace='drdroid'):
18-
# Import kubernetes here to avoid loading at module import time
19-
from kubernetes import client, config
2022

21-
try:
22-
# Try to load in-cluster config first, then local config
23-
try:
24-
config.load_incluster_config()
25-
except config.ConfigException:
26-
config.load_kube_config()
23+
def _get_k8s_clients():
    """
    Get or create the shared Kubernetes API clients for this process.

    The clients are created once and stored in the module-level
    ``_k8s_clients_cache``; later calls return the same instances instead of
    building new ones. Initialization is serialized by ``_k8s_config_lock``.

    Returns:
        tuple: ``(v1, custom)`` where ``v1`` is a ``client.CoreV1Api`` and
        ``custom`` is a ``client.CustomObjectsApi``.

    Raises:
        Any exception raised by ``config.load_kube_config()`` when the
        in-cluster config is unavailable and the local fallback also fails,
        or by the client constructors. Nothing is cached in that case, so the
        next call retries from scratch.
    """
    global _k8s_config_loaded

    # Fast path (no lock): only trust the cache when BOTH clients are present.
    # Checking plain truthiness of the dict is not enough, because a reader
    # could observe it while only one of the two keys has been stored.
    v1 = _k8s_clients_cache.get('v1')
    custom_api = _k8s_clients_cache.get('custom')
    if v1 is not None and custom_api is not None:
        return v1, custom_api

    # Slow path: initialize clients while holding the lock.
    with _k8s_config_lock:
        # Re-check after acquiring the lock; another thread may have finished
        # initialization while this one was waiting.
        v1 = _k8s_clients_cache.get('v1')
        custom_api = _k8s_clients_cache.get('custom')
        if v1 is not None and custom_api is not None:
            return v1, custom_api

        # Imported lazily so the kubernetes package is not loaded at module
        # import time.
        from kubernetes import client, config

        # Load Kubernetes config only once per process.
        if not _k8s_config_loaded:
            try:
                config.load_incluster_config()
                logger.info("Loaded in-cluster Kubernetes config")
            except config.ConfigException:
                config.load_kube_config()
                logger.info("Loaded local Kubernetes config")
            _k8s_config_loaded = True

        # Build both clients first and publish them together, so a failure in
        # either constructor never leaves a half-populated cache behind.
        v1 = client.CoreV1Api()
        custom_api = client.CustomObjectsApi()
        _k8s_clients_cache.update({'v1': v1, 'custom': custom_api})

        logger.info("Initialized Kubernetes API clients (singleton)")

        return v1, custom_api
60+
61+
62+
def get_pod_details(namespace='drdroid'):
63+
"""
64+
Fetch pod details from Kubernetes API.
65+
Uses cached API clients to prevent connection leaks.
66+
"""
67+
try:
68+
# Get singleton clients (reuses existing instances)
69+
v1, custom_api = _get_k8s_clients()
2970

3071
# Get all pods in the namespace with timeout
3172
pods = v1.list_namespaced_pod(namespace, _request_timeout=K8S_API_TIMEOUT)
@@ -34,7 +75,6 @@ def get_pod_details(namespace='drdroid'):
3475
# Get pod metrics for CPU/memory usage
3576
pod_metrics = {}
3677
try:
37-
custom_api = client.CustomObjectsApi()
3878
metrics = custom_api.list_namespaced_custom_object(
3979
group="metrics.k8s.io",
4080
version="v1beta1",

helm/charts/celery_beat/templates/deployment.yaml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,26 @@ spec:
2525
tolerations:
2626
{{- toYaml .Values.global.tolerations | nindent 8 }}
2727
{{- end }}
28+
initContainers:
29+
- name: wait-for-redis
30+
image: busybox:1.36
31+
command:
32+
- sh
33+
- -c
34+
- |
35+
echo "Waiting for Redis to be ready..."
36+
until nc -z redis-service 6379; do
37+
echo "Redis is unavailable - sleeping"
38+
sleep 2
39+
done
40+
echo "Redis is ready!"
41+
resources:
42+
requests:
43+
cpu: "10m"
44+
memory: "16Mi"
45+
limits:
46+
cpu: "50m"
47+
memory: "32Mi"
2848
containers:
2949
- name: celery-beat
3050
image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
@@ -45,6 +65,10 @@ spec:
4565
value: {{ .Values.global.DRD_CLOUD_API_HOST | quote }}
4666
- name: NATIVE_KUBERNETES_API_MODE
4767
value: {{ .Values.global.NATIVE_KUBERNETES_API_MODE | quote }}
68+
- name: CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP
69+
value: "true"
70+
- name: CELERY_BROKER_CONNECTION_MAX_RETRIES
71+
value: "10"
4872
volumeMounts:
4973
- name: credentials-volume
5074
mountPath: /code/credentials/secrets.yaml
@@ -56,6 +80,26 @@ spec:
5680
limits:
5781
cpu: "500m"
5882
memory: "512Mi"
83+
livenessProbe:
84+
exec:
85+
command:
86+
- /bin/sh
87+
- -c
88+
- "test -f /code/celerybeat.pid && ps -p $(cat /code/celerybeat.pid) > /dev/null"
89+
initialDelaySeconds: 30
90+
periodSeconds: 30
91+
timeoutSeconds: 5
92+
failureThreshold: 3
93+
startupProbe:
94+
exec:
95+
command:
96+
- /bin/sh
97+
- -c
98+
- "test -f /code/celerybeat.pid"
99+
initialDelaySeconds: 15
100+
periodSeconds: 5
101+
timeoutSeconds: 3
102+
failureThreshold: 12
59103
volumes:
60104
- name: credentials-volume
61105
configMap:

helm/charts/celery_worker/templates/deployment.yaml

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,26 @@ spec:
2525
tolerations:
2626
{{- toYaml .Values.global.tolerations | nindent 8 }}
2727
{{- end }}
28+
initContainers:
29+
- name: wait-for-redis
30+
image: busybox:1.36
31+
command:
32+
- sh
33+
- -c
34+
- |
35+
echo "Waiting for Redis to be ready..."
36+
until nc -z redis-service 6379; do
37+
echo "Redis is unavailable - sleeping"
38+
sleep 2
39+
done
40+
echo "Redis is ready!"
41+
resources:
42+
requests:
43+
cpu: "10m"
44+
memory: "16Mi"
45+
limits:
46+
cpu: "50m"
47+
memory: "32Mi"
2848
containers:
2949
- name: celery-worker-scheduler # Lightweight task scheduler
3050
image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
@@ -49,6 +69,10 @@ spec:
4969
value: "celery"
5070
- name: CELERY_WORKER_COUNT
5171
value: "4"
72+
- name: CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP
73+
value: "true"
74+
- name: CELERY_BROKER_CONNECTION_MAX_RETRIES
75+
value: "10"
5276
volumeMounts:
5377
- name: credentials-volume
5478
mountPath: /code/credentials/secrets.yaml
@@ -60,6 +84,26 @@ spec:
6084
limits:
6185
cpu: {{ .Values.resources.scheduler.limits.cpu | quote }}
6286
memory: {{ .Values.resources.scheduler.limits.memory | quote }}
87+
startupProbe:
88+
exec:
89+
command:
90+
- /bin/sh
91+
- -c
92+
- "celery -A agent inspect ping -d celery@$HOSTNAME -t 5"
93+
initialDelaySeconds: 30
94+
periodSeconds: 10
95+
timeoutSeconds: 5
96+
failureThreshold: 12
97+
livenessProbe:
98+
exec:
99+
command:
100+
- /bin/sh
101+
- -c
102+
- "celery -A agent inspect ping -d celery@$HOSTNAME -t 5"
103+
initialDelaySeconds: 30
104+
periodSeconds: 30
105+
timeoutSeconds: 10
106+
failureThreshold: 3
63107

64108
- name: celery-worker-task-executor # Task executor for high-priority tasks
65109
image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
@@ -84,6 +128,10 @@ spec:
84128
value: "exec"
85129
- name: CELERY_WORKER_COUNT
86130
value: "4"
131+
- name: CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP
132+
value: "true"
133+
- name: CELERY_BROKER_CONNECTION_MAX_RETRIES
134+
value: "10"
87135
volumeMounts:
88136
- name: credentials-volume
89137
mountPath: /code/credentials/secrets.yaml
@@ -95,6 +143,26 @@ spec:
95143
limits:
96144
cpu: {{ .Values.resources.taskExecutor.limits.cpu | quote }}
97145
memory: {{ .Values.resources.taskExecutor.limits.memory | quote }}
146+
startupProbe:
147+
exec:
148+
command:
149+
- /bin/sh
150+
- -c
151+
- "celery -A agent inspect ping -d celery@$HOSTNAME -t 5"
152+
initialDelaySeconds: 30
153+
periodSeconds: 10
154+
timeoutSeconds: 5
155+
failureThreshold: 12
156+
livenessProbe:
157+
exec:
158+
command:
159+
- /bin/sh
160+
- -c
161+
- "celery -A agent inspect ping -d celery@$HOSTNAME -t 5"
162+
initialDelaySeconds: 30
163+
periodSeconds: 30
164+
timeoutSeconds: 10
165+
failureThreshold: 3
98166

99167
- name: celery-worker-asset-extractor # Task executor for asset extraction tasks, which run rarely and are long-running
100168
image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
@@ -119,6 +187,10 @@ spec:
119187
value: "asset_extraction"
120188
- name: CELERY_WORKER_COUNT
121189
value: "3"
190+
- name: CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP
191+
value: "true"
192+
- name: CELERY_BROKER_CONNECTION_MAX_RETRIES
193+
value: "10"
122194
volumeMounts:
123195
- name: credentials-volume
124196
mountPath: /code/credentials/secrets.yaml
@@ -130,6 +202,26 @@ spec:
130202
limits:
131203
cpu: {{ .Values.resources.assetExtractor.limits.cpu | quote }}
132204
memory: {{ .Values.resources.assetExtractor.limits.memory | quote }}
205+
startupProbe:
206+
exec:
207+
command:
208+
- /bin/sh
209+
- -c
210+
- "celery -A agent inspect ping -d celery@$HOSTNAME -t 5"
211+
initialDelaySeconds: 30
212+
periodSeconds: 10
213+
timeoutSeconds: 5
214+
failureThreshold: 12
215+
livenessProbe:
216+
exec:
217+
command:
218+
- /bin/sh
219+
- -c
220+
- "celery -A agent inspect ping -d celery@$HOSTNAME -t 5"
221+
initialDelaySeconds: 30
222+
periodSeconds: 30
223+
timeoutSeconds: 10
224+
failureThreshold: 3
133225

134226
volumes:
135227
- name: credentials-volume

helm/charts/redis/templates/configmap.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ data:
1515
timeout 300
1616
1717
# Memory management
18-
maxmemory 400mb
18+
maxmemory 900mb
1919
maxmemory-policy allkeys-lru
2020
2121
# Persistence settings

helm/charts/redis/templates/deployment.yaml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ spec:
1414
labels:
1515
app: redis
1616
spec:
17+
restartPolicy: Always
1718
serviceAccountName: drd-vpc-agent
1819
{{- if .Values.global.nodeSelector }}
1920
nodeSelector:
@@ -30,11 +31,11 @@ spec:
3031
- containerPort: 6379
3132
resources:
3233
requests:
33-
memory: "128Mi"
34-
cpu: "100m"
34+
memory: "256Mi"
35+
cpu: "200m"
3536
limits:
36-
memory: "512Mi"
37-
cpu: "500m"
37+
memory: "1Gi"
38+
cpu: "1000m"
3839
livenessProbe:
3940
tcpSocket:
4041
port: 6379
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# PodDisruptionBudget for the Redis pod(s) selected by the `app: redis` label.
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: redis-pdb
  annotations:
    # Helm's built-in release object exposes the release name as `.Release.Name`
    # (capital N); the lowercase form does not resolve to the release name.
    meta.helm.sh/release-name: "{{ .Release.Name }}"
spec:
  # NOTE(review): if the Redis Deployment runs a single replica, requiring one
  # available pod blocks every voluntary eviction (e.g. node drains) of that
  # pod -- confirm this is intended.
  minAvailable: 1
  selector:
    matchLabels:
      app: redis

scripts/start-celery-beat.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ set -o nounset
66
rm -f './celerybeat.pid'
77

88
python manage.py migrate
9-
celery -A agent beat -l INFO
9+
celery -A agent beat -l INFO --pidfile=/code/celerybeat.pid

0 commit comments

Comments
 (0)