Skip to content

Commit 6df25a4

Browse files
Merge pull request #190 from DrDroidLab/prateek/fix/memoryissue
Fixed deep-copy memory leak and updated worker count
2 parents 305811b + 63717bf commit 6df25a4

File tree

3 files changed

+25
-29
lines changed

3 files changed

+25
-29
lines changed

helm/charts/celery_worker/templates/deployment.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ spec:
6868
- name: CELERY_QUEUE
6969
value: "celery"
7070
- name: CELERY_WORKER_COUNT
71-
value: "4"
71+
value: "2"
7272
- name: CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP
7373
value: "true"
7474
- name: CELERY_BROKER_CONNECTION_MAX_RETRIES
@@ -127,7 +127,7 @@ spec:
127127
- name: CELERY_QUEUE
128128
value: "exec"
129129
- name: CELERY_WORKER_COUNT
130-
value: "4"
130+
value: "3"
131131
- name: CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP
132132
value: "true"
133133
- name: CELERY_BROKER_CONNECTION_MAX_RETRIES
@@ -186,7 +186,7 @@ spec:
186186
- name: CELERY_QUEUE
187187
value: "asset_extraction"
188188
- name: CELERY_WORKER_COUNT
189-
value: "3"
189+
value: "1"
190190
- name: CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP
191191
value: "true"
192192
- name: CELERY_BROKER_CONNECTION_MAX_RETRIES

helm/values.yaml

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,29 +48,29 @@ celery-worker:
4848
# Resource configuration for celery worker containers
4949
resources:
5050
scheduler:
51-
# Lightweight task scheduler - handles task distribution and scheduling
51+
# Lightweight task scheduler - handles polling tasks (reduced from 4 to 2 workers)
5252
requests:
53-
cpu: "200m"
53+
cpu: "100m"
5454
memory: "256Mi"
5555
limits:
56-
cpu: "800m"
57-
memory: "896Mi"
56+
cpu: "500m"
57+
memory: "1000Mi"
5858
taskExecutor:
59-
# Task executor for high-priority tasks - handles execution of critical tasks
59+
# Task executor for high-priority tasks - handles execution of critical tasks (reduced from 4 to 3 workers)
6060
requests:
61-
cpu: "250m"
62-
memory: "256Mi"
61+
cpu: "200m"
62+
memory: "512Mi"
6363
limits:
64-
cpu: "1500m"
65-
memory: "2Gi"
64+
cpu: "1000m"
65+
memory: "1536Mi"
6666
assetExtractor:
67-
# Asset extraction worker - handles long-running asset discovery and extraction tasks
67+
# Asset extraction worker - handles long-running asset discovery (reduced from 3 to 1 worker)
6868
requests:
69-
cpu: "250m"
70-
memory: "256Mi"
69+
cpu: "200m"
70+
memory: "512Mi"
7171
limits:
72-
cpu: "1500m"
73-
memory: "2Gi"
72+
cpu: "1000m"
73+
memory: "1536Mi"
7474

7575
redis:
76-
image: redis:7.2
76+
image: redis:7.2

playbooks_engine/tasks.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import logging
2-
import copy
32

43
import requests
54
from celery import shared_task
@@ -94,10 +93,9 @@ def _execute_asset_refresh_task(playbook_task_execution_log):
9493
logger.error(f'_execute_asset_refresh_task:: Error during asset refresh: {str(e)}')
9594
result = PlaybookTaskResult(error=StringValue(value=str(e)))
9695

97-
# Create processed log in the same format as normal playbook tasks
98-
processed_log = copy.deepcopy(playbook_task_execution_log)
96+
# Create processed log in the same format as normal playbook tasks (shallow copy is sufficient)
9997
result_dict = proto_to_dict(result)
100-
processed_log['result'] = result_dict
98+
processed_log = {**playbook_task_execution_log, 'result': result_dict}
10199

102100
# Send results using existing playbook infrastructure
103101
drd_cloud_host = settings.DRD_CLOUD_API_HOST
@@ -261,19 +259,17 @@ def execute_task_and_send_result(playbook_task_execution_log):
261259
try:
262260
# Execute task
263261
results = _execute_playbook_task(task_proto, time_range, global_variable_set)
264-
265-
# Create processed logs
262+
263+
# Create processed logs (using shallow copy - deep copy is unnecessary since we only add 'result')
266264
for result in results:
267-
current_log_copy = copy.deepcopy(playbook_task_execution_log)
268265
result_dict = proto_to_dict(result)
269-
current_log_copy['result'] = result_dict
266+
current_log_copy = {**playbook_task_execution_log, 'result': result_dict}
270267
processed_logs.append(current_log_copy)
271-
268+
272269
except Exception as e:
273270
logger.error(f'execute_task_and_send_result:: Error while executing tasks: {str(e)}')
274-
current_log_copy = copy.deepcopy(playbook_task_execution_log)
275271
error_result = PlaybookTaskResult(error=StringValue(value=str(e)))
276-
current_log_copy['result'] = proto_to_dict(error_result)
272+
current_log_copy = {**playbook_task_execution_log, 'result': proto_to_dict(error_result)}
277273
processed_logs.append(current_log_copy)
278274

279275
# Send results

0 commit comments

Comments (0)