From 23ee4d1eee3014a0556c5506986c9d227f6475ad Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Sat, 24 Jan 2026 11:01:17 -0500 Subject: [PATCH 1/2] lock only distinct --- pkg/repository/sqlcv1/olap.sql | 17 +++++++++++++---- pkg/repository/sqlcv1/olap.sql.go | 17 +++++++++++++---- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/pkg/repository/sqlcv1/olap.sql b/pkg/repository/sqlcv1/olap.sql index feda5a4870..fb5edcf694 100644 --- a/pkg/repository/sqlcv1/olap.sql +++ b/pkg/repository/sqlcv1/olap.sql @@ -788,8 +788,8 @@ WITH tenants AS ( worker_id IS NOT NULL GROUP BY tenant_id, task_id, task_inserted_at, retry_count -), locked_tasks AS ( - SELECT +), distinct_tasks_to_lock AS ( + SELECT DISTINCT ON (t.tenant_id, t.id, t.inserted_at) t.tenant_id, t.id, t.inserted_at, @@ -801,9 +801,18 @@ WITH tenants AS ( JOIN updatable_events e ON (t.tenant_id, t.id, t.inserted_at) = (e.tenant_id, e.task_id, e.task_inserted_at) - WHERE t.inserted_at >= @minInsertedAt::TIMESTAMPTZ + WHERE + t.inserted_at >= @minInsertedAt::TIMESTAMPTZ ORDER BY - t.inserted_at, t.id + t.tenant_id, t.id, t.inserted_at, t.readable_status +), locked_tasks AS ( + SELECT + dt.* + FROM + v1_tasks_olap t + JOIN + distinct_tasks_to_lock dt ON + (dt.inserted_at, dt.id) = (t.inserted_at, t.id) FOR UPDATE ), already_in_target_partition AS ( -- Check if rows already exist in the target partition (with the new readable_status) diff --git a/pkg/repository/sqlcv1/olap.sql.go b/pkg/repository/sqlcv1/olap.sql.go index ba78086b7a..cf8b06a719 100644 --- a/pkg/repository/sqlcv1/olap.sql.go +++ b/pkg/repository/sqlcv1/olap.sql.go @@ -3583,8 +3583,8 @@ WITH tenants AS ( worker_id IS NOT NULL GROUP BY tenant_id, task_id, task_inserted_at, retry_count -), locked_tasks AS ( - SELECT +), distinct_tasks_to_lock AS ( + SELECT DISTINCT ON (t.tenant_id, t.id, t.inserted_at) t.tenant_id, t.id, t.inserted_at, @@ -3596,9 +3596,18 @@ WITH tenants AS ( JOIN updatable_events e ON (t.tenant_id, t.id, t.inserted_at) = (e.tenant_id, e.task_id, e.task_inserted_at) - WHERE t.inserted_at >= $4::TIMESTAMPTZ + WHERE + t.inserted_at >= $4::TIMESTAMPTZ ORDER BY - t.inserted_at, t.id + t.tenant_id, t.id, t.inserted_at, t.readable_status +), locked_tasks AS ( + SELECT + dt.tenant_id, dt.id, dt.inserted_at, dt.readable_status, dt.retry_count, dt.max_readable_status + FROM + v1_tasks_olap t + JOIN + distinct_tasks_to_lock dt ON + (dt.inserted_at, dt.id) = (t.inserted_at, t.id) FOR UPDATE ), already_in_target_partition AS ( -- Check if rows already exist in the target partition (with the new readable_status) From 5d219965b4343d19b45076bc36f32f6c52c7f8d6 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Sat, 24 Jan 2026 11:28:21 -0500 Subject: [PATCH 2/2] try to fix dag statuses as well --- pkg/repository/sqlcv1/olap.sql | 4 +++- pkg/repository/sqlcv1/olap.sql.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/repository/sqlcv1/olap.sql b/pkg/repository/sqlcv1/olap.sql index fb5edcf694..f67f937547 100644 --- a/pkg/repository/sqlcv1/olap.sql +++ b/pkg/repository/sqlcv1/olap.sql @@ -1136,7 +1136,7 @@ WITH tenants AS ( (d.inserted_at, d.id, d.readable_status) = (ap.inserted_at, ap.id, ap.old_readable_status) ), dags_to_update AS ( -- DAGs that need updating and don't already exist in target partition - SELECT + SELECT DISTINCT ON (dns.tenant_id, dns.id, dns.inserted_at) dns.tenant_id, dns.id, dns.inserted_at, @@ -1150,6 +1150,8 @@ WITH tenants AS ( FROM already_in_target_partition ap WHERE (ap.tenant_id, ap.id, ap.inserted_at) = (dns.tenant_id, dns.id, dns.inserted_at) ) + ORDER BY + dns.tenant_id, dns.id, dns.inserted_at, dns.old_readable_status ), updated_dags AS ( UPDATE v1_dags_olap d diff --git a/pkg/repository/sqlcv1/olap.sql.go b/pkg/repository/sqlcv1/olap.sql.go index cf8b06a719..3bcd3ea679 100644 --- a/pkg/repository/sqlcv1/olap.sql.go +++ b/pkg/repository/sqlcv1/olap.sql.go @@ -3386,7 +3386,7 @@ WITH tenants AS ( (d.inserted_at, d.id, d.readable_status) = (ap.inserted_at, ap.id, ap.old_readable_status) ), dags_to_update AS ( -- DAGs that need updating and don't already exist in target partition - SELECT + SELECT DISTINCT ON (dns.tenant_id, dns.id, dns.inserted_at) dns.tenant_id, dns.id, dns.inserted_at, @@ -3400,6 +3400,8 @@ WITH tenants AS ( FROM already_in_target_partition ap WHERE (ap.tenant_id, ap.id, ap.inserted_at) = (dns.tenant_id, dns.id, dns.inserted_at) ) + ORDER BY + dns.tenant_id, dns.id, dns.inserted_at, dns.old_readable_status ), updated_dags AS ( UPDATE v1_dags_olap d