Skip to content

Commit 7b5ce95

Browse files
jhfclaude
andcommitted
fix: Prevent batch workers serializing on ShareUpdateExclusiveLock
Two migrations: 1. fix_derive_pipeline_consistency: Defers statistical_unit DELETEs to flush_staging for crash safety, adds staging TRUNCATE at start. 2. fix_batch_null_ranges_causing_full_refresh: When a batch has 0 IDs for a unit type, array_to_int4multirange(NULL) returns NULL. Passing NULL to timeline_*_refresh triggers the full refresh path which runs ANALYZE on 7 tables. ANALYZE acquires ShareUpdateExclusiveLock (self- conflicting), serializing all concurrent batch workers - 75% of worker capacity wasted waiting. Fix: COALESCE NULL ranges to empty '{}' multirange for timepoints/ timesegments/statistical_unit refresh, and skip timeline refresh calls entirely when no IDs exist for that unit type. Verified on Norway dataset: 3.1M statistical_unit rows derived with zero lock contention across 4 concurrent workers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 429aaab commit 7b5ce95

File tree

39 files changed

+1714
-574
lines changed

39 files changed

+1714
-574
lines changed

migrations/20260206211707_fix_derive_pipeline_consistency.down.sql

Lines changed: 658 additions & 0 deletions
Large diffs are not rendered by default.

migrations/20260206211707_fix_derive_pipeline_consistency.up.sql

