Skip to content

Commit 0edcb8e

Browse files
kolaente and alejandrodnm
authored and committed
feat: add OpenAI's async batch API support for vectorizer
Adds support for OpenAI's async batch API to process large amounts of embeddings at a lower cost and with higher rate limits.

Key features:
- New AsyncBatchEmbedder interface for handling async batch operations
- Support for OpenAI's batch API implementation
- New database tables for tracking batch status and chunks
- Configurable polling interval for batch status checks
- Automatic retry mechanism for failed batches

Database changes:
- New async_batch_queue_table for tracking batch status
- New async_batch_chunks_table for storing chunks pending processing
- Added async_batch_polling_interval column to vectorizer table
- New SQL functions for managing async batch operations

API changes:
- New async_batch_enabled parameter in ai.embedding_openai()
- New ai.vectorizer_enable_async_batches() and ai.vectorizer_disable_async_batches() functions
- Extended vectorizer configuration to support async batch operations

The async batch workflow:
1. Chunks are collected and submitted as a batch to OpenAI
2. Batch status is monitored through polling
3. When ready, embeddings are retrieved and stored
4. Batch resources are cleaned up after successful processing
1 parent 3427bcc commit 0edcb8e

21 files changed

+5024
-1456
lines changed

projects/extension/sql/idempotent/008-embedding.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ create or replace function ai.embedding_openai
77
, chat_user pg_catalog.text default null
88
, api_key_name pg_catalog.text default 'OPENAI_API_KEY'
99
, base_url text default null
10+
, async_batch_enabled pg_catalog.bool default false
1011
) returns pg_catalog.jsonb
1112
as $func$
1213
select json_object
@@ -17,6 +18,7 @@ as $func$
1718
, 'user': chat_user
1819
, 'api_key_name': api_key_name
1920
, 'base_url': base_url
21+
, 'async_batch_enabled': async_batch_enabled
2022
absent on null
2123
)
2224
$func$ language sql immutable security invoker

projects/extension/sql/idempotent/013-vectorizer-api.sql

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
2-
31
-------------------------------------------------------------------------------
42
-- execute_vectorizer
53
create or replace function ai.execute_vectorizer(vectorizer_id pg_catalog.int4) returns void
@@ -31,6 +29,9 @@ create or replace function ai.create_vectorizer
3129
, queue_table pg_catalog.name default null
3230
, grant_to pg_catalog.name[] default ai.grant_to()
3331
, enqueue_existing pg_catalog.bool default true
32+
, async_batch_queue_table pg_catalog.name default null
33+
, async_batch_chunks_table pg_catalog.name default null
34+
, async_batch_polling_interval pg_catalog.interval default '5 minutes'::pg_catalog.interval
3435
) returns pg_catalog.int4
3536
as $func$
3637
declare
@@ -44,6 +45,7 @@ declare
4445
_vectorizer_id pg_catalog.int4;
4546
_sql pg_catalog.text;
4647
_job_id pg_catalog.int8;
48+
_async_batch_supported pg_catalog.bool;
4749
begin
4850
-- make sure all the roles listed in grant_to exist
4951
if grant_to is not null then
@@ -117,6 +119,14 @@ begin
117119
_trigger_name = pg_catalog.concat('_vectorizer_src_trg_', _vectorizer_id);
118120
queue_schema = coalesce(queue_schema, 'ai');
119121
queue_table = coalesce(queue_table, pg_catalog.concat('_vectorizer_q_', _vectorizer_id));
122+
async_batch_queue_table = coalesce(
123+
async_batch_queue_table,
124+
pg_catalog.concat('_vectorizer_async_batch_q_', _vectorizer_id)
125+
);
126+
async_batch_chunks_table = coalesce(
127+
async_batch_chunks_table,
128+
pg_catalog.concat('_vectorizer_async_batch_chunks_', _vectorizer_id)
129+
);
120130

121131
-- make sure view name is available
122132
if pg_catalog.to_regclass(pg_catalog.format('%I.%I', view_schema, view_name)) is not null then
@@ -133,6 +143,16 @@ begin
133143
raise exception 'an object named %.% already exists. specify an alternate queue_table explicitly', queue_schema, queue_table;
134144
end if;
135145

146+
-- make sure embedding batch table name is available
147+
if pg_catalog.to_regclass(pg_catalog.format('%I.%I', queue_schema, async_batch_queue_table)) is not null then
148+
raise exception 'an object named %.% already exists. specify an alternate async_batch_queue_table explicitly', queue_schema, async_batch_queue_table;
149+
end if;
150+
151+
-- make sure embedding batch chunks table name is available
152+
if pg_catalog.to_regclass(pg_catalog.format('%I.%I', queue_schema, async_batch_chunks_table)) is not null then
153+
raise exception 'an object named %.% already exists. specify an alternate async_batch_chunks_table explicitly', queue_schema, async_batch_chunks_table;
154+
end if;
155+
136156
-- validate the embedding config
137157
perform ai._validate_embedding(embedding);
138158

