From bd940a34fe392e1cb0e02009a6f4b245b1a4530d Mon Sep 17 00:00:00 2001
From: Lenny
Date: Fri, 3 Oct 2025 19:12:29 -0400
Subject: [PATCH] fix: add ref counting for reliable prefix lifecycle management

---
 .../tenant/0044-prefix-ref-counting.sql       | 420 ++++++++++++++++++
 .../tenant/0045-calculate-ref-counts.sql      | 282 ++++++++++++
 src/internal/database/migrations/types.ts     |   5 +-
 src/test/prefixes.test.ts                     |  47 +-
 4 files changed, 747 insertions(+), 7 deletions(-)
 create mode 100644 migrations/tenant/0044-prefix-ref-counting.sql
 create mode 100644 migrations/tenant/0045-calculate-ref-counts.sql

diff --git a/migrations/tenant/0044-prefix-ref-counting.sql b/migrations/tenant/0044-prefix-ref-counting.sql
new file mode 100644
index 00000000..a7d0c29c
--- /dev/null
+++ b/migrations/tenant/0044-prefix-ref-counting.sql
@@ -0,0 +1,420 @@
+-- Drop existing triggers that will be replaced by the ref-counting triggers created below
+DROP TRIGGER IF EXISTS objects_insert_create_prefix ON storage.objects;
+DROP TRIGGER IF EXISTS objects_update_create_prefix ON storage.objects;
+DROP TRIGGER IF EXISTS objects_delete_delete_prefix ON storage.objects;
+DROP TRIGGER IF EXISTS prefixes_delete_hierarchy ON storage.prefixes;
+DROP TRIGGER IF EXISTS prefixes_create_hierarchy ON storage.prefixes;
+
+-- Add ref counting columns to prefixes table
+-- Default to -1 (sentinel value meaning "not yet calculated")
+-- Migration 0045 will calculate correct values and replace increment/decrement functions
+ALTER TABLE storage.prefixes
+    ADD COLUMN IF NOT EXISTS child_objects INTEGER DEFAULT -1,
+    ADD COLUMN IF NOT EXISTS child_prefixes INTEGER DEFAULT -1;
+
+-- Helper function to get the direct parent prefix ('a/b/c' -> 'a/b'; a name without '/' -> '')
+CREATE OR REPLACE FUNCTION storage.get_direct_parent(name text)
+    RETURNS text
+    LANGUAGE sql
+    IMMUTABLE STRICT
+AS $$
+    SELECT CASE
+        WHEN position('/' in name) = 0 THEN ''
+        ELSE regexp_replace(name, '/[^/]+$', '')
+    END;
+$$;
+
+-- Function to acquire a transaction-scoped advisory lock for a specific bucket+prefix combination
+-- This ensures serialization of operations on the same prefix to avoid race conditions
+CREATE OR REPLACE FUNCTION storage.lock_prefix(bucket_id text, prefix_name text)
+    RETURNS void
+    LANGUAGE plpgsql
+AS $$
+BEGIN
+    PERFORM pg_advisory_xact_lock(
+        hashtextextended(bucket_id || '/' || prefix_name, 0)
+    );
+END;
+$$;
+
+-- Function to acquire multiple prefix locks in consistent order to prevent deadlocks
+-- Takes a list of prefixes, skips '' (root) and locks the rest in sorted order; an empty or NULL list is a no-op
+CREATE OR REPLACE FUNCTION storage.lock_multiple_prefixes(bucket_id text, prefix_names text[])
+    RETURNS void
+    LANGUAGE plpgsql
+AS $$
+DECLARE
+    _prefix_name text;
+    _sorted_prefixes text[];
+BEGIN
+    -- Sort prefixes for consistent lock ordering; COALESCE because array_agg yields NULL for zero rows
+    SELECT COALESCE(array_agg(prefix_name ORDER BY prefix_name), '{}'::text[])
+    INTO _sorted_prefixes
+    FROM unnest(prefix_names) AS prefix_name
+    WHERE prefix_name != '';
+
+    -- Acquire locks in sorted order (FOREACH raises on a NULL array, hence the COALESCE above)
+    FOREACH _prefix_name IN ARRAY _sorted_prefixes
+    LOOP
+        PERFORM storage.lock_prefix(bucket_id, _prefix_name);
+    END LOOP;
+END;
+$$;
+
+-- Function to increment child_prefixes count for a parent prefix
+-- TEMPORARY: During migration, just ensures prefix exists with -1 sentinel
+-- This will be replaced with real increment logic at end of migration 0045
+CREATE OR REPLACE FUNCTION storage.increment_prefix_child_count(
+    _bucket_id text,
+    _child_name text
+)
+    RETURNS void
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+AS $$
+DECLARE
+    _parent_name text;
+BEGIN
+    _parent_name := storage.get_direct_parent(_child_name);
+
+    IF _parent_name = '' THEN
+        RETURN;
+    END IF;
+
+    PERFORM storage.lock_prefix(_bucket_id, _parent_name);
+
+    -- Just ensure prefix exists with -1 sentinel (don't increment yet)
+    INSERT INTO storage.prefixes (bucket_id, name, child_objects, child_prefixes)
+    VALUES (_bucket_id, _parent_name, -1, -1)
+    ON CONFLICT (bucket_id, level, name) DO NOTHING;
+END;
+$$;
+
+-- Function to decrement child_prefixes count for a parent prefix
+-- TEMPORARY: During migration, this is a NO-OP (prefix should already exist)
+-- This will be replaced with real decrement logic at end of migration 0045
+CREATE OR REPLACE FUNCTION storage.decrement_prefix_child_count(
+    _bucket_id text,
+    _child_name text
+)
+    RETURNS void
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+AS $$
+BEGIN
+    RETURN;
+END;
+$$;
+
+-- Function to increment child_objects count for a prefix
+-- TEMPORARY: During migration, just ensures prefix exists with -1 sentinel
+-- This will be replaced with real increment logic at end of migration 0045
+CREATE OR REPLACE FUNCTION storage.increment_prefix_object_count(
+    _bucket_id text,
+    _prefix_name text,
+    _count bigint
+)
+    RETURNS void
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+AS $$
+BEGIN
+    IF _prefix_name = '' THEN
+        RETURN;
+    END IF;
+
+    -- Just ensure prefix exists with -1 sentinel (don't increment yet)
+    INSERT INTO storage.prefixes (bucket_id, name, child_objects, child_prefixes)
+    VALUES (_bucket_id, _prefix_name, -1, -1)
+    ON CONFLICT (bucket_id, level, name) DO NOTHING;
+END;
+$$;
+
+-- Function to decrement child_objects count for a prefix
+-- TEMPORARY: During migration, this is a NO-OP
+-- This will be replaced with real decrement logic at end of migration 0045
+CREATE OR REPLACE FUNCTION storage.decrement_prefix_object_count(
+    _bucket_id text,
+    _prefix_name text,
+    _count bigint
+)
+    RETURNS void
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+AS $$
+BEGIN
+    RETURN;
+END;
+$$;
+
+-- Trigger function for object insertions (statement-level, reads transition table "inserted")
+-- Creates parent prefixes and increments child_objects counts, one call per (bucket, parent prefix)
+CREATE OR REPLACE FUNCTION storage.objects_insert_after_ref_counting()
+    RETURNS trigger
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+AS $$
+DECLARE
+    _all_prefixes text[];
+    _prefix_updates RECORD;
+BEGIN
+    -- Collect all unique prefixes and acquire locks upfront in sorted order to prevent deadlocks
+    SELECT array_agg(DISTINCT storage.get_direct_parent(i.name) ORDER BY storage.get_direct_parent(i.name))
+    INTO _all_prefixes
+    FROM inserted i
+    WHERE position('/' in i.name) > 0
+      AND storage.get_direct_parent(i.name) != '';
+
+    IF _all_prefixes IS NOT NULL AND array_length(_all_prefixes, 1) > 0 THEN
+        FOR _prefix_updates IN
+            SELECT DISTINCT bucket_id FROM inserted ORDER BY bucket_id -- fixed bucket order keeps lock order deterministic
+        LOOP
+            PERFORM storage.lock_multiple_prefixes(_prefix_updates.bucket_id, _all_prefixes);
+        END LOOP;
+    END IF;
+
+    FOR _prefix_updates IN
+        WITH inserted_with_parents AS (
+            SELECT
+                i.bucket_id,
+                storage.get_direct_parent(i.name) as parent_prefix,
+                COUNT(*) as inserted_count
+            FROM inserted i
+            WHERE position('/' in i.name) > 0
+            GROUP BY i.bucket_id, storage.get_direct_parent(i.name)
+        )
+        SELECT bucket_id, parent_prefix, inserted_count
+        FROM inserted_with_parents
+        WHERE parent_prefix != ''
+        ORDER BY bucket_id, parent_prefix
+    LOOP
+        PERFORM storage.increment_prefix_object_count(
+            _prefix_updates.bucket_id,
+            _prefix_updates.parent_prefix,
+            _prefix_updates.inserted_count
+        );
+    END LOOP;
+
+    RETURN NULL;
+END;
+$$;
+
+-- Trigger function that sets NEW.level from the object name before an insert or update
+CREATE OR REPLACE FUNCTION storage.objects_set_level()
+    RETURNS trigger
+    LANGUAGE plpgsql
+AS $$
+BEGIN
+    NEW.level := storage.get_level(NEW.name);
+    RETURN NEW;
+END;
+$$;
+
+-- Trigger function for object deletions (statement-level, reads transition table "deleted")
+-- Decrements the direct parent prefix's child_objects count, one call per (bucket, parent prefix)
+CREATE OR REPLACE FUNCTION storage.objects_delete_ref_counting()
+    RETURNS trigger
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+AS $$
+DECLARE
+    _prefix_updates RECORD;
+    _all_prefixes text[];
+BEGIN
+    -- Collect all unique prefixes and acquire locks upfront in sorted order to prevent deadlocks
+    SELECT array_agg(DISTINCT storage.get_direct_parent(d.name) ORDER BY storage.get_direct_parent(d.name))
+    INTO _all_prefixes
+    FROM deleted d
+    WHERE position('/' in d.name) > 0
+      AND storage.get_direct_parent(d.name) != '';
+
+    IF _all_prefixes IS NOT NULL AND array_length(_all_prefixes, 1) > 0 THEN
+        FOR _prefix_updates IN
+            SELECT DISTINCT bucket_id FROM deleted ORDER BY bucket_id -- fixed bucket order keeps lock order deterministic
+        LOOP
+            PERFORM storage.lock_multiple_prefixes(_prefix_updates.bucket_id, _all_prefixes);
+        END LOOP;
+    END IF;
+
+    -- Process prefix updates atomically
+    FOR _prefix_updates IN
+        WITH deleted_with_parents AS (
+            SELECT
+                d.bucket_id,
+                storage.get_direct_parent(d.name) as parent_prefix,
+                COUNT(*) as deleted_count
+            FROM deleted d
+            WHERE position('/' in d.name) > 0  -- Only objects with parent prefixes
+            GROUP BY d.bucket_id, storage.get_direct_parent(d.name)
+        )
+        SELECT bucket_id, parent_prefix, deleted_count
+        FROM deleted_with_parents
+        WHERE parent_prefix != ''  -- Exclude root level
+        ORDER BY bucket_id, parent_prefix  -- Maintain consistent ordering
+    LOOP
+        PERFORM storage.decrement_prefix_object_count(
+            _prefix_updates.bucket_id,
+            _prefix_updates.parent_prefix,
+            _prefix_updates.deleted_count
+        );
+    END LOOP;
+
+    RETURN NULL;
+END;
+$$;
+
+-- Trigger function for object updates (statement-level AFTER, reads "old_table" and "new_table")
+-- Handles moves between prefixes/buckets by updating both old and new parents; unmoved rows are ignored
+CREATE OR REPLACE FUNCTION storage.objects_update_after_ref_counting()
+    RETURNS trigger
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+AS $$
+DECLARE
+    _all_prefixes text[];
+    _prefix_updates RECORD;
+BEGIN
+    -- Collect all unique prefixes (both old and new) and acquire locks upfront in sorted order to prevent deadlocks
+    SELECT array_agg(DISTINCT prefix_name ORDER BY prefix_name)
+    INTO _all_prefixes
+    FROM (
+        SELECT storage.get_direct_parent(o.name) as prefix_name, o.bucket_id
+        FROM old_table o
+        WHERE position('/' in o.name) > 0
+        UNION
+        SELECT storage.get_direct_parent(n.name) as prefix_name, n.bucket_id
+        FROM new_table n
+        WHERE position('/' in n.name) > 0
+    ) prefixes
+    WHERE prefix_name != '';
+
+    IF _all_prefixes IS NOT NULL AND array_length(_all_prefixes, 1) > 0 THEN
+        FOR _prefix_updates IN
+            SELECT DISTINCT bucket_id FROM old_table
+            UNION
+            SELECT DISTINCT bucket_id FROM new_table ORDER BY bucket_id -- orders the whole UNION: deterministic lock order
+        LOOP
+            PERFORM storage.lock_multiple_prefixes(_prefix_updates.bucket_id, _all_prefixes);
+        END LOOP;
+    END IF;
+
+    -- Decrement old parents atomically
+    FOR _prefix_updates IN
+        WITH old_parents AS (
+            SELECT
+                o.bucket_id,
+                storage.get_direct_parent(o.name) as parent_prefix,
+                COUNT(*) as moved_count
+            FROM old_table o
+            INNER JOIN new_table n ON o.id = n.id
+            WHERE position('/' in o.name) > 0
+              AND (o.bucket_id != n.bucket_id OR storage.get_direct_parent(o.name) != storage.get_direct_parent(n.name))
+            GROUP BY o.bucket_id, storage.get_direct_parent(o.name)
+        )
+        SELECT bucket_id, parent_prefix, moved_count
+        FROM old_parents
+        WHERE parent_prefix != ''
+        ORDER BY bucket_id, parent_prefix
+    LOOP
+        PERFORM storage.decrement_prefix_object_count(
+            _prefix_updates.bucket_id,
+            _prefix_updates.parent_prefix,
+            _prefix_updates.moved_count
+        );
+    END LOOP;
+
+    -- Increment new parents atomically
+    FOR _prefix_updates IN
+        WITH new_parents AS (
+            SELECT
+                n.bucket_id,
+                storage.get_direct_parent(n.name) as parent_prefix,
+                COUNT(*) as moved_count
+            FROM new_table n
+            INNER JOIN old_table o ON o.id = n.id
+            WHERE position('/' in n.name) > 0
+              AND (o.bucket_id != n.bucket_id OR storage.get_direct_parent(o.name) != storage.get_direct_parent(n.name))
+            GROUP BY n.bucket_id, storage.get_direct_parent(n.name)
+        )
+        SELECT bucket_id, parent_prefix, moved_count
+        FROM new_parents
+        WHERE parent_prefix != ''
+        ORDER BY bucket_id, parent_prefix
+    LOOP
+        PERFORM storage.increment_prefix_object_count(
+            _prefix_updates.bucket_id,
+            _prefix_updates.parent_prefix,
+            _prefix_updates.moved_count
+        );
+    END LOOP;
+
+    RETURN NULL;
+END;
+$$;
+
+-- Trigger function for prefix insertions (row-level)
+-- Increments parent prefix child_prefixes count
+CREATE OR REPLACE FUNCTION storage.prefixes_insert_ref_counting()
+    RETURNS trigger
+    LANGUAGE plpgsql
+AS $$
+BEGIN
+    PERFORM storage.increment_prefix_child_count(NEW.bucket_id, NEW.name);
+    RETURN NEW;
+END;
+$$;
+
+-- Trigger function for prefix deletions (row-level)
+-- Decrements parent prefix child_prefixes count
+CREATE OR REPLACE FUNCTION storage.prefixes_delete_ref_counting()
+    RETURNS trigger
+    LANGUAGE plpgsql
+AS $$
+BEGIN
+    PERFORM storage.decrement_prefix_child_count(OLD.bucket_id, OLD.name);
+    RETURN OLD;
+END;
+$$;
+
+-- Object triggers
+CREATE TRIGGER objects_insert_set_level
+    BEFORE INSERT ON storage.objects
+    FOR EACH ROW
+    EXECUTE FUNCTION storage.objects_set_level();
+
+CREATE TRIGGER objects_insert_ref_counting
+    AFTER INSERT ON storage.objects
+    REFERENCING NEW TABLE AS inserted
+    FOR EACH STATEMENT
+    EXECUTE FUNCTION storage.objects_insert_after_ref_counting();
+
+CREATE TRIGGER objects_delete_ref_counting
+    AFTER DELETE ON storage.objects
+    REFERENCING OLD TABLE AS deleted
+    FOR EACH STATEMENT
+    EXECUTE FUNCTION storage.objects_delete_ref_counting();
+
+CREATE TRIGGER objects_update_set_level
+    BEFORE UPDATE ON storage.objects
+    FOR EACH ROW
+    EXECUTE FUNCTION storage.objects_set_level();
+
+CREATE TRIGGER objects_update_after_ref_counting
+    AFTER UPDATE ON storage.objects
+    REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table
+    FOR EACH STATEMENT
+    EXECUTE FUNCTION storage.objects_update_after_ref_counting();
+
+-- Prefix triggers
+CREATE TRIGGER prefixes_insert_ref_counting
+    AFTER INSERT ON storage.prefixes
+    FOR EACH ROW
+    EXECUTE FUNCTION storage.prefixes_insert_ref_counting();
+
+CREATE TRIGGER prefixes_delete_ref_counting
+    AFTER DELETE ON storage.prefixes
+    FOR EACH ROW
+    EXECUTE FUNCTION storage.prefixes_delete_ref_counting();
+
+-- Create index to help find orphaned prefixes (used by decrement functions and migration 0045 cleanup)
+CREATE INDEX IF NOT EXISTS prefixes_empty_idx ON storage.prefixes(bucket_id, level) WHERE child_objects = 0 AND child_prefixes = 0;
\ No newline at end of file
diff --git a/migrations/tenant/0045-calculate-ref-counts.sql b/migrations/tenant/0045-calculate-ref-counts.sql
new file mode 100644
index 00000000..cd3502f1
--- /dev/null
+++ b/migrations/tenant/0045-calculate-ref-counts.sql
@@ -0,0 +1,282 @@
+-- postgres-migrations disable-transaction
+
+-- Helper function to calculate counts for all sentinel prefixes (either counter < 0), in adaptive batches
+CREATE OR REPLACE FUNCTION storage.migrate_calculate_sentinel_prefixes()
+RETURNS void
+LANGUAGE plpgsql
+AS $func$
+DECLARE
+    batch_size INTEGER := 10000;
+    total_processed INTEGER := 0;
+    rows_in_batch INTEGER := 0;
+    delay INTEGER := 1;
+    start_time TIMESTAMPTZ;
+    exec_duration INTERVAL;
+BEGIN
+    LOOP
+        start_time := clock_timestamp();
+
+        -- Batch of -1 prefixes to process
+        WITH batch AS (
+            SELECT bucket_id, name
+            FROM storage.prefixes
+            WHERE (child_objects < 0 OR child_prefixes < 0)
+            ORDER BY bucket_id, name
+            LIMIT batch_size
+        ),
+        -- Calculate child_objects for this batch (objects whose direct parent is the batch prefix)
+        object_counts AS (
+            SELECT
+                b.bucket_id,
+                b.name,
+                COUNT(o.id) as child_count
+            FROM batch b
+            LEFT JOIN storage.objects o
+                ON o.bucket_id = b.bucket_id
+                AND regexp_replace(o.name, '/[^/]+$', '') = b.name
+                AND position('/' in o.name) > 0
+            GROUP BY b.bucket_id, b.name
+        ),
+        -- Calculate child_prefixes for this batch (prefixes whose direct parent is the batch prefix)
+        prefix_counts AS (
+            SELECT
+                b.bucket_id,
+                b.name,
+                COUNT(p.name) as child_count
+            FROM batch b
+            LEFT JOIN storage.prefixes p
+                ON p.bucket_id = b.bucket_id
+                AND regexp_replace(p.name, '/[^/]+$', '') = b.name
+                AND position('/' in p.name) > 0
+            GROUP BY b.bucket_id, b.name
+        )
+        -- Update counts for this batch
+        UPDATE storage.prefixes prf
+        SET
+            child_objects = COALESCE(oc.child_count, 0),
+            child_prefixes = COALESCE(pc.child_count, 0)
+        FROM batch b
+        LEFT JOIN object_counts oc ON b.bucket_id = oc.bucket_id AND b.name = oc.name
+        LEFT JOIN prefix_counts pc ON b.bucket_id = pc.bucket_id AND b.name = pc.name
+        WHERE prf.bucket_id = b.bucket_id AND prf.name = b.name;
+
+        GET DIAGNOSTICS rows_in_batch = ROW_COUNT;
+
+        exec_duration := clock_timestamp() - start_time;
+        total_processed := total_processed + COALESCE(rows_in_batch, 0);
+
+        RAISE NOTICE ' Batch: % prefixes | Duration: % | Batch size: %',
+            COALESCE(rows_in_batch, 0),
+            exec_duration,
+            batch_size;
+
+        -- Exit when no more -1 prefixes remain
+        EXIT WHEN rows_in_batch = 0;
+
+        PERFORM pg_sleep(delay);
+
+        -- Adaptive batch sizing (same as migration 0029)
+        IF exec_duration > interval '3 seconds' THEN
+            IF batch_size <= 20000 THEN
+                batch_size := GREATEST(batch_size - 1000, 5000);
+            ELSE
+                batch_size := 20000;
+            END IF;
+        ELSE
+            batch_size := LEAST(batch_size + 5000, 50000);
+        END IF;
+
+        delay := CASE WHEN delay >= 10 THEN 1 ELSE delay + 1 END;
+    END LOOP;
+
+    RAISE NOTICE ' Total prefixes processed: %', total_processed;
+
+    -- Delete any orphaned prefixes (0,0 ref counts)
+    DELETE FROM storage.prefixes
+    WHERE child_objects = 0 AND child_prefixes = 0;
+
+    GET DIAGNOSTICS rows_in_batch = ROW_COUNT;
+    IF rows_in_batch > 0 THEN
+        RAISE NOTICE ' Deleted % orphaned prefixes', rows_in_batch;
+    END IF;
+END;
+$func$;
+
+DO $$
+DECLARE
+    sentinel_count INTEGER := 0;
+BEGIN
+    RAISE NOTICE '=================================================';
+    RAISE NOTICE 'Starting prefix ref counting migration';
+    RAISE NOTICE 'Strategy: Process -1 sentinels with adaptive batching';
+    RAISE NOTICE '=================================================';
+
+    -- Create temporary index to speed up batch queries
+    RAISE NOTICE 'Creating temporary index for migration...';
+    CREATE INDEX IF NOT EXISTS prefixes_sentinel_idx
+        ON storage.prefixes(bucket_id, name)
+        WHERE (child_objects < 0 OR child_prefixes < 0);
+    RAISE NOTICE 'Index created.';
+
+    -- Phase 1: Calculate counts for all -1 prefixes
+    RAISE NOTICE 'Phase 1: Calculating counts for all -1 prefixes...';
+
+    PERFORM storage.migrate_calculate_sentinel_prefixes();
+
+    -- Phase 2: Replace sentinel functions with real increment/decrement logic
+    RAISE NOTICE 'Phase 2: Activating real increment/decrement functions...';
+
+    -- Real increment_prefix_child_count (child_prefixes): upserts the parent row and adds 1
+    CREATE OR REPLACE FUNCTION storage.increment_prefix_child_count(
+        _bucket_id text,
+        _child_name text
+    )
+    RETURNS void
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+    AS $func$
+    DECLARE
+        _parent_name text;
+    BEGIN
+        _parent_name := storage.get_direct_parent(_child_name);
+
+        IF _parent_name = '' THEN
+            RETURN;
+        END IF;
+
+        PERFORM storage.lock_prefix(_bucket_id, _parent_name);
+
+        INSERT INTO storage.prefixes (bucket_id, name, child_objects, child_prefixes)
+        VALUES (_bucket_id, _parent_name, 0, 1)
+        ON CONFLICT (bucket_id, level, name)
+        DO UPDATE SET child_prefixes = storage.prefixes.child_prefixes + 1;
+    END;
+    $func$;
+
+    -- Real decrement_prefix_child_count (child_prefixes): subtracts 1 and deletes the parent once both counts are 0
+    CREATE OR REPLACE FUNCTION storage.decrement_prefix_child_count(
+        _bucket_id text,
+        _child_name text
+    )
+    RETURNS void
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+    AS $func$
+    DECLARE
+        _parent_name text;
+        _new_object_count integer;
+        _new_prefix_count integer;
+    BEGIN
+        _parent_name := storage.get_direct_parent(_child_name);
+
+        IF _parent_name = '' THEN
+            RETURN;
+        END IF;
+
+        PERFORM storage.lock_prefix(_bucket_id, _parent_name);
+
+        UPDATE storage.prefixes
+        SET child_prefixes = child_prefixes - 1
+        WHERE storage.prefixes.bucket_id = _bucket_id
+          AND storage.prefixes.name = _parent_name
+        RETURNING child_objects, child_prefixes
+        INTO _new_object_count, _new_prefix_count;
+
+        IF _new_object_count = 0 AND _new_prefix_count = 0 THEN
+            DELETE FROM storage.prefixes
+            WHERE storage.prefixes.bucket_id = _bucket_id
+              AND storage.prefixes.name = _parent_name
+              AND child_objects = 0
+              AND child_prefixes = 0;
+        END IF;
+    END;
+    $func$;
+
+    -- Real increment_prefix_object_count (child_objects): upserts the prefix row and adds _count
+    CREATE OR REPLACE FUNCTION storage.increment_prefix_object_count(
+        _bucket_id text,
+        _prefix_name text,
+        _count bigint
+    )
+    RETURNS void
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+    AS $func$
+    BEGIN
+        IF _prefix_name = '' THEN
+            RETURN;
+        END IF;
+
+        INSERT INTO storage.prefixes (bucket_id, name, child_objects, child_prefixes)
+        VALUES (_bucket_id, _prefix_name, _count, 0)
+        ON CONFLICT (bucket_id, level, name)
+        DO UPDATE SET child_objects = storage.prefixes.child_objects + _count;
+    END;
+    $func$;
+
+    -- Real decrement_prefix_object_count (child_objects): subtracts _count and deletes the prefix once both counts are 0
+    CREATE OR REPLACE FUNCTION storage.decrement_prefix_object_count(
+        _bucket_id text,
+        _prefix_name text,
+        _count bigint
+    )
+    RETURNS void
+    LANGUAGE plpgsql
+    SECURITY DEFINER
+    AS $func$
+    DECLARE
+        _new_object_count integer;
+        _new_prefix_count integer;
+    BEGIN
+        IF _prefix_name = '' THEN
+            RETURN;
+        END IF;
+
+        UPDATE storage.prefixes
+        SET child_objects = child_objects - _count
+        WHERE storage.prefixes.bucket_id = _bucket_id
+          AND storage.prefixes.name = _prefix_name
+        RETURNING child_objects, child_prefixes
+        INTO _new_object_count, _new_prefix_count;
+
+        IF _new_object_count = 0 AND _new_prefix_count = 0 THEN
+            DELETE FROM storage.prefixes
+            WHERE storage.prefixes.bucket_id = _bucket_id
+              AND storage.prefixes.name = _prefix_name
+              AND child_objects = 0
+              AND child_prefixes = 0;
+        END IF;
+    END;
+    $func$;
+
+    RAISE NOTICE ' Replaced 4 functions with real increment/decrement logic';
+
+    -- Phase 3: Final validation and cleanup of any -1 prefixes created during migration
+    RAISE NOTICE 'Phase 3: Checking for -1 prefixes created during migration...';
+
+    SELECT COUNT(*) INTO sentinel_count
+    FROM storage.prefixes
+    WHERE child_objects < 0 OR child_prefixes < 0;
+
+    IF sentinel_count > 0 THEN
+        RAISE NOTICE ' Found % -1 prefixes created during migration. Calculating final counts...', sentinel_count;
+        PERFORM storage.migrate_calculate_sentinel_prefixes();
+    ELSE
+        RAISE NOTICE ' No -1 prefixes found - migration was clean!';
+    END IF;
+
+    -- Drop temporary index
+    RAISE NOTICE 'Dropping temporary index...';
+    DROP INDEX IF EXISTS storage.prefixes_sentinel_idx;
+    RAISE NOTICE 'Index dropped.';
+
+    -- Final summary
+    RAISE NOTICE '=================================================';
+    RAISE NOTICE 'Migration complete!';
+    RAISE NOTICE 'All counts are now accurate and maintained by triggers.';
+    RAISE NOTICE '=================================================';
+END;
+$$;
+
+-- Drop the helper function (no longer needed after migration)
+DROP FUNCTION IF EXISTS storage.migrate_calculate_sentinel_prefixes();
diff --git a/src/internal/database/migrations/types.ts b/src/internal/database/migrations/types.ts
index deb27086..d54fdaaf 100644
--- a/src/internal/database/migrations/types.ts
+++ b/src/internal/database/migrations/types.ts
@@ -40,5 +40,8 @@ export const DBMigration = {
   'add-search-v2-sort-support': 39,
   'fix-prefix-race-conditions-optimized': 40,
   'add-object-level-update-trigger': 41,
-  'fix-object-level': 42,
+  'rollback-prefix-triggers': 42,
+  'fix-object-level': 43,
+  'prefix-ref-counting': 44,
+  'calculate-ref-counts': 45,
 }
diff --git a/src/test/prefixes.test.ts b/src/test/prefixes.test.ts
index f0496967..f8bf8220 100644
--- a/src/test/prefixes.test.ts
+++ b/src/test/prefixes.test.ts
@@ -166,7 +166,7 @@ describe('Prefix Hierarchy Race Condition Tests', () => {
   })
 
   describe('Race Condition Scenario 1: Concurrent Deletes of Related Objects', () => {
-    it.skip('should handle concurrent deletion of objects in same folder without leaving dangling prefixes', async () => {
+    it('should handle concurrent deletion of objects in same folder without leaving dangling prefixes', async () => {
       // Create multiple objects in the same folder structure
       await createObject('shared/folder/file1.txt')
       await createObject('shared/folder/file2.txt')
@@ -198,7 +198,7 @@ describe('Prefix Hierarchy Race Condition Tests', () => {
       expect(prefixes).toHaveLength(0)
     })
 
-    it.skip('should handle partial concurrent deletion correctly', async () => {
+    it('should handle partial concurrent deletion correctly', async () => {
       // Create objects in multiple subfolders
       await createObject('race/test/file1.txt')
       await createObject('race/test/file2.txt')
@@ -399,8 +399,43 @@ describe('Prefix Hierarchy Race Condition Tests', () => {
     })
   })
 
+  describe('Critical Race Condition: DELETE UPDATE Gap', () => {
+    it('should reproduce the race condition between UPDATE and DELETE in statement trigger', async () => {
+      await createObject('race/shared/file1.txt')
+      await createObject('race/shared/file2.txt')
+
+      const db = tHelper.database.connection.pool.acquire()
+
+      // Simulate the race condition by using two concurrent transactions:
+      // Execute concurrent operations that target the same prefix
+      await Promise.all([
+        // Operation 1: Delete all objects in the prefix (this will try to delete the prefix)
+        deleteObjects(['race/shared/file1.txt', 'race/shared/file2.txt']),
+
+        // Operation 2: Immediately create a new object in the same prefix
+        // This should increment the counters between the UPDATE and DELETE
+        (async () => {
+          // Small delay to increase chance of hitting the race window
+          await new Promise((resolve) => setTimeout(resolve, 1))
+          await createObject('race/shared/file3.txt')
+        })(),
+      ])
+
+      const objects = await db
+        .select('name')
+        .from('storage.objects')
+        .where('bucket_id', bucketName)
+        .where('name', 'like', 'race/shared/%')
+      expect(objects.length).toBe(1)
+
+      // If the race condition occurred, the prefix will be incorrectly deleted
+      const prefixes = await getPrefixes()
+      expect(prefixes.some((p) => p.name === 'race/shared')).toBe(true)
+    })
+  })
+
   describe('Stress Test: High Concurrency', () => {
-    it.skip('should handle many concurrent operations without corruption', async () => {
+    it('should handle many concurrent operations without corruption', async () => {
       // Create many objects in overlapping folder structures
       const objects: string[] = []
       const folders = ['stress1', 'stress2', 'stress3']
@@ -539,7 +574,7 @@ describe('Prefix Hierarchy Race Condition Tests', () => {
       expect(prefixes).toHaveLength(0)
     })
 
-    it.skip('should handle concurrent moves from the same source folder without dangling prefixes', async () => {
+    it('should handle concurrent moves from the same source folder without dangling prefixes', async () => {
       await createObject('race-move/src/f1.txt')
       await createObject('race-move/src/f2.txt')
       await createObject('race-move/src/f3.txt')
@@ -564,7 +599,7 @@ describe('Prefix Hierarchy Race Condition Tests', () => {
       })
     })
 
-    it.skip('should handle deadlock scenario in concurrent cross-prefix moves without hanging', async () => {
+    it('should handle deadlock scenario in concurrent cross-prefix moves without hanging', async () => {
       // This test reproduces the deadlock scenario where two transactions
       // try to move files between overlapping top-level prefixes in opposite directions:
       // Transaction 1: photos/* -> docs/* (locks photos -> docs)
@@ -699,7 +734,7 @@ describe('Prefix Hierarchy Race Condition Tests', () => {
   })
 
   describe('Stress Test: Move Operations', () => {
-    it.skip('should handle many concurrent moves and clean old prefixes correctly', async () => {
+    it('should handle many concurrent moves and clean old prefixes correctly', async () => {
       const sources = ['mvstress/src1', 'mvstress/src2', 'mvstress/src3']
       const subs = ['sub1', 'sub2', 'sub3']
       const countPerSub = 5