Lines changed: 660 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
-- Down Migration 20260207215705: fix_batch_null_ranges_causing_full_refresh
2+
BEGIN;
3+
4+
-- Restore original procedure that passes NULL ranges directly
5+
CREATE OR REPLACE PROCEDURE worker.statistical_unit_refresh_batch(IN payload jsonb)
6+
LANGUAGE plpgsql
7+
SECURITY DEFINER
8+
AS $statistical_unit_refresh_batch$
9+
DECLARE
10+
v_batch_seq INT := (payload->>'batch_seq')::INT;
11+
v_enterprise_ids INT[];
12+
v_legal_unit_ids INT[];
13+
v_establishment_ids INT[];
14+
v_enterprise_id_ranges int4multirange;
15+
v_legal_unit_id_ranges int4multirange;
16+
v_establishment_id_ranges int4multirange;
17+
BEGIN
18+
-- Extract batch IDs from payload (no more explicit_*_ids merging)
19+
IF jsonb_typeof(payload->'enterprise_ids') = 'array' THEN
20+
SELECT array_agg(value::INT) INTO v_enterprise_ids
21+
FROM jsonb_array_elements_text(payload->'enterprise_ids') AS value;
22+
END IF;
23+
24+
IF jsonb_typeof(payload->'legal_unit_ids') = 'array' THEN
25+
SELECT array_agg(value::INT) INTO v_legal_unit_ids
26+
FROM jsonb_array_elements_text(payload->'legal_unit_ids') AS value;
27+
END IF;
28+
29+
IF jsonb_typeof(payload->'establishment_ids') = 'array' THEN
30+
SELECT array_agg(value::INT) INTO v_establishment_ids
31+
FROM jsonb_array_elements_text(payload->'establishment_ids') AS value;
32+
END IF;
33+
34+
v_enterprise_id_ranges := public.array_to_int4multirange(v_enterprise_ids);
35+
v_legal_unit_id_ranges := public.array_to_int4multirange(v_legal_unit_ids);
36+
v_establishment_id_ranges := public.array_to_int4multirange(v_establishment_ids);
37+
38+
RAISE DEBUG 'Processing batch % with % enterprises, % legal_units, % establishments',
39+
v_batch_seq,
40+
COALESCE(array_length(v_enterprise_ids, 1), 0),
41+
COALESCE(array_length(v_legal_unit_ids, 1), 0),
42+
COALESCE(array_length(v_establishment_ids, 1), 0);
43+
44+
-- Call the refresh procedures for this batch
45+
CALL public.timepoints_refresh(
46+
p_establishment_id_ranges => v_establishment_id_ranges,
47+
p_legal_unit_id_ranges => v_legal_unit_id_ranges,
48+
p_enterprise_id_ranges => v_enterprise_id_ranges
49+
);
50+
51+
CALL public.timesegments_refresh(
52+
p_establishment_id_ranges => v_establishment_id_ranges,
53+
p_legal_unit_id_ranges => v_legal_unit_id_ranges,
54+
p_enterprise_id_ranges => v_enterprise_id_ranges
55+
);
56+
57+
-- Use concurrent-safe version for years
58+
CALL public.timesegments_years_refresh_concurrent();
59+
60+
CALL public.timeline_establishment_refresh(p_unit_id_ranges => v_establishment_id_ranges);
61+
CALL public.timeline_legal_unit_refresh(p_unit_id_ranges => v_legal_unit_id_ranges);
62+
CALL public.timeline_enterprise_refresh(p_unit_id_ranges => v_enterprise_id_ranges);
63+
64+
CALL public.statistical_unit_refresh(
65+
p_establishment_id_ranges => v_establishment_id_ranges,
66+
p_legal_unit_id_ranges => v_legal_unit_id_ranges,
67+
p_enterprise_id_ranges => v_enterprise_id_ranges
68+
);
69+
END;
70+
$statistical_unit_refresh_batch$;
71+
72+
END;
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
-- Migration 20260207215705: fix_batch_null_ranges_causing_full_refresh
2+
--
3+
-- Fix: When a batch has no IDs for a unit type (e.g., 0 establishments),
4+
-- array_to_int4multirange(NULL) returns NULL. Passing NULL to refresh procedures
5+
-- triggers the FULL REFRESH path which runs ANALYZE on multiple tables.
6+
-- ANALYZE acquires ShareUpdateExclusiveLock which self-conflicts, serializing
7+
-- all concurrent batch workers (75% of worker capacity wasted waiting).
8+
--
9+
-- Solution: Use COALESCE to pass empty multirange instead of NULL for
10+
-- multi-range parameters, and skip timeline refresh calls entirely when
11+
-- there are no IDs for that unit type.
12+
BEGIN;
13+
14+
CREATE OR REPLACE PROCEDURE worker.statistical_unit_refresh_batch(IN payload jsonb)
15+
LANGUAGE plpgsql
16+
SECURITY DEFINER
17+
AS $statistical_unit_refresh_batch$
18+
DECLARE
19+
v_batch_seq INT := (payload->>'batch_seq')::INT;
20+
v_enterprise_ids INT[];
21+
v_legal_unit_ids INT[];
22+
v_establishment_ids INT[];
23+
v_enterprise_id_ranges int4multirange;
24+
v_legal_unit_id_ranges int4multirange;
25+
v_establishment_id_ranges int4multirange;
26+
BEGIN
27+
-- Extract batch IDs from payload
28+
IF jsonb_typeof(payload->'enterprise_ids') = 'array' THEN
29+
SELECT array_agg(value::INT) INTO v_enterprise_ids
30+
FROM jsonb_array_elements_text(payload->'enterprise_ids') AS value;
31+
END IF;
32+
33+
IF jsonb_typeof(payload->'legal_unit_ids') = 'array' THEN
34+
SELECT array_agg(value::INT) INTO v_legal_unit_ids
35+
FROM jsonb_array_elements_text(payload->'legal_unit_ids') AS value;
36+
END IF;
37+
38+
IF jsonb_typeof(payload->'establishment_ids') = 'array' THEN
39+
SELECT array_agg(value::INT) INTO v_establishment_ids
40+
FROM jsonb_array_elements_text(payload->'establishment_ids') AS value;
41+
END IF;
42+
43+
v_enterprise_id_ranges := public.array_to_int4multirange(v_enterprise_ids);
44+
v_legal_unit_id_ranges := public.array_to_int4multirange(v_legal_unit_ids);
45+
v_establishment_id_ranges := public.array_to_int4multirange(v_establishment_ids);
46+
47+
RAISE DEBUG 'Processing batch % with % enterprises, % legal_units, % establishments',
48+
v_batch_seq,
49+
COALESCE(array_length(v_enterprise_ids, 1), 0),
50+
COALESCE(array_length(v_legal_unit_ids, 1), 0),
51+
COALESCE(array_length(v_establishment_ids, 1), 0);
52+
53+
-- Call refresh procedures for this batch.
54+
-- IMPORTANT: Use COALESCE to pass empty multirange '{}' instead of NULL.
55+
-- NULL is interpreted as "full refresh" which runs ANALYZE, acquiring
56+
-- ShareUpdateExclusiveLock that serializes all concurrent batch workers.
57+
CALL public.timepoints_refresh(
58+
p_establishment_id_ranges => COALESCE(v_establishment_id_ranges, '{}'::int4multirange),
59+
p_legal_unit_id_ranges => COALESCE(v_legal_unit_id_ranges, '{}'::int4multirange),
60+
p_enterprise_id_ranges => COALESCE(v_enterprise_id_ranges, '{}'::int4multirange)
61+
);
62+
63+
CALL public.timesegments_refresh(
64+
p_establishment_id_ranges => COALESCE(v_establishment_id_ranges, '{}'::int4multirange),
65+
p_legal_unit_id_ranges => COALESCE(v_legal_unit_id_ranges, '{}'::int4multirange),
66+
p_enterprise_id_ranges => COALESCE(v_enterprise_id_ranges, '{}'::int4multirange)
67+
);
68+
69+
-- Use concurrent-safe version for years
70+
CALL public.timesegments_years_refresh_concurrent();
71+
72+
-- Timeline refreshes: skip when no IDs for that unit type (avoids full refresh)
73+
IF v_establishment_id_ranges IS NOT NULL THEN
74+
CALL public.timeline_establishment_refresh(p_unit_id_ranges => v_establishment_id_ranges);
75+
END IF;
76+
IF v_legal_unit_id_ranges IS NOT NULL THEN
77+
CALL public.timeline_legal_unit_refresh(p_unit_id_ranges => v_legal_unit_id_ranges);
78+
END IF;
79+
IF v_enterprise_id_ranges IS NOT NULL THEN
80+
CALL public.timeline_enterprise_refresh(p_unit_id_ranges => v_enterprise_id_ranges);
81+
END IF;
82+
83+
CALL public.statistical_unit_refresh(
84+
p_establishment_id_ranges => COALESCE(v_establishment_id_ranges, '{}'::int4multirange),
85+
p_legal_unit_id_ranges => COALESCE(v_legal_unit_id_ranges, '{}'::int4multirange),
86+
p_enterprise_id_ranges => COALESCE(v_enterprise_id_ranges, '{}'::int4multirange)
87+
);
88+
END;
89+
$statistical_unit_refresh_batch$;
90+
91+
END;

