Skip to content

Commit 694150e

Browse files
fix: PLT-844: Dynamic batch size based on annotation.result (#8187)
Co-authored-by: robot-ci-heartex <[email protected]> Co-authored-by: triklozoid <[email protected]>
1 parent 2a5b8ac commit 694150e

File tree

2 files changed

+116
-7
lines changed

2 files changed

+116
-7
lines changed

label_studio/projects/models.py

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,14 +1109,41 @@ def _update_tasks_counters_and_task_states(
11091109

11101110
return objs
11111111

1112+
def get_max_annotation_result_size(self):
    """Return the size in bytes of the largest annotation result in this project.

    The size is measured as ``octet_length(result::text)`` over the
    ``task_completion`` rows whose ``project_id`` equals ``self.id``.

    Returns:
        int: the largest result size in bytes, or 0 when the database is
        SQLite or the project has no annotation with a non-NULL result.
    """
    # For SQLite, return 0 (no annotations to consider)
    if settings.DJANGO_DB == settings.DJANGO_DB_SQLITE:
        return 0

    # Raw SQL is used so the query shape matches the index
    # annotation_proj_result_octlen_idx (project_id, octet_length DESC).
    #
    # PostgreSQL sorts NULLs *first* under ORDER BY ... DESC, so a single
    # annotation with a NULL result would otherwise win LIMIT 1 and hide the
    # real maximum. The NULL filter is written against the indexed expression
    # itself so it can be evaluated from the index.
    with connection.cursor() as cursor:
        cursor.execute(
            """
            SELECT id,
                   octet_length(result::text) AS bytes
            FROM task_completion
            WHERE project_id = %s
              AND octet_length(result::text) IS NOT NULL
            ORDER BY octet_length(result::text) DESC
            LIMIT 1
            """,
            [self.id],
        )
        row = cursor.fetchone()

    # No matching row (or a zero size) means there is nothing to account for.
    if not row or not row[1]:
        return 0
    return row[1]
1138+
11121139
def get_task_batch_size(self):
1113-
"""Calculate optimal batch size based on task data size"""
1140+
"""Calculate optimal batch size based on task data size and annotation result size"""
11141141
# For SQLite, use default MAX_TASK_BATCH_SIZE
11151142
if settings.DJANGO_DB == settings.DJANGO_DB_SQLITE:
11161143
return settings.MAX_TASK_BATCH_SIZE
11171144

1118-
# Using raw SQL to ensure we use the specific index task_proj_octlen_idx
1119-
# which is optimized for this query pattern (project_id, octet_length DESC)
1145+
# Get maximum task data size using the optimized index
1146+
max_task_size = 0
11201147
with connection.cursor() as cursor:
11211148
cursor.execute(
11221149
"""
@@ -1131,10 +1158,17 @@ def get_task_batch_size(self):
11311158
)
11321159

11331160
row = cursor.fetchone()
1134-
if not row or not row[1]:
1135-
return settings.MAX_TASK_BATCH_SIZE
1161+
if row and row[1]:
1162+
max_task_size = row[1]
1163+
1164+
# Get maximum annotation result size using the new optimized index
1165+
max_annotation_size = self.get_max_annotation_result_size()
1166+
1167+
# Use the larger of the two sizes for batch calculation
1168+
max_data_size = max(max_task_size, max_annotation_size)
11361169

1137-
max_data_size = row[1]
1170+
if max_data_size == 0:
1171+
return settings.MAX_TASK_BATCH_SIZE
11381172

11391173
batch_size = settings.TASK_DATA_PER_BATCH // max_data_size
11401174

@@ -1143,7 +1177,11 @@ def get_task_batch_size(self):
11431177
elif batch_size < 1:
11441178
batch_size = 1
11451179

1146-
logger.info(f'Project {self.id}: max task data size {max_data_size} bytes, calculated batch size {batch_size}')
1180+
logger.info(
1181+
f'Project {self.id}: max task size {max_task_size} bytes, '
1182+
f'max annotation size {max_annotation_size} bytes, '
1183+
f'calculated batch size {batch_size}'
1184+
)
11471185
return batch_size
11481186

11491187
def __str__(self):
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Generated by Django 5.1.9 on 2025-08-18 12:00
2+
3+
from django.db import migrations
4+
from django.conf import settings
5+
from core.models import AsyncMigrationStatus
6+
from core.redis import start_job_async_or_sync
7+
import logging
8+
logger = logging.getLogger(__name__)

# True when the configured database backend is SQLite.
IS_SQLITE = settings.DJANGO_DB == settings.DJANGO_DB_SQLITE

# Name under which this migration's progress is recorded in AsyncMigrationStatus.
migration_name = '0057_annotation_proj_result_octlen_idx_async'

# Expression index over (project_id, size of the annotation result as text),
# ordered by size descending, with the row id included in the index.
sql_create_index = ' '.join(
    (
        'CREATE INDEX CONCURRENTLY IF NOT EXISTS annotation_proj_result_octlen_idx',
        'ON task_completion (project_id, octet_length(result::text) DESC)',
        'INCLUDE (id);',
    )
)

# Statement used when the migration is reversed.
sql_drop_index = 'DROP INDEX CONCURRENTLY IF EXISTS annotation_proj_result_octlen_idx;'
22+
23+
def forward_migration(migration_name):
    """Create the annotation result-size index and record the progress.

    An ``AsyncMigrationStatus`` row named ``migration_name`` is created with
    status ``STATUS_STARTED``. If a row with that name already exists, the
    function returns without doing anything. Otherwise ``sql_create_index``
    is executed and the row is marked ``STATUS_FINISHED``.

    Args:
        migration_name: name used for the ``AsyncMigrationStatus`` row and in
            log messages.
    """
    migration, created = AsyncMigrationStatus.objects.get_or_create(
        name=migration_name,
        defaults={'status': AsyncMigrationStatus.STATUS_STARTED},
    )
    if not created:
        # A status row already exists, so this migration was started before.
        return

    logger.info(f'Start async migration {migration_name}')
    from django.db import connection

    # Context manager guarantees the cursor is closed, even if the DDL fails.
    with connection.cursor() as cursor:
        cursor.execute(sql_create_index)

    migration.status = AsyncMigrationStatus.STATUS_FINISHED
    migration.save()
    logger.info(f'Async migration {migration_name} complete')
38+
39+
def backward_migration(migration_name):
    """Drop the annotation result-size index and record the progress.

    A new ``AsyncMigrationStatus`` row named ``migration_name`` is created
    with status ``STATUS_STARTED``, ``sql_drop_index`` is executed, and the
    row is then marked ``STATUS_FINISHED``.

    Args:
        migration_name: name used for the ``AsyncMigrationStatus`` row and in
            log messages.
    """
    # NOTE(review): this always creates a new status row under the same name
    # that forward_migration looks up with get_or_create; confirm that
    # re-applying the forward migration after a revert behaves as intended.
    migration = AsyncMigrationStatus.objects.create(
        name=migration_name,
        status=AsyncMigrationStatus.STATUS_STARTED,
    )
    logger.info(f'Start revert of async migration {migration_name}')
    from django.db import connection

    # Context manager guarantees the cursor is closed, even if the DDL fails.
    with connection.cursor() as cursor:
        cursor.execute(sql_drop_index)

    migration.status = AsyncMigrationStatus.STATUS_FINISHED
    migration.save()
    logger.info(f'Async migration {migration_name} revert complete')
51+
52+
def forwards(apps, schema_editor):
    """Forward entry point for ``RunPython``.

    On SQLite the index creation is skipped and only two log messages are
    emitted; on any other database ``forward_migration`` is handed to
    ``start_job_async_or_sync``.

    Args:
        apps: historical app registry supplied by Django (unused).
        schema_editor: schema editor supplied by Django (unused).
    """
    if not IS_SQLITE:
        start_job_async_or_sync(forward_migration, migration_name=migration_name)
        return

    for message in (
        'SQLite execution',
        'Skipping async index creation for non-PostgreSQL databases',
    ):
        logger.info(message)
59+
60+
def backwards(apps, schema_editor):
    """Reverse entry point for ``RunPython``.

    Mirrors ``forwards``: on SQLite nothing was created by the forward step
    and ``sql_drop_index`` uses ``DROP INDEX CONCURRENTLY``, so the revert is
    skipped there. On any other database ``backward_migration`` is handed to
    ``start_job_async_or_sync``.

    Args:
        apps: historical app registry supplied by Django (unused).
        schema_editor: schema editor supplied by Django (unused).
    """
    if IS_SQLITE:
        logger.info('SQLite execution')
        logger.info('Skipping async index removal for non-PostgreSQL databases')
        return

    start_job_async_or_sync(backward_migration, migration_name=migration_name)
62+
63+
class Migration(migrations.Migration):
    """Run ``forwards`` on apply and ``backwards`` on revert via ``RunPython``."""

    # Run outside a wrapping transaction; presumably required because the SQL
    # in this module uses CREATE/DROP INDEX CONCURRENTLY -- TODO confirm.
    atomic = False

    dependencies = [('tasks', '0056_task_prediction_result_proj_gin_idx_async')]

    operations = [migrations.RunPython(code=forwards, reverse_code=backwards)]

0 commit comments

Comments
 (0)