Skip to content

Commit 3d9254a

Browse files
fenos and itslenny authored
fix: cleaning up of prefixes under heavy concurrency (#764)
* fix: cleaning up of prefixes under heavy concurrency
* fix deadlock in overlapping prefixes

---------

Co-authored-by: Lenny <[email protected]>
1 parent a4cc5d5 commit 3d9254a

File tree

5 files changed

+1476
-1
lines changed

5 files changed

+1476
-1
lines changed

.docker/docker-compose-infra.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ services:
3636
POSTGRES_PASSWORD: postgres
3737

3838
pg_bouncer:
39-
image: bitnami/pgbouncer:latest
39+
image: bitnamilegacy/pgbouncer:latest
4040
ports:
4141
- 6453:6432
4242
environment:

migrations/tenant/0039-add-search-v2-sort-support.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
DROP FUNCTION IF EXISTS storage.search_v2;
12
CREATE OR REPLACE FUNCTION storage.search_v2 (
23
prefix text,
34
bucket_name text,
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
-- Drop old prefix-related triggers that conflict with new GC system
-- (the new system is the set of statement-level *_cleanup triggers created at
-- the end of this migration). IF EXISTS makes each DROP a no-op when the
-- trigger is already absent.
2+
DROP TRIGGER IF EXISTS prefixes_delete_hierarchy ON storage.prefixes;
3+
DROP TRIGGER IF EXISTS objects_delete_delete_prefix ON storage.objects;
4+
DROP TRIGGER IF EXISTS objects_update_create_prefix ON storage.objects;
5+
6+
-- Helper: Acquire transaction-scoped advisory locks on the top-level path
-- segment of each [bucket_id, name] pair, serializing operations per
-- "bucket/top_level_prefix".
--
-- Parameters:
--   bucket_ids - bucket id of each entry; paired positionally with names
--   names      - object or prefix path of each entry; empty names are skipped
--
-- Notes:
--   * Locks are taken with pg_advisory_xact_lock, so they are held until the
--     surrounding transaction ends; they are NOT released at statement end.
--   * Distinct (bucket, top-level segment) pairs are locked in sorted order so
--     every caller requests overlapping locks in the same sequence.
--   * The lock key is a 64-bit hash of "bucket/top"; a hash collision between
--     unrelated pairs only causes extra serialization, never missed locking.
CREATE OR REPLACE FUNCTION storage.lock_top_prefixes(bucket_ids text[], names text[])
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
DECLARE
    v_bucket text;
    v_top text;
BEGIN
    FOR v_bucket, v_top IN
        SELECT DISTINCT
            t.bucket_id,
            split_part(t.name, '/', 1) AS top
        FROM unnest(bucket_ids, names) AS t(bucket_id, name)
        WHERE t.name <> ''
        -- Named sort keys (instead of positional 1, 2) keep the lock order
        -- stable even if the select list is edited later.
        ORDER BY t.bucket_id, top
    LOOP
        PERFORM pg_advisory_xact_lock(hashtextextended(v_bucket || '/' || v_top, 0));
    END LOOP;
END;
$$;
28+
29+
-- Helper: Given arrays of bucket_ids and names, compute all ancestor
-- prefixes and delete those that are leaves (no child objects or prefixes).
-- Repeats bottom-up until a pass removes no more rows.
--
-- Parameters:
--   bucket_ids - bucket id of each entry; paired positionally with names
--   names      - object or prefix path of each entry
--
-- Notes:
--   * Candidate prefixes come from storage.get_prefixes(name) and levels from
--     storage.get_level(name); both are defined outside this migration.
--   * A prefix counts as a leaf when no object and no prefix exists exactly one
--     level below it whose name starts with "<prefix>/".
--   * The child check is a literal starts_with() match. A LIKE pattern built
--     from the prefix name would treat '_', '%' and '\' inside the name as
--     wildcards/escapes, so e.g. the empty prefix "my_folder" would be kept
--     alive by an unrelated "my-folder/file".
--   * Each pass issues a DELETE on storage.prefixes; the trigger functions that
--     call this helper use the storage.gc.prefixes setting to keep the
--     AFTER DELETE trigger on storage.prefixes from re-entering.
CREATE OR REPLACE FUNCTION storage.delete_leaf_prefixes(bucket_ids text[], names text[])
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
DECLARE
    v_rows_deleted integer;
BEGIN
    LOOP
        WITH candidates AS (
            -- every ancestor prefix of every input path
            SELECT DISTINCT
                t.bucket_id,
                unnest(storage.get_prefixes(t.name)) AS name
            FROM unnest(bucket_ids, names) AS t(bucket_id, name)
        ),
        uniq AS (
            -- de-duplicated candidates with their depth
            SELECT
                bucket_id,
                name,
                storage.get_level(name) AS level
            FROM candidates
            WHERE name <> ''
            GROUP BY bucket_id, name
        ),
        leaf AS (
            -- existing prefixes that currently have no direct children
            SELECT
                p.bucket_id,
                p.name,
                p.level
            FROM storage.prefixes AS p
            INNER JOIN uniq AS u
                ON u.bucket_id = p.bucket_id
               AND u.name = p.name
               AND u.level = p.level
            WHERE NOT EXISTS (
                SELECT 1
                FROM storage.objects AS o
                WHERE o.bucket_id = p.bucket_id
                  AND storage.get_level(o.name) = p.level + 1
                  AND starts_with(o.name COLLATE "C", p.name || '/')
            )
            AND NOT EXISTS (
                SELECT 1
                FROM storage.prefixes AS c
                WHERE c.bucket_id = p.bucket_id
                  AND c.level = p.level + 1
                  AND starts_with(c.name COLLATE "C", p.name || '/')
            )
        )
        DELETE FROM storage.prefixes AS p
        USING leaf AS l
        WHERE p.bucket_id = l.bucket_id
          AND p.name = l.name
          AND p.level = l.level;

        -- Removing a leaf can turn its parent into a leaf; keep going until a
        -- pass deletes nothing.
        GET DIAGNOSTICS v_rows_deleted = ROW_COUNT;
        EXIT WHEN v_rows_deleted = 0;
    END LOOP;
END;
$$;
87+
88+
-- After DELETE on storage.objects (statement-level trigger function; reads the
-- OLD transition table exposed as `deleted`)
-- - Guards with `storage.gc.prefixes`: returns immediately when it is already '1'
-- - Locks top-level prefixes for touched objects
-- - Deletes leaf prefixes derived from deleted object names and their ancestors
-- - Restores the guard to its previous value when done
--
-- The guard is written with set_config(..., is_local => true), which lasts
-- until the end of the transaction. Without restoring it, every later DELETE
-- statement in the same transaction would skip cleanup and leave empty
-- prefixes behind.
--
-- Returns NULL (the return value of an AFTER statement trigger is ignored).
CREATE OR REPLACE FUNCTION storage.objects_delete_cleanup()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
DECLARE
    v_bucket_ids text[];
    v_names text[];
    v_prev_guard text;
BEGIN
    -- NULL when the setting has never been set in this session/transaction.
    v_prev_guard := current_setting('storage.gc.prefixes', true);

    IF v_prev_guard = '1' THEN
        RETURN NULL;
    END IF;

    PERFORM set_config('storage.gc.prefixes', '1', true);

    -- Both aggregates read the same rows in the same order, so the two arrays
    -- stay positionally paired.
    SELECT
        COALESCE(array_agg(d.bucket_id), '{}'),
        COALESCE(array_agg(d.name), '{}')
    INTO v_bucket_ids, v_names
    FROM deleted AS d
    WHERE d.name <> '';

    PERFORM storage.lock_top_prefixes(v_bucket_ids, v_names);
    PERFORM storage.delete_leaf_prefixes(v_bucket_ids, v_names);

    -- Nested triggers fired by the pruning above have already run; re-arm
    -- cleanup for subsequent statements in this transaction.
    PERFORM set_config('storage.gc.prefixes', COALESCE(v_prev_guard, ''), true);

    RETURN NULL;
END;
$$;
119+
120+
-- After UPDATE on storage.objects (statement-level trigger function; reads the
-- transition tables exposed as `old_rows` and `new_rows`)
-- - Computes NEW-OLD (destination paths) and OLD-NEW (source paths)
-- - Locks the top-level prefixes of both sets in a single, ordered call
-- - Creates the prefixes of the destination paths
-- - Sets the `storage.gc.prefixes` guard, prunes leaf prefixes derived from
--   the source paths, then restores the guard to its previous value
--
-- Returns NULL (the return value of an AFTER statement trigger is ignored).
CREATE OR REPLACE FUNCTION storage.objects_update_cleanup()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
DECLARE
    -- NEW - OLD (destinations to create prefixes for)
    v_add_bucket_ids text[];
    v_add_names text[];

    -- OLD - NEW (sources to prune)
    v_src_bucket_ids text[];
    v_src_names text[];

    -- value of storage.gc.prefixes before this function touched it
    v_prev_guard text;
BEGIN
    IF TG_OP <> 'UPDATE' THEN
        RETURN NULL;
    END IF;

    -- 1) Compute NEW-OLD (added paths) and OLD-NEW (moved-away paths).
    --    Only destination names containing '/' are kept: a name without a
    --    slash has no prefix to create.
    WITH added AS (
        SELECT n.bucket_id, n.name
        FROM new_rows AS n
        WHERE n.name <> ''
          AND position('/' IN n.name) > 0
        EXCEPT
        SELECT o.bucket_id, o.name
        FROM old_rows AS o
        WHERE o.name <> ''
    ),
    moved AS (
        SELECT o.bucket_id, o.name
        FROM old_rows AS o
        WHERE o.name <> ''
        EXCEPT
        SELECT n.bucket_id, n.name
        FROM new_rows AS n
        WHERE n.name <> ''
    )
    SELECT
        -- arrays for ADDED (dest); identical ORDER BY keeps them paired
        COALESCE((SELECT array_agg(a.bucket_id ORDER BY a.bucket_id, a.name) FROM added AS a), '{}'),
        COALESCE((SELECT array_agg(a.name ORDER BY a.bucket_id, a.name) FROM added AS a), '{}'),
        -- arrays for MOVED (src); identical ORDER BY keeps them paired
        COALESCE((SELECT array_agg(m.bucket_id ORDER BY m.bucket_id, m.name) FROM moved AS m), '{}'),
        COALESCE((SELECT array_agg(m.name ORDER BY m.bucket_id, m.name) FROM moved AS m), '{}')
    INTO v_add_bucket_ids, v_add_names, v_src_bucket_ids, v_src_names;

    -- Nothing to do? (array_length of an empty array is NULL)
    IF array_length(v_add_bucket_ids, 1) IS NULL
       AND array_length(v_src_bucket_ids, 1) IS NULL THEN
        RETURN NULL;
    END IF;

    -- 2) Take per-(bucket, top) locks for sources AND destinations in one call
    --    so they are acquired in one consistent order. At least one of the two
    --    sets is non-empty here.
    PERFORM storage.lock_top_prefixes(
        v_src_bucket_ids || v_add_bucket_ids,
        v_src_names || v_add_names
    );

    -- 3) Create destination prefixes (NEW-OLD) BEFORE pruning sources
    IF array_length(v_add_bucket_ids, 1) IS NOT NULL THEN
        WITH candidates AS (
            SELECT DISTINCT
                t.bucket_id,
                unnest(storage.get_prefixes(t.name)) AS name
            FROM unnest(v_add_bucket_ids, v_add_names) AS t(bucket_id, name)
            WHERE t.name <> ''
        )
        INSERT INTO storage.prefixes (bucket_id, name)
        SELECT c.bucket_id, c.name
        FROM candidates AS c
        ON CONFLICT DO NOTHING;
    END IF;

    -- 4) Prune source prefixes bottom-up for OLD-NEW
    IF array_length(v_src_bucket_ids, 1) IS NOT NULL THEN
        -- Re-entrancy guard so DELETE on prefixes won't recurse.
        -- current_setting(..., true) is NULL when the setting was never set,
        -- and NULL <> '1' is not true, so the guard is set unconditionally
        -- instead of behind a "<> '1'" test.
        v_prev_guard := current_setting('storage.gc.prefixes', true);
        PERFORM set_config('storage.gc.prefixes', '1', true);

        PERFORM storage.delete_leaf_prefixes(v_src_bucket_ids, v_src_names);

        -- set_config(..., true) lasts until the end of the transaction; put
        -- the previous value back so later statements still get cleanup.
        PERFORM set_config('storage.gc.prefixes', COALESCE(v_prev_guard, ''), true);
    END IF;

    RETURN NULL;
