Commit d86bc3f
feat: split transaction queue item processing
The vectorizer worker used to process queue items in a single transaction. If any step (other than file loading) failed, processing would abort and later be retried. Because it operated in a single transaction, failed attempts were never recorded in the queue table.

This change rearchitects queue item processing into two transactions:

- The "fetch work" transaction gets a batch of rows from the database for processing. It updates the `attempts` column of those rows to signal that an attempt has been made to process the item, and it deletes duplicate queue items for the same primary key columns.
- The "embed and write" transaction performs embedding, writes the embeddings to the database, and removes successfully processed queue rows. Rows which failed to be processed have their `retry_after` column set to a value proportional to the number of existing attempts. When the `attempts` column goes over a predefined threshold (6), the queue item is moved to the "failed" (dead letter) queue.
1 parent 844ba0c commit d86bc3f

21 files changed: +6136 -133 lines
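
Before the file-by-file diff, a runnable in-memory sketch of the retry policy the commit message describes may help fix ideas. The helper names, BASE_DELAY_S, and the exact backoff formula are illustrative assumptions, not the actual pgai worker API:

import time
from dataclasses import dataclass
from typing import Callable

MAX_ATTEMPTS = 6      # per the commit message, the dead-letter threshold
BASE_DELAY_S = 30.0   # hypothetical backoff unit; pgai's real value may differ

@dataclass
class QueueItem:
    key: str
    attempts: int = 0
    retry_after: float | None = None  # unix timestamp; None means "ready"

def process_batch(
    queue: list[QueueItem],
    failed: list[QueueItem],
    embed: Callable[[str], None],
) -> None:
    now = time.time()
    # "fetch work": select ready rows and record the attempt up front,
    # so a crash between the two phases still leaves a trace
    batch = [q for q in queue if q.retry_after is None or q.retry_after <= now]
    for item in batch:
        item.attempts += 1
    # "embed and write": successes leave the queue; failures back off
    # proportionally to attempts, or are dead-lettered past the threshold
    for item in batch:
        try:
            embed(item.key)
            queue.remove(item)
        except Exception:
            if item.attempts > MAX_ATTEMPTS:
                queue.remove(item)
                failed.append(item)  # the "failed" (dead letter) queue
            else:
                item.retry_after = now + item.attempts * BASE_DELAY_S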

projects/pgai/db/sql/idempotent/011-vectorizer-int.sql

Lines changed: 81 additions & 3 deletions
@@ -424,8 +424,8 @@ begin
     create table %I.%I
     ( %s
     , queued_at pg_catalog.timestamptz not null default now()
-    , loading_retries pg_catalog.int4 not null default 0
-    , loading_retry_after pg_catalog.timestamptz
+    , attempts pg_catalog.int4 not null default 0
+    , retry_after pg_catalog.timestamptz
     )
     $sql$
     , queue_schema, queue_table
@@ -1266,4 +1266,82 @@ end
 $func$
 language plpgsql volatile security invoker
 set search_path to pg_catalog, pg_temp
-;
+;
+
+create or replace function ai._get_next_queue_batch(
+    queue_table pg_catalog.regclass,
+    batch_size pg_catalog.int4
+) returns setof record AS $$
+declare
+    source_pk pg_catalog.jsonb;
+    lock_id_string pg_catalog.text;
+    query pg_catalog.text;
+    lock_count pg_catalog.int4 := 0;
+    row record;
+begin
+    -- get the source_pk for this queue table
+    select v.source_pk
+    into source_pk
+    from ai.vectorizer v
+    where pg_catalog.to_regclass(pg_catalog.format('%I.%I', v.queue_schema, v.queue_table)) operator(pg_catalog.=) _get_next_queue_batch.queue_table;
+
+    -- construct the "lock id string"
+    -- this is a string of all pk column names and their values, e.g. for a
+    -- two-column pk consisting of 'time' and 'url' this will generate:
+    -- hashtext(format('time|%s|url|%s', time, url))
+    select pg_catalog.format($fmt$pg_catalog.hashtext(pg_catalog.format('%s', %s))$fmt$, format_string, format_args)
+    into lock_id_string
+    from (
+        select
+            pg_catalog.string_agg(pg_catalog.format('%s|%%s', attname), '|' order by attnum) as format_string
+          , pg_catalog.string_agg(attname, ', ' order by attnum) as format_args
+        from pg_catalog.jsonb_to_recordset(source_pk) as (attnum int, attname text)
+    ) as _;
+
+    -- TODO: for very small batch sizes (<10), an array _may_ be faster
+    create temporary table seen_lock_ids (lock_id bigint);
+    create index on seen_lock_ids (lock_id);
+
+    -- construct query to get all
+    query := pg_catalog.format($sql$
+        select
+            q.ctid as _ctid
+          , %s as _lock_id
+          , q.*
+        from %s as q
+        where (retry_after is null or retry_after <= now())
+        and %s not in (
+            -- exclude all locks that we already hold
+            select objid::int
+            from pg_locks
+            where locktype = 'advisory'
+            and pid = pg_catalog.pg_backend_pid()
+            and classid = %s
+        )
+    $sql$, lock_id_string, _get_next_queue_batch.queue_table, lock_id_string, _get_next_queue_batch.queue_table::pg_catalog.oid);
+
+    for row in execute query
+    loop
+        if lock_count operator(pg_catalog.>=) batch_size then
+            exit;
+        end if;
+
+        if exists(select 1 from pg_temp.seen_lock_ids WHERE lock_id operator(pg_catalog.=) row._lock_id) then
+            continue;
+        end if;
+
+        insert into pg_temp.seen_lock_ids (lock_id) values (row._lock_id);
+
+        if pg_catalog.pg_try_advisory_lock(queue_table::pg_catalog.oid::int, row._lock_id) then
+            lock_count := lock_count operator(pg_catalog.+) 1;
+            return next row;
+        end if;
+    end loop;
+
+    drop table seen_lock_ids;
+
+    return;
+end;
+$$ language plpgsql
+set search_path to pg_catalog, pg_temp
+;
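
The function above claims rows by taking a two-key session advisory lock per source row: classid is the queue table's oid and objid is hashtext() over the pipe-joined primary-key values, with pg_locks consulted to skip locks this backend already holds. A standalone illustration of that locking primitive (the connection string and example pk values are assumptions, not pgai code):

import psycopg

with psycopg.connect("dbname=postgres") as conn, conn.cursor() as cur:
    cur.execute("select 'pg_class'::regclass::oid::int")  # stand-in for the queue table's oid
    classid = cur.fetchone()[0]
    cur.execute("select hashtext('time|2024-01-01|url|https://example.com')")
    objid = cur.fetchone()[0]
    # session-level advisory lock: it survives commit/rollback, which is what
    # lets the worker hold its claim across the two transactions
    cur.execute("select pg_try_advisory_lock(%s, %s)", (classid, objid))
    print("acquired:", cur.fetchone()[0])  # False if another session holds it
    cur.execute("select pg_advisory_unlock(%s, %s)", (classid, objid))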
Lines changed: 12 additions & 0 deletions (new migration; the ai.sql diff below names it 032-split-transaction-support.sql)
@@ -0,0 +1,12 @@
+-- rename loading_retries and loading_retry_after for all existing queue tables
+do language plpgsql $block$
+declare
+    _vectorizer record;
+begin
+    for _vectorizer in select queue_schema, queue_table from ai.vectorizer
+    loop
+        execute format('alter table %I.%I rename column loading_retries to attempts', _vectorizer.queue_schema, _vectorizer.queue_table);
+        execute format('alter table %I.%I rename column loading_retry_after to retry_after', _vectorizer.queue_schema, _vectorizer.queue_table);
+    end loop;
+end;
+$block$;
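
As a concrete example of the dynamic SQL above: for a vectorizer whose queue table is ai._vectorizer_q_1 (the table shown in the expected outputs below), the loop executes alter table ai._vectorizer_q_1 rename column loading_retries to attempts, followed by the matching rename of loading_retry_after to retry_after.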
Lines changed: 7 additions & 7 deletions
@@ -1,11 +1,11 @@
                          Table "ai._vectorizer_q_1"
-       Column        |           Type           | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
----------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
- title               | text                     |           | not null |         | extended |             |              |
- published           | timestamp with time zone |           | not null |         | plain    |             |              |
- queued_at           | timestamp with time zone |           | not null | now()   | plain    |             |              |
- loading_retries     | integer                  |           | not null | 0       | plain    |             |              |
- loading_retry_after | timestamp with time zone |           |          |         | plain    |             |              |
+   Column    |           Type           | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
+-------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
+ title       | text                     |           | not null |         | extended |             |              |
+ published   | timestamp with time zone |           | not null |         | plain    |             |              |
+ queued_at   | timestamp with time zone |           | not null | now()   | plain    |             |              |
+ attempts    | integer                  |           | not null | 0       | plain    |             |              |
+ retry_after | timestamp with time zone |           |          |         | plain    |             |              |
 Indexes:
     "_vectorizer_q_1_title_published_idx" btree (title, published)
 Access method: heap
Lines changed: 7 additions & 7 deletions
@@ -1,11 +1,11 @@
                          Table "ai._vectorizer_q_1"
-       Column        |           Type           | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
---------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
- title               | text                     |           | not null |         | extended |             |              |
- published           | timestamp with time zone |           | not null |         | plain    |             |              |
- queued_at           | timestamp with time zone |           | not null | now()   | plain    |             |              |
- loading_retries     | integer                  |           | not null | 0       | plain    |             |              |
- loading_retry_after | timestamp with time zone |           |          |         | plain    |             |              |
+   Column    |           Type           | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
+-------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
+ title       | text                     |           | not null |         | extended |             |              |
+ published   | timestamp with time zone |           | not null |         | plain    |             |              |
+ queued_at   | timestamp with time zone |           | not null | now()   | plain    |             |              |
+ attempts    | integer                  |           | not null | 0       | plain    |             |              |
+ retry_after | timestamp with time zone |           |          |         | plain    |             |              |
 Indexes:
     "_vectorizer_q_1_title_published_idx" btree (title, published)
 Access method: heap

projects/pgai/pgai/data/ai.sql

Lines changed: 120 additions & 2 deletions
@@ -1224,6 +1224,45 @@ begin
 end;
 $outer_migration_block$;
 
+-------------------------------------------------------------------------------
+-- 032-split-transaction-support.sql
+do $outer_migration_block$ /*032-split-transaction-support.sql*/
+declare
+    _sql text;
+    _migration record;
+    _migration_name text = $migration_name$032-split-transaction-support.sql$migration_name$;
+    _migration_body text =
+$migration_body$
+-- rename loading_retries and loading_retry_after for all existing queue tables
+do language plpgsql $block$
+declare
+    _vectorizer record;
+begin
+    for _vectorizer in select queue_schema, queue_table from ai.vectorizer
+    loop
+        execute format('alter table %I.%I rename column loading_retries to attempts', _vectorizer.queue_schema, _vectorizer.queue_table);
+        execute format('alter table %I.%I rename column loading_retry_after to retry_after', _vectorizer.queue_schema, _vectorizer.queue_table);
+    end loop;
+end;
+$block$;
+
+$migration_body$;
+begin
+    select * into _migration from ai.pgai_lib_migration where "name" operator(pg_catalog.=) _migration_name;
+    if _migration is not null then
+        raise notice 'migration %s already applied. skipping.', _migration_name;
+        if _migration.body operator(pg_catalog.!=) _migration_body then
+            raise warning 'the contents of migration "%s" have changed', _migration_name;
+        end if;
+        return;
+    end if;
+    _sql = pg_catalog.format(E'do /*%s*/ $migration_body$\nbegin\n%s\nend;\n$migration_body$;', _migration_name, _migration_body);
+    execute _sql;
+    insert into ai.pgai_lib_migration ("name", body, applied_at_version)
+    values (_migration_name, _migration_body, $version$__version__$version$);
+end;
+$outer_migration_block$;
+
 --------------------------------------------------------------------------------
 -- 001-chunking.sql
 
@@ -2704,8 +2743,8 @@ begin
     create table %I.%I
     ( %s
     , queued_at pg_catalog.timestamptz not null default now()
-    , loading_retries pg_catalog.int4 not null default 0
-    , loading_retry_after pg_catalog.timestamptz
+    , attempts pg_catalog.int4 not null default 0
+    , retry_after pg_catalog.timestamptz
     )
     $sql$
    , queue_schema, queue_table
@@ -3548,6 +3587,85 @@ language plpgsql volatile security invoker
 set search_path to pg_catalog, pg_temp
 ;
 
+create or replace function ai._get_next_queue_batch(
+    queue_table pg_catalog.regclass,
+    batch_size pg_catalog.int4
+) returns setof record AS $$
+declare
+    source_pk pg_catalog.jsonb;
+    lock_id_string pg_catalog.text;
+    query pg_catalog.text;
+    lock_count pg_catalog.int4 := 0;
+    row record;
+begin
+    -- get the source_pk for this queue table
+    select v.source_pk
+    into source_pk
+    from ai.vectorizer v
+    where pg_catalog.to_regclass(pg_catalog.format('%I.%I', v.queue_schema, v.queue_table)) operator(pg_catalog.=) _get_next_queue_batch.queue_table;
+
+    -- construct the "lock id string"
+    -- this is a string of all pk column names and their values, e.g. for a
+    -- two-column pk consisting of 'time' and 'url' this will generate:
+    -- hashtext(format('time|%s|url|%s', time, url))
+    select pg_catalog.format($fmt$pg_catalog.hashtext(pg_catalog.format('%s', %s))$fmt$, format_string, format_args)
+    into lock_id_string
+    from (
+        select
+            pg_catalog.string_agg(pg_catalog.format('%s|%%s', attname), '|' order by attnum) as format_string
+          , pg_catalog.string_agg(attname, ', ' order by attnum) as format_args
+        from pg_catalog.jsonb_to_recordset(source_pk) as (attnum int, attname text)
+    ) as _;
+
+    -- TODO: for very small batch sizes (<10), an array _may_ be faster
+    create temporary table seen_lock_ids (lock_id bigint);
+    create index on seen_lock_ids (lock_id);
+
+    -- construct query to get all
+    query := pg_catalog.format($sql$
+        select
+            q.ctid as _ctid
+          , %s as _lock_id
+          , q.*
+        from %s as q
+        where (retry_after is null or retry_after <= now())
+        and %s not in (
+            -- exclude all locks that we already hold
+            select objid::int
+            from pg_locks
+            where locktype = 'advisory'
+            and pid = pg_catalog.pg_backend_pid()
+            and classid = %s
+        )
+    $sql$, lock_id_string, _get_next_queue_batch.queue_table, lock_id_string, _get_next_queue_batch.queue_table::pg_catalog.oid);
+
+    for row in execute query
+    loop
+        if lock_count operator(pg_catalog.>=) batch_size then
+            exit;
+        end if;
+
+        if exists(select 1 from pg_temp.seen_lock_ids WHERE lock_id operator(pg_catalog.=) row._lock_id) then
+            continue;
+        end if;
+
+        insert into pg_temp.seen_lock_ids (lock_id) values (row._lock_id);
+
+        if pg_catalog.pg_try_advisory_lock(queue_table::pg_catalog.oid::int, row._lock_id) then
+            lock_count := lock_count operator(pg_catalog.+) 1;
+            return next row;
+        end if;
+    end loop;
+
+    drop table seen_lock_ids;
+
+    return;
+end;
+$$ language plpgsql
+set search_path to pg_catalog, pg_temp
+;
+
+
 --------------------------------------------------------------------------------
 -- 012-vectorizer-api.sql
 -------------------------------------------------------------------------------
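
Note that ai.sql is the bundled install script for the pgai library, so it repeats both the 032-split-transaction-support.sql migration body and the _get_next_queue_batch function shown earlier. The surrounding guard consults ai.pgai_lib_migration: an already-recorded migration is skipped with a notice, and a warning is raised if the recorded body no longer matches the one being applied.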
Lines changed: 12 additions & 1 deletion
@@ -1,10 +1,21 @@
 from .create_vectorizer import CreateVectorizer
-from .vectorizer import Executor, Vectorizer
+from .vectorizer import (
+    EmbeddingError,
+    Executor,
+    FormattingError,
+    LoadingError,
+    ParsingError,
+    Vectorizer,
+)
 from .worker import Worker
 
 __all__ = [
     "Vectorizer",
     "Executor",
     "CreateVectorizer",
     "Worker",
+    "FormattingError",
+    "LoadingError",
+    "ParsingError",
+    "EmbeddingError",
 ]
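
With the error types exported, callers can tell failure stages apart. A small sketch, assuming the edited file is pgai/vectorizer/__init__.py (the diff does not show the path); the classify helper is hypothetical:

from pgai.vectorizer import (
    EmbeddingError,
    FormattingError,
    LoadingError,
    ParsingError,
)

def classify(exc: Exception) -> str:
    # map an exception raised during processing to the stage it came from
    stages = [
        (LoadingError, "loading"),
        (ParsingError, "parsing"),
        (FormattingError, "formatting"),
        (EmbeddingError, "embedding"),
    ]
    for err_type, stage in stages:
        if isinstance(exc, err_type):
            return stage
    return "unknown"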

projects/pgai/pgai/vectorizer/embedders/voyageai.py

Lines changed: 2 additions & 2 deletions
@@ -38,8 +38,8 @@ def voyage_token_counter(model: str) -> Callable[[str], int] | None:
     try:
         tokenizer: Tokenizer = client.tokenizer(model)
         return lambda text: len(tokenizer.encode(text).tokens)
-    except BaseException:
-        logger.warn(f"Tokenizer for model '{model}' not found")
+    except Exception as e:
+        logger.warn(f"Tokenizer for model '{model}' not found: {e}")
         return None
 
 
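The narrowing from BaseException to Exception matters beyond the added detail in the log line: BaseException also matches KeyboardInterrupt and SystemExit, so the old handler could swallow a Ctrl-C during tokenizer lookup. A minimal runnable illustration with a stand-in failure:

def risky_lookup() -> None:
    raise ValueError("tokenizer not found")  # stand-in for the real failure

try:
    risky_lookup()
except Exception as e:  # unlike BaseException, lets KeyboardInterrupt propagate
    print(f"lookup failed: {e}")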
projects/pgai/pgai/vectorizer/features/features.py

Lines changed: 23 additions & 2 deletions
@@ -16,10 +16,12 @@ def __init__(
         has_loading_retries: bool,
         has_reveal_secret_function: bool,
         has_vectorizer_errors_view: bool,
+        has_split_processing: bool,
     ) -> None:
         self.has_disabled_column = has_disabled_column
         self.has_worker_tracking_table = has_worker_tracking_table
         self.has_loading_retries = has_loading_retries
+        self.has_split_processing = has_split_processing
         self.has_reveal_secret_function = has_reveal_secret_function
         self.has_vectorizer_errors_view = has_vectorizer_errors_view
 
@@ -74,21 +76,31 @@ def from_db(cls: type[Self], cur: psycopg.Cursor) -> Self:
         cur.execute(query)
         has_vectorizer_errors_view = cur.fetchone() is not None
 
+        query = """
+            SELECT proname
+            FROM pg_proc p
+            WHERE pronamespace = 'ai'::regnamespace
+            AND proname = '_get_next_queue_batch';
+        """
+        cur.execute(query)
+        has_split_processing = cur.fetchone() is not None
+
         return cls(
             has_disabled_column,
             has_worker_tracking_table,
             has_loading_retries,
             has_reveal_secret_function,
             has_vectorizer_errors_view,
+            has_split_processing,
         )
 
     @classmethod
     def for_testing_latest_version(cls: type[Self]) -> Self:
-        return cls(True, True, True, True, True)
+        return cls(True, True, True, True, True, True)
 
     @classmethod
     def for_testing_no_features(cls: type[Self]) -> Self:
-        return cls(False, False, False, False, False)
+        return cls(False, False, False, False, False, False)
 
     @cached_property
     def disable_vectorizers(self) -> bool:
@@ -113,6 +125,15 @@ def loading_retries(self) -> bool:
         """
         return self.has_loading_retries
 
+    @cached_property
+    def split_processing(self) -> bool:
+        """If the "split processing" feature is supported by the extension.
+
+        The feature breaks processing into two transactions: one to fetch work
+        and another to commit successfully processed items.
+        """
+        return self.has_split_processing
+
     @cached_property
     def db_reveal_secrets(self) -> bool:
         """If the db has the `reveal_secret` function."""
projects/pgai/pgai/vectorizer/generate/function_parser.py

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ def python_type(self) -> str:
             "regclass": "str",
         }
         base_type = type_mapping.get(self.type_name, "Any")
-        if self.is_array and not base_type.startswith("list"):
+        if self.is_array and not base_type.startswith("list"):  # noqa: SIM108
             type_str = f"list[{base_type}]"
         else:
             type_str = base_type
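
The trailing # noqa: SIM108 suppresses Ruff's flake8-simplify rule SIM108, which would otherwise suggest collapsing the if/else into a conditional expression; the explicit branches are kept for readability.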
