Commit dc12bf9

JamesGuthrie and Askir committed
fix: remove dangling embedding entries
There is no FK from the queue table to the source table, or from the embedding table to the source table. As a result, we can have "dangling" entries in both the queue table and the embedding table. These occur because of a race condition between deleting a source table row, and the processing of the embeddings for that row.

To "clean up" dangling embeddings, we insert a queue row when a source table row is deleted. This queue row is dangling. When a dangling queue row is identified, we use the PK values of the queue row to remove all associated embeddings (if present).

Co-authored-by: Jascha <[email protected]>
1 parent 5d24e16 commit dc12bf9
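
For illustration only, a minimal sketch of the cleanup flow described in the commit message, using hypothetical stand-in tables (src, queue, emb) with no foreign keys between them, as in pgai:

    -- Hypothetical stand-ins for the source, queue, and embedding tables.
    create temp table src   (id int primary key, body text);
    create temp table queue (id int);
    create temp table emb   (id int, chunk text);

    insert into src values (42, 'hello');
    insert into emb values (42, 'hello');   -- embeddings were already written for id 42

    -- What the updated trigger does on DELETE: queue the old primary key values.
    delete from src where id = 42;
    insert into queue (id) values (42);

    -- What the worker later does: queue rows with no matching source row are "dangling",
    -- and their PK values identify the stale embeddings and queue rows to remove.
    delete from emb
    where id in (select q.id from queue q left join src s on s.id = q.id where s.id is null);
    delete from queue
    where id in (select q.id from queue q left join src s on s.id = q.id where s.id is null);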

5 files changed: +587 -3 lines changed


projects/pgai/db/sql/idempotent/011-vectorizer-int.sql

Lines changed: 8 additions & 0 deletions
@@ -586,6 +586,7 @@ declare
     _delete_statement pg_catalog.text;
     _pk_columns pg_catalog.text;
     _pk_values pg_catalog.text;
+    _old_pk_values pg_catalog.text;
     _func_def pg_catalog.text;
     _relevant_columns_check pg_catalog.text;
     _truncate_statement pg_catalog.text;
@@ -599,6 +600,10 @@ begin
     into strict _pk_values
     from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

+    select pg_catalog.string_agg(pg_catalog.format('old.%I', x.attname), ', ' order by x.attnum)
+    into strict _old_pk_values
+    from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);
+
     if target_schema is not null and target_table is not null then
         -- Create delete statement for deleted rows
         _delete_statement := format('delete from %I.%I where %s', target_schema, target_table,
@@ -637,6 +642,8 @@ begin
     if (TG_LEVEL = 'ROW') then
         if (TG_OP = 'DELETE') then
             $DELETE_STATEMENT$;
+            insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
+            values ($OLD_PK_VALUES$);
         elsif (TG_OP = 'UPDATE') then
             -- Check if the primary key has changed and queue the update
             if $PK_CHANGE_CHECK$ then
@@ -674,6 +681,7 @@ begin
     _func_def := replace(_func_def, '$QUEUE_TABLE$', quote_ident(queue_table));
     _func_def := replace(_func_def, '$PK_COLUMNS$', _pk_columns);
     _func_def := replace(_func_def, '$PK_VALUES$', _pk_values);
+    _func_def := replace(_func_def, '$OLD_PK_VALUES$', _old_pk_values);
     _func_def := replace(_func_def, '$TARGET_SCHEMA$', quote_ident(target_schema));
     _func_def := replace(_func_def, '$TARGET_TABLE$', quote_ident(target_table));
     _func_def := replace(_func_def, '$RELEVANT_COLUMNS_CHECK$', _relevant_columns_check);
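
For illustration, with a made-up source_pk payload, the new string_agg builds the comma-separated old.<column> list that later replaces the $OLD_PK_VALUES$ placeholder:

    -- Hypothetical source_pk payload for a two-column primary key.
    select pg_catalog.string_agg(pg_catalog.format('old.%I', x.attname), ', ' order by x.attnum)
    from pg_catalog.jsonb_to_recordset(
        '[{"attnum": 1, "attname": "tenant_id"}, {"attnum": 2, "attname": "post_id"}]'::pg_catalog.jsonb
    ) x(attnum int, attname name);
    -- returns: old.tenant_id, old.post_id

With that list substituted, the generated DELETE branch would insert something like: insert into <queue_schema>.<queue_table> (tenant_id, post_id) values (old.tenant_id, old.post_id).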

projects/pgai/pgai/data/ai.sql

Lines changed: 8 additions & 0 deletions
@@ -2909,6 +2909,7 @@ declare
     _delete_statement pg_catalog.text;
     _pk_columns pg_catalog.text;
     _pk_values pg_catalog.text;
+    _old_pk_values pg_catalog.text;
     _func_def pg_catalog.text;
     _relevant_columns_check pg_catalog.text;
     _truncate_statement pg_catalog.text;
@@ -2922,6 +2923,10 @@ begin
     into strict _pk_values
     from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

+    select pg_catalog.string_agg(pg_catalog.format('old.%I', x.attname), ', ' order by x.attnum)
+    into strict _old_pk_values
+    from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);
+
     if target_schema is not null and target_table is not null then
         -- Create delete statement for deleted rows
         _delete_statement := format('delete from %I.%I where %s', target_schema, target_table,
@@ -2960,6 +2965,8 @@ begin
     if (TG_LEVEL = 'ROW') then
         if (TG_OP = 'DELETE') then
             $DELETE_STATEMENT$;
+            insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
+            values ($OLD_PK_VALUES$);
         elsif (TG_OP = 'UPDATE') then
             -- Check if the primary key has changed and queue the update
             if $PK_CHANGE_CHECK$ then
@@ -2997,6 +3004,7 @@ begin
     _func_def := replace(_func_def, '$QUEUE_TABLE$', quote_ident(queue_table));
     _func_def := replace(_func_def, '$PK_COLUMNS$', _pk_columns);
     _func_def := replace(_func_def, '$PK_VALUES$', _pk_values);
+    _func_def := replace(_func_def, '$OLD_PK_VALUES$', _old_pk_values);
     _func_def := replace(_func_def, '$TARGET_SCHEMA$', quote_ident(target_schema));
     _func_def := replace(_func_def, '$TARGET_TABLE$', quote_ident(target_table));
     _func_def := replace(_func_def, '$RELEVANT_COLUMNS_CHECK$', _relevant_columns_check);

projects/pgai/pgai/vectorizer/vectorizer.py

Lines changed: 58 additions & 2 deletions
@@ -638,7 +638,7 @@ def fetch_work_query_with_split_transaction(self) -> sql.Composed:
             WHERE ({pk_fields}) IN (SELECT {pk_fields} FROM update_locked_rows)
                 AND q.ctid NOT IN (SELECT _ctid FROM update_locked_rows)
             )
-            SELECT s.*, {attempts}
+            SELECT {q_pk_fields}, s.*, {attempts}
             FROM update_locked_rows l
             LEFT JOIN LATERAL ( -- NOTE: lateral join forces runtime chunk exclusion
                 SELECT *
@@ -653,6 +653,17 @@ def fetch_work_query_with_split_transaction(self) -> sql.Composed:
                 [sql.Identifier("q", attname) for attname in self.pk_attnames]
            ),
            pk_fields=self.pk_fields_sql,
+            # Note: q_pk_fields is used to surface the queue row PKs, required
+            # to clean up "dangling" embeddings. In the "dangling" embedding case,
+            # the source row has been deleted, so all source table pk columns are null.
+            q_pk_fields=sql.SQL(", ").join(
+                [
+                    sql.SQL(" AS ").join(
+                        [sql.Identifier("l", attname), sql.Identifier(f"_q_{attname}")]
+                    )
+                    for attname in self.pk_attnames
+                ]
+            ),
            queue_table=sql.Identifier(
                self.vectorizer.queue_schema, self.vectorizer.queue_table
            ),
@@ -1174,12 +1185,57 @@ async def _do_batch(self, conn: AsyncConnection) -> int:
         # Filter out items that were deleted from the source table.
         # We use the first primary key column, since they can only
         # be null if the LEFT JOIN didn't find a match.
+        items_to_delete = [
+            i for i in items if i[self.vectorizer.source_pk[0].attname] is None
+        ]
         items = [
             i for i in items if i[self.vectorizer.source_pk[0].attname] is not None
         ]

+        if self.features.split_processing and len(items_to_delete) > 0:
+
+            def remap_queue_pks(
+                items_to_delete: list[SourceRow],
+            ) -> list[SourceRow]:
+                """
+                Given a list of source rows, remaps "hidden" queue PK fields for deletion
+
+                This is generally pretty ugly, but here are _all_ of the horrible details:
+                There is no FK from the queue table to the source table, or from the
+                embedding table to the source table. As a result, we can have "dangling"
+                entries in both the queue table and the embedding table.
+                To "clean up" dangling embeddings, we ensure that a queue row is inserted
+                when a source table row is deleted. This queue row is dangling, and there
+                may be dangling embeddings.
+                To clean up the dangling embeddings, we need to know:
+                a) That the queue row is dangling
+                b) The PK values belonging to the queue row
+
+                In the fetch work query, we extract all source table columns with `s.*`,
+                because we generally don't know/want to know about the structure of that
+                table. As a result, when there is a dangling entry in the queue table, all
+                PK columns of the source row are None. We _also_ extract the PK values of
+                the queue row, but aliased (with the prefix `_q_` to avoid conflicts with
+                other columns in the source table).
+
+                This function takes a list of source rows and maps the aliased queue row
+                PKs to the original PK names.
+                """
+                del_pks: list[SourceRow] = []
+                for item in items_to_delete:
+                    del_pk: SourceRow = {}
+                    for attname in self.queries.pk_attnames:
+                        del_pk[attname] = item[f"_q_{attname}"]
+                    del_pks.append(del_pk)
+                return del_pks
+
+            async with conn.transaction():
+                pks_to_delete = remap_queue_pks(items_to_delete)
+                await self._delete_embeddings(conn, pks_to_delete)
+                await self._delete_processed_queue_rows(conn, pks_to_delete)
+
         if len(items) == 0:
-            return 0
+            return len(items_to_delete)

         try:
             num_chunks = await self._embed_and_write(conn, items)
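
As an illustration-only aside (hypothetical table and column names), this sketch shows the row shape the aliased q_pk_fields produce: for a dangling queue entry the source columns come back NULL, while the _q_-prefixed queue PK still carries the value that remap_queue_pks later restores:

    -- Hypothetical demo of the LEFT JOIN shape produced by the fetch work query.
    create temp table blog   (id int primary key, title text);
    create temp table blog_q (id int);

    insert into blog values (1, 'kept');
    insert into blog_q values (1), (42);   -- 42 is dangling: its source row is gone

    select l.id as _q_id, s.*
    from blog_q l
    left join blog s on s.id = l.id;
    -- _q_id | id | title
    --     1 |  1 | kept
    --    42 |    |         <- source columns NULL; _q_id carries the PK used for cleanup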