END;
$$;
211+
212+
-- After DELETE on storage.prefixes (statement-level trigger function; reads the
-- OLD transition table exposed as `deleted`)
-- - When prefixes are deleted, remove now-empty ancestor prefixes
-- - Guards with `storage.gc.prefixes`: returns immediately when it is already
--   '1', which is how the pruning DELETEs issued by the cleanup functions
--   avoid re-entering this trigger
-- - Locks top-level prefixes, prunes leaves derived from the deleted prefixes,
--   then restores the guard to its previous value
--
-- The guard is written with set_config(..., is_local => true), which lasts
-- until the end of the transaction. Without restoring it, every later DELETE
-- statement in the same transaction would skip cleanup.
--
-- Returns NULL (the return value of an AFTER statement trigger is ignored).
CREATE OR REPLACE FUNCTION storage.prefixes_delete_cleanup()
RETURNS trigger
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
DECLARE
    v_bucket_ids text[];
    v_names text[];
    v_prev_guard text;
BEGIN
    -- NULL when the setting has never been set in this session/transaction.
    v_prev_guard := current_setting('storage.gc.prefixes', true);

    IF v_prev_guard = '1' THEN
        RETURN NULL;
    END IF;

    PERFORM set_config('storage.gc.prefixes', '1', true);

    -- Both aggregates read the same rows in the same order, so the two arrays
    -- stay positionally paired.
    SELECT
        COALESCE(array_agg(d.bucket_id), '{}'),
        COALESCE(array_agg(d.name), '{}')
    INTO v_bucket_ids, v_names
    FROM deleted AS d
    WHERE d.name <> '';

    PERFORM storage.lock_top_prefixes(v_bucket_ids, v_names);
    PERFORM storage.delete_leaf_prefixes(v_bucket_ids, v_names);

    -- Nested triggers fired by the pruning above have already run; re-arm
    -- cleanup for subsequent statements in this transaction.
    PERFORM set_config('storage.gc.prefixes', COALESCE(v_prev_guard, ''), true);

    RETURN NULL;