@@ -225,6 +245,25 @@ begin
225245
scheduling = pg_catalog.jsonb_insert(scheduling, array['job_id'], pg_catalog.to_jsonb(_job_id));
226246
end if;
227247

248+
-- TODO: I wanted this to be created only when enabling the async batch
249+
-- support, so that we don't create 2 extra tables that probably won't be
250+
-- used. The issue is that we don't store the value of grant_to.
251+
-- Two new tables might not be enough to warrant any changes, but if you're
252+
-- multi-tenant with 100s of customers, it'll be like 200 extra empty
253+
-- tables.
254+
--
255+
-- create async batch tables.
256+
select (embedding operator(pg_catalog.?) 'async_batch_enabled')::bool into _async_batch_supported;
257+
if _async_batch_supported is true then
258+
perform ai._vectorizer_create_async_batch_tables
259+
( queue_schema
260+
, async_batch_queue_table
261+
, async_batch_chunks_table
262+
, _source_pk
263+
, grant_to
264+
);
265+
end if;
266+
228267
insert into ai.vectorizer
229268
( id
230269
, source_schema
@@ -238,6 +277,9 @@ begin
238277
, queue_schema
239278
, queue_table
240279
, config
280+
, async_batch_queue_table
281+
, async_batch_chunks_table
282+
, async_batch_polling_interval
241283
)
242284
values
243285
( _vectorizer_id
@@ -260,6 +302,9 @@ begin
260302
, 'scheduling', scheduling
261303
, 'processing', processing
262304
)
305+
, async_batch_queue_table
306+
, async_batch_chunks_table
307+
, async_batch_polling_interval
263308
);
264309

265310
-- record dependencies in pg_depend
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
-------------------------------------------------------------------------------
2+
-- _vectorizer_create_async_batch__table
3+
create or replace function ai._vectorizer_create_async_batch_tables(
4+
schema_name name,
5+
async_batch_queue_table name,
6+
async_batch_chunks_table name,
7+
source_pk pg_catalog.jsonb,
8+
grant_to name []
9+
) returns void as
10+
$func$
11+
declare
12+
_sql text;
13+
_index_name text;
14+
_pk_cols pg_catalog.text;
15+
begin
16+
-- create the batches table
17+
select pg_catalog.format
18+
( $sql$create table %I.%I(
19+
id VARCHAR(255) PRIMARY KEY,
20+
created_at TIMESTAMP(0) NOT NULL DEFAULT NOW(),
21+
status TEXT NOT NULL,
22+
errors JSONB,
23+
metadata JSONB,
24+
next_attempt_after TIMESTAMPTZ NOT NULL,
25+
total_attempts INT NOT NULL DEFAULT 0
26+
)$sql$
27+
, schema_name
28+
, async_batch_queue_table
29+
) into strict _sql
30+
;
31+
execute _sql;
32+
33+
select pg_catalog.format
34+
( $sql$create index on %I.%I (status)$sql$
35+
, schema_name
36+
, async_batch_queue_table
37+
) into strict _sql
38+
;
39+
execute _sql;
40+
41+
select pg_catalog.string_agg(pg_catalog.format('%I', x.attname), ', ' order by x.pknum)
42+
into strict _pk_cols
43+
from pg_catalog.jsonb_to_recordset(source_pk) x(pknum int, attname name)
44+
;
45+
46+
select pg_catalog.format(
47+
$sql$
48+
create table %I.%I(
49+
%s,
50+
chunk_seq int not null,
51+
created_at timestamptz not null default now(),
52+
async_batch_id text not null references %I.%I (id) on delete cascade,
53+
chunk text not null,
54+
unique (%s, chunk_seq)
55+
)$sql$,
56+
schema_name,
57+
async_batch_chunks_table,
58+
(
59+
select pg_catalog.string_agg(
60+
pg_catalog.format('%I %s not null' , x.attname , x.typname),
61+
', '
62+
order by x.attnum
63+
)
64+
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name, typname name)
65+
),
66+
schema_name,
67+
async_batch_queue_table,
68+
_pk_cols
69+
) into strict _sql
70+
;
71+
execute _sql;
72+
73+
if grant_to is not null then
74+
-- grant select, update, delete on batches table to grant_to roles
75+
select pg_catalog.format(
76+
$sql$grant select, insert, update, delete on %I.%I to %s$sql$,
77+
schema_name,
78+
async_batch_queue_table,
79+
(
80+
select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
81+
from pg_catalog.unnest(grant_to) x
82+
)
83+
) into strict _sql;
84+
execute _sql;
85+
86+
-- grant select, update, delete on batch chunks table to grant_to roles
87+
select pg_catalog.format(
88+
$sql$grant select, insert, update, delete on %I.%I to %s$sql$,
89+
schema_name,
90+
async_batch_chunks_table,
91+
(
92+
select pg_catalog.string_agg(pg_catalog.quote_ident(x), ', ')
93+
from pg_catalog.unnest(grant_to) x
94+
)
95+
) into strict _sql;
96+
execute _sql;
97+
end if;
98+
end;
99+
$func$
100+
language plpgsql volatile security invoker
101+
set search_path to pg_catalog, pg_temp;
102+
103+
-------------------------------------------------------------------------------
104+
-- vectorizer_enable_async_batches
105+
create or replace function ai.vectorizer_enable_async_batches(
106+
vectorizer_id pg_catalog.int4
107+
) returns void
108+
as $func$
109+
declare
110+
_config pg_catalog.jsonb;
111+
begin
112+
select config into _config
113+
from ai.vectorizers
114+
where id = vectorizer_id;
115+
116+
if _config is null then
117+
raise exception 'vectorizer with id % not found', vectorizer_id;
118+
end if;
119+
120+
if not _config ? 'use_async_batch_api' then
121+
raise exception 'vectorizer configuration does not support async batch api';
122+
end if;
123+
124+
update ai.vectorizers
125+
set config = jsonb_set(config, '{async_batch_enabled}', 'true'::jsonb)
126+
where id = vectorizer_id;
127+
128+
perform
129+
end
130+
$func$ language plpgsql security definer
131+
set search_path to pg_catalog, pg_temp;
132+
133+
-------------------------------------------------------------------------------
134+
-- vectorizer_disable_async_batches
135+
create or replace function ai.vectorizer_disable_async_batches(
136+
vectorizer_id pg_catalog.int4
137+
) returns void
138+
as $func$
139+
declare
140+
_config pg_catalog.jsonb;
141+
begin
142+
select config into _config
143+
from ai.vectorizers
144+
where id = vectorizer_id;
145+
146+
if _config is null then
147+
raise exception 'vectorizer with id % not found', vectorizer_id;
148+
end if;
149+
150+
if not _config ? 'use_async_batch_api' then
151+
raise exception 'vectorizer configuration does not support async batch api';
152+
end if;
153+
154+
update ai.vectorizers
155+
set config = jsonb_set(config, '{async_batch_enabled}', 'false'::jsonb)
156+
where id = vectorizer_id;
157+
158+
perform
159+
end
160+
$func$ language plpgsql security definer
161+
set search_path to pg_catalog, pg_temp;

