Skip to content

Commit 550a0fc

Browse files
committed
add managed schema for subscription management
1 parent 07758cd commit 550a0fc

File tree

1 file changed

+100
-0
lines changed

1 file changed

+100
-0
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
-- migrate:up
2+
create schema if not exists supabase_subscription_mgmt;
3+
4+
create or replace function supabase_subscription_mgmt.pg_create_subscription(
5+
arg_subscription_name text,
6+
arg_connection_string text,
7+
arg_publication_name text,
8+
arg_slot_name text,
9+
arg_copy_data boolean = true,
10+
arg_origin text = 'any'
11+
)
12+
returns void language plpgsql
13+
security definer
14+
set search_path = pg_catalog, supabase_subscription_mgmt
15+
as $$
16+
declare
17+
pg_version int;
18+
create_subscription_cmd text;
19+
begin
20+
-- get the postgresql version
21+
select current_setting('server_version_num')::int into pg_version;
22+
23+
if pg_version < 160000 and arg_origin <> 'any' then
24+
raise exception 'postgresql version must be 16 or higher to specify origin other than "any". current version: %', pg_version;
25+
end if;
26+
27+
if arg_origin <> 'any' and arg_origin <> 'none' then
28+
raise exception 'invalid origin: %. origin must be either "any" or "none".', arg_origin;
29+
end if;
30+
31+
-- pg16 and later: include the origin parameter only if it's 'none', as its default is any
32+
if pg_version >= 160000 and arg_origin = 'none' then
33+
create_subscription_cmd := pg_catalog.format(
34+
'create subscription %I connection %L publication %I with (slot_name=%L, create_slot=false, copy_data=%s, origin=%L)',
35+
arg_subscription_name, arg_connection_string, arg_publication_name, arg_slot_name, arg_copy_data::text, arg_origin);
36+
else
37+
create_subscription_cmd := pg_catalog.format(
38+
'create subscription %I connection %L publication %I with (slot_name=%L, create_slot=false, copy_data=%s)',
39+
arg_subscription_name, arg_connection_string, arg_publication_name, arg_slot_name, arg_copy_data::text);
40+
end if;
41+
42+
-- execute the create subscription command
43+
execute create_subscription_cmd;
44+
end;
45+
$$;
46+
47+
48+
create or replace function supabase_subscription_mgmt.pg_alter_subscription_disable(
49+
arg_subscription_name text
50+
)
51+
returns void language plpgsql
52+
security definer
53+
set search_path = pg_catalog
54+
as $$
55+
begin
56+
execute pg_catalog.format('alter subscription %I disable', arg_subscription_name);
57+
end;
58+
$$;
59+
60+
create or replace function supabase_subscription_mgmt.pg_alter_subscription_enable(
61+
arg_subscription_name text
62+
)
63+
returns void language plpgsql
64+
security definer
65+
set search_path = pg_catalog, supabase_subscription_mgmt
66+
as $$
67+
begin
68+
execute pg_catalog.format('alter subscription %I enable', arg_subscription_name);
69+
end;
70+
$$;
71+
72+
create or replace function supabase_subscription_mgmt.pg_drop_subscription(
73+
arg_subscription_name text
74+
)
75+
returns void language plpgsql
76+
security definer
77+
set search_path = pg_catalog, supabase_subscription_mgmt
78+
as $$
79+
declare
80+
l_slot_name text;
81+
l_subconninfo text;
82+
begin
83+
select subslotname, subconninfo
84+
into l_slot_name, l_subconninfo
85+
from pg_catalog.pg_subscription
86+
where subname = arg_subscription_name;
87+
if l_slot_name is null and l_subconninfo is null then
88+
raise exception 'no subscription found for name: %', arg_subscription_name;
89+
end if;
90+
execute pg_catalog.format('alter subscription %I disable', arg_subscription_name);
91+
execute pg_catalog.format('alter subscription %I set (slot_name = none)', arg_subscription_name);
92+
execute pg_catalog.format('drop subscription %I', arg_subscription_name);
93+
end;
94+
$$;
95+
96+
grant usage on schema supabase_subscription_mgmt to postgres;
97+
grant execute on all functions in schema supabase_subscription_mgmt to postgres;
98+
99+
-- migrate:down
100+

0 commit comments

Comments
 (0)