END;
$$;
242+
243+
-- Trigger bindings
-- All three are statement-level AFTER triggers that hand the affected rows to
-- their function through transition tables. Each CREATE is preceded by
-- DROP TRIGGER IF EXISTS so the migration can be re-applied without failing
-- on an already-existing trigger.
DROP TRIGGER IF EXISTS objects_delete_cleanup ON storage.objects;
CREATE TRIGGER objects_delete_cleanup
    AFTER DELETE ON storage.objects
    REFERENCING OLD TABLE AS deleted
    FOR EACH STATEMENT
    EXECUTE FUNCTION storage.objects_delete_cleanup();

DROP TRIGGER IF EXISTS prefixes_delete_cleanup ON storage.prefixes;
CREATE TRIGGER prefixes_delete_cleanup
    AFTER DELETE ON storage.prefixes
    REFERENCING OLD TABLE AS deleted
    FOR EACH STATEMENT
    EXECUTE FUNCTION storage.prefixes_delete_cleanup();

DROP TRIGGER IF EXISTS objects_update_cleanup ON storage.objects;
CREATE TRIGGER objects_update_cleanup
    AFTER UPDATE ON storage.objects
    REFERENCING OLD TABLE AS old_rows NEW TABLE AS new_rows
    FOR EACH STATEMENT
    EXECUTE FUNCTION storage.objects_update_cleanup();

src/internal/database/migrations/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,5 @@ export const DBMigration = {
3838
'add-bucket-name-length-trigger': 37,
3939
'iceberg-catalog-flag-on-buckets': 38,
4040
'add-search-v2-sort-support': 39,
41+
'fix-prefix-race-conditions-optimized': 40,
4142
}

0 commit comments

Comments
 (0)