Skip to content

Commit b05f39a

Browse files
triklozoid, mcanu, farioas
authored
fix: PLT-843: Webhook job memory optimization (#8136)
Co-authored-by: Marcel Canu <[email protected]> Co-authored-by: triklozoid <[email protected]> Co-authored-by: farioas <[email protected]>
1 parent 11a13de commit b05f39a

File tree

3 files changed

+153
-16
lines changed

3 files changed

+153
-16
lines changed

label_studio/core/settings/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ def collect_versions_dummy(**kwargs):
650650
COLLECT_VERSIONS = collect_versions_dummy
651651

652652
WEBHOOK_TIMEOUT = float(get_env('WEBHOOK_TIMEOUT', 1.0))
653-
WEBHOOK_BATCH_SIZE = int(get_env('WEBHOOK_BATCH_SIZE', 100))
653+
WEBHOOK_BATCH_SIZE = int(get_env('WEBHOOK_BATCH_SIZE', 5000))
654654
WEBHOOK_SERIALIZERS = {
655655
'project': 'webhooks.serializers_for_hooks.ProjectWebhookSerializer',
656656
'task': 'webhooks.serializers_for_hooks.TaskWebhookSerializer',

label_studio/tests/webhooks/test_webhooks.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,3 +440,96 @@ def test_start_training_webhook(setup_project_dialog, ml_start_training_webhook,
440440
assert request_history[0].url == webhook.url
441441
assert 'project' in request_history[0].json()
442442
assert request_history[0].json()['action'] == 'START_TRAINING'
443+
444+
@pytest.mark.django_db
def test_webhook_batching_with_feature_flag(configured_project, organization_webhook):
    """Test that webhooks are sent in batches when feature flag is enabled.

    Creates 250 tasks, forces ``WEBHOOK_BATCH_SIZE`` to 100 for the duration of
    the call and checks that the webhook URL receives three requests carrying
    100, 100 and 50 tasks respectively.
    """
    from unittest.mock import patch

    from django.test import override_settings
    from tasks.models import Task
    from webhooks.utils import emit_webhooks_for_instance_sync

    webhook = organization_webhook
    project = configured_project

    # 250 tasks with a batch size of 100 gives two full batches and one partial batch
    tasks = [Task.objects.create(data={'text': f'Test task {i}'}, project=project) for i in range(250)]

    # override_settings restores the previous value on exit (also when an
    # assertion fails) instead of mutating the global settings object by hand.
    with patch('webhooks.utils.flag_set', return_value=True), override_settings(WEBHOOK_BATCH_SIZE=100):
        with requests_mock.Mocker(real_http=True) as m:
            m.register_uri('POST', webhook.url)

            emit_webhooks_for_instance_sync(
                webhook.organization, webhook.project, WebhookAction.TASKS_CREATED, instance=tasks
            )

            # Only look at the requests that were sent to the webhook under test
            webhook_requests = [request for request in m.request_history if request.url == webhook.url]

            # ceil(250 / 100) == 3 requests
            assert len(webhook_requests) == 3

            payloads = [request.json() for request in webhook_requests]
            assert 'tasks' in payloads[0]

            # Two full batches followed by the remaining 50 tasks
            assert [len(payload['tasks']) for payload in payloads] == [100, 100, 50]
498+
499+
@pytest.mark.django_db
def test_webhook_no_batching_without_feature_flag(configured_project, organization_webhook):
    """Test that webhooks are sent in single request when feature flag is disabled.

    With the flag reported as off, all 150 created tasks must arrive at the
    webhook URL in exactly one request.
    """
    from unittest.mock import patch

    from tasks.models import Task
    from webhooks.utils import emit_webhooks_for_instance_sync

    hook = organization_webhook

    # Create multiple tasks
    created_tasks = [
        Task.objects.create(data={'text': f'Test task {i}'}, project=configured_project) for i in range(150)
    ]

    # Feature flag disabled: flag_set always answers False
    with patch('webhooks.utils.flag_set', return_value=False):
        with requests_mock.Mocker(real_http=True) as mocker:
            mocker.register_uri('POST', hook.url)

            emit_webhooks_for_instance_sync(
                hook.organization, hook.project, WebhookAction.TASKS_CREATED, instance=created_tasks
            )

            # Should have only 1 request (no batching)
            delivered = [req for req in mocker.request_history if req.url == hook.url]
            assert len(delivered) == 1

            # All 150 tasks travel in that single request
            body = delivered[0].json()
            assert 'tasks' in body
            assert len(body['tasks']) == 150

label_studio/webhooks/utils.py

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
from core.utils.common import load_func
88
from django.conf import settings
99
from django.db.models import Q
10+
from django.db.models.query import QuerySet
1011

1112
from .models import Webhook, WebhookAction
1213

14+
logger = logging.getLogger(__name__)
15+
1316

1417
def get_active_webhooks(organization, project, action):
1518
"""Return all active webhooks for organization or project by action.
@@ -71,37 +74,78 @@ def emit_webhooks_sync(organization, project, action, payload):
7174
run_webhook_sync(wh, action, payload)
7275

7376

def _process_webhook_batch(webhooks, project, action, batch, action_meta):
    """Build the payload for one batch of instances and run every webhook with it.

    A payload is only built when ``batch`` is truthy, at least one webhook has
    ``send_payload=True`` and ``action_meta`` provides a serializer; otherwise
    the webhooks are run with an empty payload.

    Args:
        webhooks: Active webhooks to send
        project: Project instance (optional); serialized under ``'project'``
            when a payload is built
        action: Action name
        batch: Batch of instances to process
        action_meta: Action metadata from WebhookAction.ACTIONS
    """
    payload = {}

    # Resolve the serializer only if somebody actually wants a payload
    serializer_class = None
    if batch and webhooks.filter(send_payload=True).exists():
        serializer_class = action_meta.get('serializer')

    if serializer_class:
        serialized = serializer_class(instance=batch, many=action_meta['many'])
        payload[action_meta['key']] = serialized.data

        if project:
            project_serializer = load_func(settings.WEBHOOK_SERIALIZERS['project'])
            payload['project'] = project_serializer(instance=project).data

        # Extra related objects declared for this action are serialized under their own keys
        for nested_key, nested_meta in action_meta.get('nested-fields', {}).items():
            nested_instance = get_nested_field(batch, nested_meta['field'])
            payload[nested_key] = nested_meta['serializer'](
                instance=nested_instance, many=nested_meta['many']
            ).data

    for wh in webhooks:
        run_webhook_sync(wh, action, payload)
103103

104104

def _order_for_batching(queryset):
    """Return ``queryset`` with a deterministic order so offset slices neither overlap nor skip rows."""
    if queryset.ordered:
        return queryset
    try:
        return queryset.order_by('pk')
    except (AssertionError, TypeError):
        # Django refuses to reorder a queryset once a slice has been taken
        # (AssertionError on older releases, TypeError on newer ones); keep it
        # untouched rather than fail the whole webhook delivery.
        return queryset


def emit_webhooks_for_instance_sync(organization, project, action, instance=None):
    """Run all active webhooks for the action using instances as payload.

    Be sure WebhookAction.ACTIONS contains all required fields.

    When ``instance`` is a list or QuerySet, ``settings.WEBHOOK_BATCH_SIZE`` is
    positive and the PLT-843 feature flag is set, the instances are delivered
    in consecutive batches of at most ``WEBHOOK_BATCH_SIZE`` items; otherwise
    everything is delivered in a single call.

    Args:
        organization: Organization used to look up active webhooks
        project: Project used to look up active webhooks (optional)
        action: Action name, a key of WebhookAction.ACTIONS
        instance: A single instance, a list of instances, a list of integer
            IDs (converted to a queryset of the action's model) or a QuerySet
    """
    webhooks = get_active_webhooks(organization, project, action)
    if not webhooks.exists():
        return

    action_meta = WebhookAction.ACTIONS[action]

    # Convert list of IDs to queryset
    if instance and isinstance(instance, list) and isinstance(instance[0], int):
        instance = action_meta['model'].objects.filter(id__in=instance)

    batch_size = settings.WEBHOOK_BATCH_SIZE
    use_batching = (
        isinstance(instance, (list, QuerySet))
        # A zero step would make range() raise and a negative one would
        # silently send nothing, so such values disable batching instead.
        and batch_size > 0
        and flag_set('fflag_fix_back_plt_843_webhook_memory_improvement_12082025_short')
    )

    if not use_batching:
        # Original behavior - process all at once
        _process_webhook_batch(webhooks, project, action, instance, action_meta)
        return

    if isinstance(instance, QuerySet):
        # Offset slicing is only reliable on an ordered queryset
        instance = _order_for_batching(instance)
        total_count = instance.count()
    else:
        total_count = len(instance)

    if total_count == 0:
        # Same outcome as the non-batched path: webhooks are still notified,
        # just with an empty payload.
        _process_webhook_batch(webhooks, project, action, instance, action_meta)
        return

    logger.debug('Processing webhook for %s instances in batches of %s', total_count, batch_size)
    for batch_number, start in enumerate(range(0, total_count, batch_size), start=1):
        # Lists and QuerySets are both sliced; a QuerySet slice becomes a LIMIT/OFFSET query
        batch = instance[start : start + batch_size]
        # The size is derived arithmetically so logging never costs an extra COUNT query
        logger.debug(
            'Processing batch %s with %s instances', batch_number, min(batch_size, total_count - start)
        )
        _process_webhook_batch(webhooks, project, action, batch, action_meta)
147+
148+
105149
def run_webhook(webhook, action, payload=None):
106150
"""Run one webhook for action.
107151

0 commit comments

Comments
 (0)