projects/extension/sql/idempotent/900-semantic-catalog-init.sql

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
--FEATURE-FLAG: text_to_sql
22

33
-------------------------------------------------------------------------------
4-
-- create_semantic_catalog
5-
create or replace function ai.create_semantic_catalog
4+
-- initialize_semantic_catalog
5+
create or replace function ai.initialize_semantic_catalog
66
( embedding pg_catalog.jsonb default null
77
, indexing pg_catalog.jsonb default ai.indexing_default()
88
, scheduling pg_catalog.jsonb default ai.scheduling_default()
99
, processing pg_catalog.jsonb default ai.processing_default()
1010
, grant_to pg_catalog.name[] default ai.grant_to()
11-
, text_to_sql pg_catalog.jsonb default null
1211
, catalog_name pg_catalog.name default 'default'
1312
) returns pg_catalog.int4
1413
as $func$
1514
declare
16-
_catalog_name pg_catalog.name = catalog_name;
17-
_text_to_sql pg_catalog.jsonb = text_to_sql;
1815
_catalog_id pg_catalog.int4;
1916
_obj_vec_id pg_catalog.int4;
2017
_sql_vec_id pg_catalog.int4;
@@ -60,14 +57,12 @@ begin
6057
, catalog_name
6158
, obj_vectorizer_id
6259
, sql_vectorizer_id
63-
, text_to_sql
6460
)
6561
values
6662
( _catalog_id
67-
, _catalog_name
63+
, initialize_semantic_catalog.catalog_name
6864
, _obj_vec_id
6965
, _sql_vec_id
70-
, _text_to_sql
7166
)
7267
returning id
7368
into strict _catalog_id
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
ALTER TABLE ai.vectorizer
2+
ADD COLUMN IF NOT EXISTS async_batch_queue_table pg_catalog.name DEFAULT NULL,
3+
ADD COLUMN IF NOT EXISTS async_batch_chunks_table pg_catalog.name DEFAULT NULL,
4+
ADD COLUMN IF NOT EXISTS async_batch_polling_interval interval DEFAULT interval '5 minutes';

0 commit comments

Comments
 (0)