test/expected/103_legal_units_with_data_source.out

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,6 @@ ORDER BY slug;
200200
\echo Run worker processing for analytics tasks
201201
Run worker processing for analytics tasks
202202
CALL worker.process_tasks(p_queue => 'analytics');
203-
NOTICE: Created btree index su_ei_tax_ident_idx for external_ident_type
204-
NOTICE: Created btree index su_ei_stat_ident_idx for external_ident_type
205-
NOTICE: Created indices for stat_definition employees
206-
NOTICE: Created indices for stat_definition turnover
207203
SELECT queue, state, count(*) FROM worker.tasks AS t JOIN worker.command_registry AS c ON t.command = c.command WHERE c.queue != 'maintenance' GROUP BY queue,state ORDER BY queue,state;
208204
queue | state | count
209205
-----------+-----------+-------

test/expected/104_load_establishment_without_legal_unit.out

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,6 @@ SELECT
254254
\echo Run worker processing for analytics tasks
255255
Run worker processing for analytics tasks
256256
CALL worker.process_tasks(p_queue => 'analytics');
257-
NOTICE: Created btree index su_ei_tax_ident_idx for external_ident_type
258-
NOTICE: Created btree index su_ei_stat_ident_idx for external_ident_type
259-
NOTICE: Created indices for stat_definition employees
260-
NOTICE: Created indices for stat_definition turnover
261257
SELECT queue, state, count(*) FROM worker.tasks AS t JOIN worker.command_registry AS c ON t.command = c.command WHERE c.queue != 'maintenance' GROUP BY queue,state ORDER BY queue,state;
262258
queue | state | count
263259
-----------+-----------+-------

