Commit 4540ef7

chore: address review feedback
1 parent 2ee3649 commit 4540ef7

8 files changed: 44 additions (+), 5 deletions (−)


projects/pgai/db/sql/idempotent/011-vectorizer-int.sql (2 additions, 0 deletions)

@@ -507,6 +507,7 @@ begin
     ( %s
     , created_at pg_catalog.timestamptz not null default now()
     , failure_step pg_catalog.text not null default ''
+    , attempts pg_catalog.int4 not null default 0
     )
     $sql$
     , queue_schema, queue_failed_table
@@ -1299,6 +1300,7 @@ begin
     ) as _;

     -- TODO: for very small batch sizes (<10), an array _may_ be faster
+    drop table if exists seen_lock_ids;
     create temporary table seen_lock_ids (lock_id bigint);
     create index on seen_lock_ids (lock_id);
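The new drop table if exists guard matters because PostgreSQL temporary tables are session-scoped: without it, a second invocation on the same connection fails when it tries to create seen_lock_ids again. A minimal sketch of that behavior (only the table name comes from the hunk above; the rest is illustrative):

    -- First call in a session: the temp table is created and persists after the call returns.
    create temporary table seen_lock_ids (lock_id bigint);
    -- A second call without the guard would hit:
    --   create temporary table seen_lock_ids (lock_id bigint);
    --   ERROR:  relation "seen_lock_ids" already exists
    -- With the guard, the create is re-runnable within the same session:
    drop table if exists seen_lock_ids;
    create temporary table seen_lock_ids (lock_id bigint);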

projects/pgai/db/sql/incremental/032-split-transaction-support.sql (4 additions, 0 deletions)

@@ -8,5 +8,9 @@ begin
     execute format('alter table %I.%I rename column loading_retries to attempts', _vectorizer.queue_schema, _vectorizer.queue_table);
     execute format('alter table %I.%I rename column loading_retry_after to retry_after', _vectorizer.queue_schema, _vectorizer.queue_table);
     end loop;
+    for _vectorizer in select queue_schema, queue_failed_table from ai.vectorizer
+    loop
+        execute format('alter table %I.%I add column attempts pg_catalog.int4 not null default 0', _vectorizer.queue_schema, _vectorizer.queue_failed_table);
+    end loop;
 end;
 $block$;
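For each existing vectorizer, the new loop fills the %I placeholders with the stored schema and table names and issues one ALTER per failed-queue table. A sketch of the effective statement, assuming a failed-queue table named ai._vectorizer_q_failed_1 as in the golden expectation below:

    -- Illustrative expansion of the format() call for a single vectorizer.
    alter table ai._vectorizer_q_failed_1
        add column attempts pg_catalog.int4 not null default 0;
    -- Rows already in the table get attempts = 0; rows moved into it from now
    -- on carry the real attempt count (see the vectorizer.py change below).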
Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+                                      Table "ai._vectorizer_q_failed_1"
+    Column    |           Type           | Collation | Nullable | Default  | Storage  | Compression | Stats target | Description
+--------------+--------------------------+-----------+----------+----------+----------+-------------+--------------+-------------
+ title        | text                     |           | not null |          | extended |             |              |
+ published    | timestamp with time zone |           | not null |          | plain    |             |              |
+ created_at   | timestamp with time zone |           | not null | now()    | plain    |             |              |
+ failure_step | text                     |           | not null | ''::text | extended |             |              |
+ attempts     | integer                  |           | not null | 0        | plain    |             |              |
+Indexes:
+    "_vectorizer_q_failed_1_title_published_idx" btree (title, published)
+Access method: heap

Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+                                      Table "ai._vectorizer_q_failed_1"
+    Column    |           Type           | Collation | Nullable | Default  | Storage  | Compression | Stats target | Description
+--------------+--------------------------+-----------+----------+----------+----------+-------------+--------------+-------------
+ title        | text                     |           | not null |          | extended |             |              |
+ published    | timestamp with time zone |           | not null |          | plain    |             |              |
+ created_at   | timestamp with time zone |           | not null | now()    | plain    |             |              |
+ failure_step | text                     |           | not null | ''::text | extended |             |              |
+ attempts     | integer                  |           | not null | 0        | plain    |             |              |
+Indexes:
+    "_vectorizer_q_failed_1_title_published_idx" btree (title, published)
+Access method: heap

projects/pgai/db/tests/vectorizer/test_vectorizer.py (4 additions, 0 deletions)

@@ -524,6 +524,10 @@ def test_vectorizer_timescaledb():
     actual = psql_cmd(r"\d+ ai._vectorizer_q_1")
     golden_check("queue-table", actual)

+    # does the queue failed table look right?
+    actual = psql_cmd(r"\d+ ai._vectorizer_q_failed_1")
+    golden_check("failed-queue-table", actual)
+
     # does the view look right?
     actual = psql_cmd(r"\d+ website.blog_embedding")
     golden_check("view", actual)

projects/pgai/pgai/data/ai.sql (6 additions, 0 deletions)

@@ -1243,6 +1243,10 @@ begin
     execute format('alter table %I.%I rename column loading_retries to attempts', _vectorizer.queue_schema, _vectorizer.queue_table);
     execute format('alter table %I.%I rename column loading_retry_after to retry_after', _vectorizer.queue_schema, _vectorizer.queue_table);
     end loop;
+    for _vectorizer in select queue_schema, queue_failed_table from ai.vectorizer
+    loop
+        execute format('alter table %I.%I add column attempts pg_catalog.int4 not null default 0', _vectorizer.queue_schema, _vectorizer.queue_failed_table);
+    end loop;
 end;
 $block$;

@@ -2826,6 +2830,7 @@ begin
     ( %s
     , created_at pg_catalog.timestamptz not null default now()
     , failure_step pg_catalog.text not null default ''
+    , attempts pg_catalog.int4 not null default 0
     )
     $sql$
     , queue_schema, queue_failed_table
@@ -3618,6 +3623,7 @@ begin
     ) as _;

     -- TODO: for very small batch sizes (<10), an array _may_ be faster
+    drop table if exists seen_lock_ids;
     create temporary table seen_lock_ids (lock_id bigint);
     create index on seen_lock_ids (lock_id);
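With the extended DDL template, a newly created dead-letter table matches the golden expectation above. A sketch of the expanded create statement, assuming the %s placeholder is filled with the source table's two primary-key columns (title, published) from the golden file; the exact definitions emitted for those key columns are an assumption here:

    -- Illustrative expansion only; the key-column definitions come from the
    -- source table, not from this commit's SQL.
    create table ai._vectorizer_q_failed_1
    ( title text not null
    , published timestamptz not null
    , created_at pg_catalog.timestamptz not null default now()
    , failure_step pg_catalog.text not null default ''
    , attempts pg_catalog.int4 not null default 0
    );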

projects/pgai/pgai/vectorizer/vectorizer.py (5 additions, 5 deletions)

@@ -635,8 +635,8 @@ def fetch_work_query_with_split_transaction(self) -> sql.Composed:
         ),
         delete_duplicate_rows AS (
             DELETE FROM {queue_table} q
-            WHERE ({pk_fields}) IN (SELECT {pk_fields} FROM locked_queue_rows)
-            AND q.ctid NOT IN (SELECT _ctid FROM locked_queue_rows)
+            WHERE ({pk_fields}) IN (SELECT {pk_fields} FROM update_locked_rows)
+            AND q.ctid NOT IN (SELECT _ctid FROM update_locked_rows)
         )
         SELECT s.*, {attempts}
         FROM update_locked_rows l
@@ -846,10 +846,10 @@ def update_retry_after_and_move_permanent_failures_to_dlq_query(
             DELETE FROM {queue_table}
             WHERE attempts > {max_attempts}
             AND ({pk_fields}) IN (SELECT {pk_fields} FROM erroring_pk_and_step)
-            RETURNING {pk_fields}
+            RETURNING {pk_fields}, attempts
         )
-        INSERT INTO {queue_failed_table} ({pk_fields}, failure_step)
-        SELECT {pk_fields}, failure_step FROM delete_over_attempts JOIN erroring_pk_and_step USING ({pk_fields})
+        INSERT INTO {queue_failed_table} ({pk_fields}, failure_step, attempts)
+        SELECT {pk_fields}, failure_step, attempts FROM delete_over_attempts JOIN erroring_pk_and_step USING ({pk_fields})
         """).format(
             error_values=self._placeholders_tuples(
                 len(self.vectorizer.source_pk) + 1, len(items)
            ),
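The second hunk changes the tail of the generated dead-letter query so the attempt count travels with the row instead of being dropped on the move. A sketch of how that tail reads once the placeholders are filled in, for a hypothetical single-column key id and a queue named ai._vectorizer_q_1; the erroring_pk_and_step CTE is assumed here to be a VALUES list of (pk columns, failure_step), consistent with the error_values placeholders in the hunk, and the literal values are illustrative:

    WITH erroring_pk_and_step (id, failure_step) AS (
        VALUES (1, 'loading')
    ), delete_over_attempts AS (
        DELETE FROM ai._vectorizer_q_1
        WHERE attempts > 6                     -- {max_attempts}, illustrative
          AND (id) IN (SELECT id FROM erroring_pk_and_step)
        RETURNING id, attempts                 -- attempts now returned as well
    )
    INSERT INTO ai._vectorizer_q_failed_1 (id, failure_step, attempts)
    SELECT id, failure_step, attempts
    FROM delete_over_attempts
    JOIN erroring_pk_and_step USING (id);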

projects/pgai/tests/vectorizer/cli/test_vectorizer_split_transaction_semantics.py (1 addition, 0 deletions)

@@ -346,6 +346,7 @@ async def test_vectorizer_error_handling_moves_failed_items_to_dlq_and_logs_erro
             "id": 1,
             "failure_step": "loading",
             "created_at": now,
+            "attempts": 7,
         },
     )
