diff --git a/pkg/repository/sqlcv1/olap.sql b/pkg/repository/sqlcv1/olap.sql index feda5a4870..f67f937547 100644 --- a/pkg/repository/sqlcv1/olap.sql +++ b/pkg/repository/sqlcv1/olap.sql @@ -788,8 +788,8 @@ WITH tenants AS ( worker_id IS NOT NULL GROUP BY tenant_id, task_id, task_inserted_at, retry_count -), locked_tasks AS ( - SELECT +), distinct_tasks_to_lock AS ( + SELECT DISTINCT ON (t.tenant_id, t.id, t.inserted_at) t.tenant_id, t.id, t.inserted_at, @@ -801,9 +801,18 @@ WITH tenants AS ( JOIN updatable_events e ON (t.tenant_id, t.id, t.inserted_at) = (e.tenant_id, e.task_id, e.task_inserted_at) - WHERE t.inserted_at >= @minInsertedAt::TIMESTAMPTZ + WHERE + t.inserted_at >= @minInsertedAt::TIMESTAMPTZ ORDER BY - t.inserted_at, t.id + t.tenant_id, t.id, t.inserted_at, t.readable_status +), locked_tasks AS ( + SELECT + dt.* + FROM + v1_tasks_olap t + JOIN + distinct_tasks_to_lock dt ON + (dt.inserted_at, dt.id) = (t.inserted_at, t.id) FOR UPDATE ), already_in_target_partition AS ( -- Check if rows already exist in the target partition (with the new readable_status) @@ -1127,7 +1136,7 @@ WITH tenants AS ( (d.inserted_at, d.id, d.readable_status) = (ap.inserted_at, ap.id, ap.old_readable_status) ), dags_to_update AS ( -- DAGs that need updating and don't already exist in target partition - SELECT + SELECT DISTINCT ON (dns.tenant_id, dns.id, dns.inserted_at) dns.tenant_id, dns.id, dns.inserted_at, @@ -1141,6 +1150,8 @@ WITH tenants AS ( FROM already_in_target_partition ap WHERE (ap.tenant_id, ap.id, ap.inserted_at) = (dns.tenant_id, dns.id, dns.inserted_at) ) + ORDER BY + dns.tenant_id, dns.id, dns.inserted_at, dns.old_readable_status ), updated_dags AS ( UPDATE v1_dags_olap d diff --git a/pkg/repository/sqlcv1/olap.sql.go b/pkg/repository/sqlcv1/olap.sql.go index ba78086b7a..3bcd3ea679 100644 --- a/pkg/repository/sqlcv1/olap.sql.go +++ b/pkg/repository/sqlcv1/olap.sql.go @@ -3386,7 +3386,7 @@ WITH tenants AS ( (d.inserted_at, d.id, d.readable_status) = (ap.inserted_at, ap.id, ap.old_readable_status) ), dags_to_update AS ( -- DAGs that need updating and don't already exist in target partition - SELECT + SELECT DISTINCT ON (dns.tenant_id, dns.id, dns.inserted_at) dns.tenant_id, dns.id, dns.inserted_at, @@ -3400,6 +3400,8 @@ WITH tenants AS ( FROM already_in_target_partition ap WHERE (ap.tenant_id, ap.id, ap.inserted_at) = (dns.tenant_id, dns.id, dns.inserted_at) ) + ORDER BY + dns.tenant_id, dns.id, dns.inserted_at, dns.old_readable_status ), updated_dags AS ( UPDATE v1_dags_olap d @@ -3583,8 +3585,8 @@ WITH tenants AS ( worker_id IS NOT NULL GROUP BY tenant_id, task_id, task_inserted_at, retry_count -), locked_tasks AS ( - SELECT +), distinct_tasks_to_lock AS ( + SELECT DISTINCT ON (t.tenant_id, t.id, t.inserted_at) t.tenant_id, t.id, t.inserted_at, @@ -3596,9 +3598,18 @@ WITH tenants AS ( JOIN updatable_events e ON (t.tenant_id, t.id, t.inserted_at) = (e.tenant_id, e.task_id, e.task_inserted_at) - WHERE t.inserted_at >= $4::TIMESTAMPTZ + WHERE + t.inserted_at >= $4::TIMESTAMPTZ ORDER BY - t.inserted_at, t.id + t.tenant_id, t.id, t.inserted_at, t.readable_status +), locked_tasks AS ( + SELECT + dt.tenant_id, dt.id, dt.inserted_at, dt.readable_status, dt.retry_count, dt.max_readable_status + FROM + v1_tasks_olap t + JOIN + distinct_tasks_to_lock dt ON + (dt.inserted_at, dt.id) = (t.inserted_at, t.id) FOR UPDATE ), already_in_target_partition AS ( -- Check if rows already exist in the target partition (with the new readable_status)