test/expected/105_legal_units_with_statistics.out

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2453,10 +2453,6 @@ ORDER BY slug;
24532453
\echo Run worker processing for analytics tasks
24542454
Run worker processing for analytics tasks
24552455
CALL worker.process_tasks(p_queue => 'analytics');
2456-
NOTICE: Created btree index su_ei_tax_ident_idx for external_ident_type
2457-
NOTICE: Created btree index su_ei_stat_ident_idx for external_ident_type
2458-
NOTICE: Created indices for stat_definition employees
2459-
NOTICE: Created indices for stat_definition turnover
24602456
SELECT queue, state, count(*) FROM worker.tasks AS t JOIN worker.command_registry AS c ON t.command = c.command WHERE c.queue != 'maintenance' GROUP BY queue,state ORDER BY queue,state;
24612457
queue | state | count
24622458
-----------+-----------+-------

test/expected/106_load_with_status_codes.out

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,6 @@ WHERE slug LIKE 'import_35_%' ORDER BY slug;
210210
\echo Run worker processing for analytics tasks
211211
Run worker processing for analytics tasks
212212
CALL worker.process_tasks(p_queue => 'analytics');
213-
NOTICE: Created btree index su_ei_tax_ident_idx for external_ident_type
214-
NOTICE: Created btree index su_ei_stat_ident_idx for external_ident_type
215-
NOTICE: Created indices for stat_definition employees
216-
NOTICE: Created indices for stat_definition turnover
217213
SELECT queue, state, count(*) FROM worker.tasks AS t JOIN worker.command_registry AS c ON t.command = c.command WHERE c.queue != 'maintenance' GROUP BY queue,state ORDER BY queue,state;
218214
queue | state | count
219215
-----------+-----------+-------

test/expected/107_load_and_verify_history_functions.out

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,10 +321,6 @@ ORDER BY year, month;
321321
\echo Run worker processing for analytics tasks
322322
Run worker processing for analytics tasks
323323
CALL worker.process_tasks(p_queue => 'analytics');
324-
NOTICE: Created btree index su_ei_tax_ident_idx for external_ident_type
325-
NOTICE: Created btree index su_ei_stat_ident_idx for external_ident_type
326-
NOTICE: Created indices for stat_definition employees
327-
NOTICE: Created indices for stat_definition turnover
328324
SELECT queue, state, count(*) FROM worker.tasks AS t JOIN worker.command_registry AS c ON t.command = c.command WHERE c.queue != 'maintenance' GROUP BY queue,state ORDER BY queue,state;
329325
queue | state | count
330326
-----------+-----------+-------

test/expected/109_hierarchy_functions.out

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,6 @@ SELECT row_id, errors, invalid_codes, merge_status FROM public.import_34_eswlu_e
220220
\echo Run worker processing for analytics tasks
221221
Run worker processing for analytics tasks
222222
CALL worker.process_tasks(p_queue => 'analytics');
223-
NOTICE: Created btree index su_ei_tax_ident_idx for external_ident_type
224-
NOTICE: Created btree index su_ei_stat_ident_idx for external_ident_type
225-
NOTICE: Created indices for stat_definition employees
226-
NOTICE: Created indices for stat_definition turnover
227223
SELECT queue, state, count(*) FROM worker.tasks AS t JOIN worker.command_registry AS c ON t.command = c.command WHERE c.queue != 'maintenance' GROUP BY queue,state ORDER BY queue,state;
228224
queue | state | count
229225
-----------+-----------+-------

0 commit comments

Comments
 (0)