Skip to content

Commit 5324b67

Browse files
authored
fix bulk pipeline restart (#458)
1 parent 3be9a88 commit 5324b67

File tree

3 files changed

+69
-0
lines changed

3 files changed

+69
-0
lines changed

harmony/harmonydb/sql/20240529-sdr-pipeline-task-extract.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ BEGIN
2323
END;
2424
$$ LANGUAGE plpgsql;
2525

26+
-- replaced by function of same name in 20250331-fix-bulk-restart-func.sql
2627
CREATE OR REPLACE FUNCTION unset_task_id(sp_id_param bigint, sector_number_param bigint)
2728
RETURNS void AS $$
2829
DECLARE

harmony/harmonydb/sql/20240611-snap-pipeline.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ BEGIN
339339
END;
340340
$$ LANGUAGE plpgsql;
341341

342+
-- replaced by function of same name in 20250331-fix-bulk-restart-func.sql
342343
CREATE OR REPLACE FUNCTION unset_task_id_snap(sp_id_param bigint, sector_number_param bigint)
343344
RETURNS void AS $$
344345
DECLARE
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
CREATE OR REPLACE FUNCTION unset_task_id(sp_id_param bigint, sector_number_param bigint)
2+
RETURNS void AS $$
3+
DECLARE
4+
column_name text;
5+
column_names text[] := ARRAY[
6+
'task_id_sdr',
7+
'task_id_tree_d',
8+
'task_id_tree_c',
9+
'task_id_tree_r',
10+
'task_id_precommit_msg',
11+
'task_id_porep',
12+
'task_id_finalize',
13+
'task_id_move_storage',
14+
'task_id_commit_msg'
15+
];
16+
update_query text;
17+
task_ids bigint[];
18+
task_id bigint;
19+
BEGIN
20+
-- Get all non-null task IDs
21+
task_ids := get_sdr_pipeline_tasks(sp_id_param, sector_number_param);
22+
23+
IF task_ids IS NULL OR array_length(task_ids, 1) IS NULL THEN
24+
RETURN;
25+
END IF;
26+
27+
-- Loop through each task ID and each column
28+
FOREACH column_name IN ARRAY column_names LOOP
29+
FOREACH task_id IN ARRAY task_ids LOOP
30+
update_query := format('UPDATE sectors_sdr_pipeline SET %I = NULL WHERE %I = $1 AND sp_id = $2 AND sector_number = $3', column_name, column_name);
31+
EXECUTE update_query USING task_id, sp_id_param, sector_number_param;
32+
END LOOP;
33+
END LOOP;
34+
END;
35+
$$ LANGUAGE plpgsql;
36+
37+
38+
CREATE OR REPLACE FUNCTION unset_task_id_snap(sp_id_param bigint, sector_number_param bigint)
39+
RETURNS void AS $$
40+
DECLARE
41+
column_name text;
42+
column_names text[] := ARRAY[
43+
'task_id_encode',
44+
'task_id_prove',
45+
'task_id_submit',
46+
'task_id_move_storage'
47+
];
48+
update_query text;
49+
task_ids bigint[];
50+
task_id bigint;
51+
BEGIN
52+
-- Get all non-null task IDs
53+
task_ids := get_snap_pipeline_tasks(sp_id_param, sector_number_param);
54+
55+
IF task_ids IS NULL OR array_length(task_ids, 1) IS NULL THEN
56+
RETURN;
57+
END IF;
58+
59+
-- Loop through each task ID and each column
60+
FOREACH column_name IN ARRAY column_names LOOP
61+
FOREACH task_id IN ARRAY task_ids LOOP
62+
update_query := format('UPDATE sectors_snap_pipeline SET %I = NULL WHERE %I = $1 AND sp_id = $2 AND sector_number = $3', column_name, column_name);
63+
EXECUTE update_query USING task_id, sp_id_param, sector_number_param;
64+
END LOOP;
65+
END LOOP;
66+
END;
67+
$$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)