Skip to content

Commit aa66ca0

Browse files
authored
feat: search objects v2 (#626)
1 parent 6f58fe2 commit aa66ca0

35 files changed

+1253
-244
lines changed

.docker/docker-compose-infra.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ services:
55

66
tenant_db:
77
image: postgres:15
8+
shm_size: '1gb'
89
ports:
910
- '5432:5432'
1011
healthcheck:
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
ALTER TABLE storage.objects ADD COLUMN user_metadata jsonb NULL;
2-
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN user_metadata jsonb NULL;
1+
ALTER TABLE storage.objects ADD COLUMN IF NOT EXISTS user_metadata jsonb NULL;
2+
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN IF NOT EXISTS user_metadata jsonb NULL;
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
-- Add the nullable "level" column to storage.objects.
-- IF NOT EXISTS keeps the migration safe to re-run.
ALTER TABLE storage.objects
    ADD COLUMN IF NOT EXISTS level int NULL;
3+
4+
--- Index Functions
5+
-- Returns the number of '/'-separated segments in "name"
-- (for example 'a/b/c' -> 3, 'file.txt' -> 1).
-- Returns NULL for NULL input (STRICT) and for the empty string, because
-- array_length() of an empty array is NULL.
CREATE OR REPLACE FUNCTION "storage"."get_level"("name" text)
    RETURNS int
    LANGUAGE sql
    IMMUTABLE
    STRICT
AS $func$
    SELECT array_length(string_to_array("name", '/'), 1);
$func$;
10+
11+
-- Table
12+
-- One row per path prefix of a bucket, keyed by (bucket_id, level, name).
CREATE TABLE IF NOT EXISTS storage.prefixes (
    bucket_id text,
    name text COLLATE "C" NOT NULL,
    -- Number of '/'-separated segments in "name", computed by storage.get_level
    -- and stored so it can be part of the primary key.
    level int GENERATED ALWAYS AS (storage.get_level(name)) STORED,
    created_at timestamptz DEFAULT now(),
    updated_at timestamptz DEFAULT now(),
    CONSTRAINT "prefixes_bucketId_fkey"
        FOREIGN KEY (bucket_id) REFERENCES storage.buckets (id),
    PRIMARY KEY (bucket_id, level, name)
);

-- Row level security is switched on; no policies are created in this block.
ALTER TABLE storage.prefixes
    ENABLE ROW LEVEL SECURITY;
23+
24+
-- Functions
25+
-- Returns the parent prefix of "name": the path with its last segment (and an
-- optional trailing '/') removed, e.g. 'a/b/c' -> 'a/b' and 'a/b/' -> 'a'.
-- A name without any '/' has no parent and yields ''.
-- Returns NULL for NULL input (STRICT).
CREATE OR REPLACE FUNCTION "storage"."get_prefix"("name" text)
    RETURNS text
    LANGUAGE sql
    IMMUTABLE
    STRICT
AS $func$
    SELECT
        CASE strpos("name", '/')
            -- strpos() returns 0 when the name contains no '/'
            WHEN 0 THEN ''
            ELSE regexp_replace("name", '[\/]{1}[^\/]+\/?$', '')
        END;
$func$;
35+
36+
-- Returns every ancestor prefix of "name", shortest first, excluding the name
-- itself: 'a/b/c' -> {a, a/b}. A name with a single segment, or the empty
-- string, yields an empty array. Returns NULL for NULL input (STRICT).
CREATE OR REPLACE FUNCTION "storage"."get_prefixes"("name" text)
    RETURNS text[]
AS $func$
DECLARE
    parts text[];
    prefixes text[];
    prefix text;
BEGIN
    -- Split the name into parts by '/'
    parts := string_to_array("name", '/');
    prefixes := '{}';

    -- Construct the prefixes, stopping one level below the last part.
    -- cardinality() is used instead of array_length(): for an empty array
    -- (name = '') array_length() is NULL and a NULL loop bound raises
    -- "upper bound of FOR loop cannot be null"; cardinality() returns 0, so
    -- the loop simply does not run.
    FOR i IN 1..cardinality(parts) - 1 LOOP
        prefix := array_to_string(parts[1:i], '/');
        prefixes := array_append(prefixes, prefix);
    END LOOP;

    RETURN prefixes;
END;
$func$ LANGUAGE plpgsql IMMUTABLE STRICT;
57+
58+
-- Inserts every ancestor prefix of "_name" (as computed by
-- storage.get_prefixes) into storage.prefixes for bucket "_bucket_id".
-- Prefixes that already exist are left untouched (ON CONFLICT DO NOTHING).
--
-- The function is SECURITY DEFINER, so it runs with its owner's privileges.
-- search_path is pinned to '' so that name resolution inside the function
-- cannot be influenced by the caller's search_path; every object referenced
-- below is schema-qualified or lives in pg_catalog.
CREATE OR REPLACE FUNCTION "storage"."add_prefixes"(
    "_bucket_id" TEXT,
    "_name" TEXT
)
RETURNS void
SECURITY DEFINER
SET search_path = ''
AS $func$
DECLARE
    prefixes text[];
BEGIN
    prefixes := "storage"."get_prefixes"("_name");

    -- array_length() is NULL for an empty or NULL array, which skips the insert
    IF array_length(prefixes, 1) > 0 THEN
        INSERT INTO storage.prefixes (name, bucket_id)
        SELECT UNNEST(prefixes) AS name, "_bucket_id"
        ON CONFLICT DO NOTHING;
    END IF;
END;
$func$ LANGUAGE plpgsql VOLATILE;
76+
77+
-- Deletes the prefix "_name" of bucket "_bucket_id" unless it still has direct
-- children, i.e. a prefix or an object exactly one level deeper whose name
-- starts with "_name" || '/'.
-- Returns false when children exist and nothing was deleted; true otherwise.
--
-- The function is SECURITY DEFINER, so search_path is pinned to '' to keep
-- name resolution independent of the caller's search_path; every object
-- referenced below is schema-qualified or lives in pg_catalog.
CREATE OR REPLACE FUNCTION "storage"."delete_prefix" (
    "_bucket_id" TEXT,
    "_name" TEXT
) RETURNS boolean
SECURITY DEFINER
SET search_path = ''
AS $func$
DECLARE
    -- LIKE pattern matching names that literally start with "_name" || '/'.
    -- '%' and '_' occurring in the prefix itself are escaped (escape char '!'),
    -- otherwise a prefix such as 'a_b' would also match children of 'axb' and
    -- be wrongly kept.
    child_pattern text :=
        replace(replace(replace("_name", '!', '!!'), '%', '!%'), '_', '!_') || '/%';
BEGIN
    -- Check if we can delete the prefix
    IF EXISTS(
        SELECT FROM "storage"."prefixes"
        WHERE "prefixes"."bucket_id" = "_bucket_id"
            AND level = "storage"."get_level"("_name") + 1
            AND "prefixes"."name" COLLATE "C" LIKE child_pattern ESCAPE '!'
        LIMIT 1
    )
    OR EXISTS(
        SELECT FROM "storage"."objects"
        WHERE "objects"."bucket_id" = "_bucket_id"
            -- the level is computed from the name rather than read from objects.level
            AND "storage"."get_level"("objects"."name") = "storage"."get_level"("_name") + 1
            AND "objects"."name" COLLATE "C" LIKE child_pattern ESCAPE '!'
        LIMIT 1
    ) THEN
        -- There are sub-objects, skip deletion
        RETURN false;
    ELSE
        DELETE FROM "storage"."prefixes"
        WHERE "prefixes"."bucket_id" = "_bucket_id"
            AND level = "storage"."get_level"("_name")
            AND "prefixes"."name" = "_name";
        RETURN true;
    END IF;
END;
$func$ LANGUAGE plpgsql VOLATILE;
110+
111+
-- Triggers
112+
-- Row-level trigger function: registers every ancestor prefix of the new
-- row's name (via storage.add_prefixes) and returns the row unchanged.
CREATE OR REPLACE FUNCTION "storage"."prefixes_insert_trigger"()
    RETURNS trigger
    LANGUAGE plpgsql
    VOLATILE
AS $func$
BEGIN
    PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");

    RETURN NEW;
END;
$func$;
120+
121+
-- Row-level trigger function for storage.objects: stamps the row with its
-- path depth and registers every ancestor prefix of its name.
CREATE OR REPLACE FUNCTION "storage"."objects_insert_prefix_trigger"()
    RETURNS trigger
    LANGUAGE plpgsql
    VOLATILE
AS $func$
BEGIN
    -- Keep objects.level in sync with the name being written
    NEW.level := "storage"."get_level"(NEW."name");

    -- Make sure all parent prefixes exist in storage.prefixes
    PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");

    RETURN NEW;
END;
$func$;
131+
132+
-- Row-level trigger function: after a row is deleted, asks
-- storage.delete_prefix to remove the row's parent prefix. delete_prefix
-- itself refuses to delete a prefix that still has children.
CREATE OR REPLACE FUNCTION "storage"."delete_prefix_hierarchy_trigger"()
    RETURNS trigger
    LANGUAGE plpgsql
    VOLATILE
AS $func$
DECLARE
    parent_prefix text := "storage"."get_prefix"(OLD."name");
BEGIN
    -- Names without a parent yield '' (or NULL for a NULL name): nothing to clean up
    IF parent_prefix IS NOT NULL AND parent_prefix <> '' THEN
        PERFORM "storage"."delete_prefix"(OLD."bucket_id", parent_prefix);
    END IF;

    RETURN OLD;
END;
$func$;
147+
148+
-- "storage"."prefixes"
149+
-- After a prefix row is deleted, try to remove its parent prefix as well.
CREATE OR REPLACE TRIGGER "prefixes_delete_hierarchy"
    AFTER DELETE ON "storage"."prefixes"
    FOR EACH ROW
    EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"();

-- "storage"."objects"

-- Before an object is inserted, set its level and create its parent prefixes.
CREATE OR REPLACE TRIGGER "objects_insert_create_prefix"
    BEFORE INSERT ON "storage"."objects"
    FOR EACH ROW
    EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"();

-- Before an object is renamed or moved to another bucket, set its level and
-- create the parent prefixes at the new location. The bucket_id comparison is
-- needed because a move that keeps the same name would otherwise leave the
-- target bucket without the object's prefixes.
CREATE OR REPLACE TRIGGER "objects_update_create_prefix"
    BEFORE UPDATE ON "storage"."objects"
    FOR EACH ROW
    WHEN (NEW.name != OLD.name OR NEW.bucket_id != OLD.bucket_id)
    EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"();

-- After an object is deleted, try to remove its parent prefix.
CREATE OR REPLACE TRIGGER "objects_delete_delete_prefix"
    AFTER DELETE ON "storage"."objects"
    FOR EACH ROW
    EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"();
170+
171+
-- Permissions
172+
-- Grants all privileges on storage.prefixes to the service, authenticated and
-- anon roles. Role names are read from the storage.* settings and fall back to
-- the defaults below when a setting is missing or empty.
DO $$
DECLARE
    -- current_setting(..., true) returns NULL for an unknown setting; NULLIF
    -- additionally treats an empty string as "not configured".
    anon_role text = COALESCE(NULLIF(current_setting('storage.anon_role', true), ''), 'anon');
    authenticated_role text = COALESCE(NULLIF(current_setting('storage.authenticated_role', true), ''), 'authenticated');
    service_role text = COALESCE(NULLIF(current_setting('storage.service_role', true), ''), 'service_role');
BEGIN
    -- %I quotes each role name as an identifier instead of splicing raw text
    -- into the statement.
    EXECUTE format(
        'GRANT ALL ON TABLE storage.prefixes TO %I, %I, %I',
        service_role,
        authenticated_role,
        anon_role
    );
END$$;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
2+
-- Lists the prefixes and objects of bucket "bucket_name" that sit exactly at
-- depth "levels" and whose name matches LIKE prefix || '%'.
--
-- Parameters:
--   prefix       name prefix to match (used as a LIKE pattern followed by '%')
--   bucket_name  compared against bucket_id
--   limits       maximum number of rows returned (default 100)
--   levels       depth to list; also selects which path segment becomes "key"
--   start_after  only names greater than this value are returned (default '')
--
-- Result: prefix rows carry name || '/' and NULL id/timestamps/metadata;
-- object rows carry their own columns. Rows are ordered by name COLLATE "C".
--
-- NOTE: inside the EXECUTE string, identifiers are resolved by SQL alone, so
-- name, id, etc. refer to table columns and not to the same-named output
-- columns of this function. Parameters are passed positionally through USING.
CREATE OR REPLACE FUNCTION storage.search_v2 (
    prefix text,
    bucket_name text,
    limits int DEFAULT 100,
    levels int DEFAULT 1,
    start_after text DEFAULT ''
)
RETURNS TABLE (
    key text,
    name text,
    id uuid,
    updated_at timestamptz,
    created_at timestamptz,
    metadata jsonb
)
LANGUAGE plpgsql
STABLE
SECURITY INVOKER
AS $func$
BEGIN
    RETURN QUERY EXECUTE
        $sql$
        SELECT * FROM (
            (
                SELECT
                    split_part(name, '/', $4) AS key,
                    name || '/' AS name,
                    NULL::uuid AS id,
                    NULL::timestamptz AS updated_at,
                    NULL::timestamptz AS created_at,
                    NULL::jsonb AS metadata
                FROM storage.prefixes
                WHERE name COLLATE "C" LIKE $1 || '%'
                AND bucket_id = $2
                AND level = $4
                AND name COLLATE "C" > $5
                ORDER BY prefixes.name COLLATE "C" LIMIT $3
            )
            UNION ALL
            (SELECT split_part(name, '/', $4) AS key,
                name,
                id,
                updated_at,
                created_at,
                metadata
            FROM storage.objects
            WHERE name COLLATE "C" LIKE $1 || '%'
                AND bucket_id = $2
                AND level = $4
                AND name COLLATE "C" > $5
            ORDER BY name COLLATE "C" LIMIT $3)
        ) obj
        ORDER BY name COLLATE "C" LIMIT $3;
        $sql$
        USING prefix, bucket_name, limits, levels, start_after;
END;
$func$;
56+
57+
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- postgres-migrations disable-transaction
2+
-- Unique index on (name, bucket_id), with name compared under the "C" collation.
-- Built CONCURRENTLY, which cannot run inside a transaction block.
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_unique
    ON storage.objects (name COLLATE "C", bucket_id);
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
-- postgres-migrations disable-transaction
2+
-- Backfill prefixes table records
3+
-- We run this with 50k batch size to avoid long running transaction
4+
-- Walks storage.objects in (name COLLATE "C", bucket_id) order, one batch at a
-- time, and inserts every ancestor prefix of each object into storage.prefixes.
-- Each batch is committed on its own.
DO $$
DECLARE
    batch_size INTEGER := 50000;
    total_scanned INTEGER := 0;
    row_returned INTEGER := 0;
    -- Key of the last object of the previous batch (NULL before the first batch)
    last_name TEXT COLLATE "C" := NULL;
    last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
    LOOP
        -- Fetch a batch of objects ordered by name COLLATE "C"
        WITH batch AS (
            SELECT bucket_id, name
            FROM storage.objects
            WHERE (last_name IS NULL OR ((name COLLATE "C", bucket_id) > (last_name, last_bucket_id)))
            ORDER BY name COLLATE "C", bucket_id
            LIMIT batch_size
        ),
        batch_count AS (
            SELECT COUNT(*) AS row_count FROM batch
        ),
        -- Last object of the batch: the resume point for the next iteration
        last_row AS (
            SELECT name, bucket_id
            FROM batch
            ORDER BY name COLLATE "C" DESC, bucket_id DESC
            LIMIT 1
        ),
        all_prefixes AS (
            SELECT UNNEST(storage.get_prefixes(name)) AS prefix, bucket_id
            FROM batch
        ),
        -- Data-modifying CTE: it runs even though the final SELECT does not read it
        insert_prefixes AS (
            INSERT INTO storage.prefixes (bucket_id, name)
            SELECT bucket_id, prefix FROM all_prefixes
            WHERE coalesce(prefix, '') != ''
            ON CONFLICT DO NOTHING
        )
        -- LEFT JOIN keeps the count row when the batch is empty, so
        -- row_returned is 0 instead of NULL and the running total stays valid.
        SELECT batch_count.row_count, last_row.name, last_row.bucket_id
        INTO row_returned, last_name, last_bucket_id
        FROM batch_count
        LEFT JOIN last_row ON true;

        RAISE NOTICE 'Object Row returned: %', row_returned;
        RAISE NOTICE 'Last Object: %', last_name;

        total_scanned := total_scanned + coalesce(row_returned, 0);

        -- Commit this batch so no single transaction spans the whole table
        COMMIT;

        -- A short (or empty) batch means the end of the table was reached
        IF coalesce(row_returned, 0) < batch_size THEN
            RAISE NOTICE 'Total Object scanned: %', total_scanned;
            EXIT;
        END IF;
    END LOOP;
END;
$$;
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
-- postgres-migrations disable-transaction
2+
-- Backfill the level column of storage.objects records
3+
-- We run this with 10k batch size to avoid long running transaction
4+
-- Walks the storage.objects rows whose level is still NULL, in
-- (name COLLATE "C", bucket_id) order and one batch at a time, and sets
-- level = storage.get_level(name). Each batch is committed on its own.
DO $$
DECLARE
    batch_size INTEGER := 10000;
    total_scanned INTEGER := 0;
    row_returned INTEGER := 0;
    -- Key of the last object of the previous batch (NULL before the first batch)
    last_name TEXT COLLATE "C" := NULL;
    last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
    LOOP
        -- Fetch a batch of objects ordered by name COLLATE "C"
        WITH batch AS (
            SELECT id, bucket_id, name, storage.get_level(name) AS level
            FROM storage.objects
            -- "level" here is the table column, not the alias computed above
            WHERE level IS NULL AND (last_name IS NULL OR (name COLLATE "C", bucket_id) > (last_name, last_bucket_id))
            ORDER BY name COLLATE "C", bucket_id
            LIMIT batch_size
        ),
        batch_count AS (
            SELECT COUNT(*) AS row_count FROM batch
        ),
        -- Last object of the batch: the resume point for the next iteration
        last_row AS (
            SELECT name, bucket_id
            FROM batch
            ORDER BY name COLLATE "C" DESC, bucket_id DESC
            LIMIT 1
        ),
        -- Data-modifying CTE: it runs even though the final SELECT does not read it
        update_level AS (
            UPDATE storage.objects o
            SET level = b.level
            FROM batch b
            WHERE o.id = b.id
        )
        -- LEFT JOIN keeps the count row when the batch is empty, so
        -- row_returned is 0 instead of NULL and the running total stays valid.
        SELECT batch_count.row_count, last_row.name, last_row.bucket_id
        INTO row_returned, last_name, last_bucket_id
        FROM batch_count
        LEFT JOIN last_row ON true;

        RAISE NOTICE 'Object Row returned: %', row_returned;
        RAISE NOTICE 'Last Object: %', last_name;

        total_scanned := total_scanned + coalesce(row_returned, 0);

        -- Commit this batch so no single transaction spans the whole table
        COMMIT;

        -- A short (or empty) batch means the end of the table was reached
        IF coalesce(row_returned, 0) < batch_size THEN
            RAISE NOTICE 'Total Object scanned: %', total_scanned;
            EXIT;
        END IF;
    END LOOP;
END;
$$;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- postgres-migrations disable-transaction
2+
-- Unique index on (bucket_id, level, name), with name compared under the "C" collation.
-- Built CONCURRENTLY, which cannot run inside a transaction block.
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS objects_bucket_id_level_idx
    ON storage.objects (bucket_id, level, name COLLATE "C");
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- postgres-migrations disable-transaction
2+
-- Index on the path token at the object's own level, the lower-cased name
-- (text_pattern_ops), bucket_id and level.
-- Built CONCURRENTLY, which cannot run inside a transaction block.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_objects_lower_name
    ON storage.objects (
        (path_tokens[level]),
        lower(name) text_pattern_ops,
        bucket_id,
        level
    );
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- postgres-migrations disable-transaction
2+
-- Index on bucket_id, level, the path segment at the prefix's own level and
-- the lower-cased name (text_pattern_ops).
-- Built CONCURRENTLY, which cannot run inside a transaction block.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_prefixes_lower_name
    ON storage.prefixes (
        bucket_id,
        level,
        ((string_to_array(name, '/'))[level]),
        lower(name) text_pattern_ops
    );

0 commit comments

Comments
 (0)