Conversation

@JamesGuthrie
Member

The vectorizer worker used to process queue items in a single transaction. If any step (other than file loading) failed, processing was aborted and later retried. Because it operated in a single transaction, it did not record failed attempts in the queue table.

This change rearchitects the queue item processing to consist of two transactions:

  • The "fetch work" transaction gets a batch of rows from the database for processing. It updates the attempts column of those rows to signal that an attempt has been made to process the item. It deletes duplicate queue items for the same primary key columns.
  • The "embed and write" transaction performs embedding, writes the embeddings to the database, and removes successfully processed queue rows. Rows which failed to be processed have the retry_after column set to a value proportional to the number of existing attempts. When the attempts column goes over a predefined threshold (6), the queue item is moved to the "failed" (dead letter) queue.

@JamesGuthrie JamesGuthrie requested a review from a team as a code owner June 13, 2025 11:51
@JamesGuthrie JamesGuthrie temporarily deployed to internal-contributors June 13, 2025 11:51 — with GitHub Actions
@JamesGuthrie JamesGuthrie force-pushed the jg/split-tx-processing branch from d86bc3f to f459370 June 13, 2025 12:02
@JamesGuthrie JamesGuthrie temporarily deployed to internal-contributors June 13, 2025 12:02 — with GitHub Actions
@JamesGuthrie JamesGuthrie force-pushed the jg/split-tx-processing branch from f459370 to 0ad00f9 June 13, 2025 12:10
@JamesGuthrie JamesGuthrie temporarily deployed to internal-contributors June 13, 2025 12:10 — with GitHub Actions
Base automatically changed from jg/golden-files to main June 13, 2025 12:51
) as _;

-- TODO: for very small batch sizes (<10), an array _may_ be faster
create temporary table seen_lock_ids (lock_id bigint);
Collaborator

you probably won't ever call this function twice in the same transaction, but having a drop temporary table if exists seen_lock_ids; right before the create will make this more resilient.
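
In PostgreSQL that guard is spelled without the temporary keyword, i.e. roughly:

drop table if exists seen_lock_ids;  -- no-op when the table does not exist yet
create temporary table seen_lock_ids (lock_id bigint);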

Member Author

Yes, thanks. Done.

SELECT _ctid, _lock_id, {pk_fields}, attempts, retry_after
FROM ai._get_next_queue_batch('{queue_table}', %s) as (_ctid tid, _lock_id int, {pk_columns}, queued_at timestamptz, attempts int, retry_after timestamptz)
),
update_locked_rows AS (
Collaborator

I think you need to ensure that the update_locked_rows cte runs before the delete_duplicate_rows cte. If delete_duplicate_rows runs first, you may delete a duplicate that contains the true max(attempts) value.

Member Author

Yes, good catch, thanks!

),
delete_duplicate_rows AS (
DELETE FROM {queue_table} q
WHERE ({pk_fields}) IN (SELECT {pk_fields} FROM locked_queue_rows)
Collaborator

I think you can refer to update_locked_rows instead of locked_queue_rows to force update_locked_rows to run before delete_duplicate_rows.
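
As a rough, self-contained sketch of that shape (a plain FOR UPDATE SKIP LOCKED select stands in for ai._get_next_queue_batch, and the table my_queue, the single-column id primary key and the batch size of 10 are assumptions rather than the generated query):

with locked_queue_rows as (
    select ctid as _ctid, id
    from my_queue
    where retry_after is null or retry_after <= now()
    limit 10
    for update skip locked
),
update_locked_rows as (
    update my_queue q
    set attempts = q.attempts + 1
    from locked_queue_rows l
    where q.ctid = l._ctid
    returning q.id, q.attempts
),
delete_duplicate_rows as (
    -- referencing update_locked_rows (rather than locked_queue_rows) makes this
    -- CTE consume the update's output, which forces the update to be evaluated
    -- before the duplicate rows are removed
    delete from my_queue q
    using update_locked_rows u
    where q.id = u.id
      and q.ctid not in (select _ctid from locked_queue_rows)
)
select id, attempts from update_locked_rows;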

Member Author

Yes, done.

AND ({pk_fields}) IN (SELECT {pk_fields} FROM erroring_pk_and_step)
RETURNING {pk_fields}
)
INSERT INTO {queue_failed_table} ({pk_fields}, failure_step)
Collaborator

do you want to keep the number of attempts in the dead queue too?

Member Author

Sure, although it's hard coded at the moment, so the value shouldn't be very interesting.

@JamesGuthrie JamesGuthrie temporarily deployed to internal-contributors June 16, 2025 05:59 — with GitHub Actions
@JamesGuthrie JamesGuthrie force-pushed the jg/split-tx-processing branch from 67e1d4a to 4540ef7 June 16, 2025 06:02
@JamesGuthrie JamesGuthrie temporarily deployed to internal-contributors June 16, 2025 06:02 — with GitHub Actions
;

create or replace function ai._get_next_queue_batch(
queue_table pg_catalog.regclass,
Collaborator

I'm thinking that it might be clearer to take a vectorizer_id instead of queue_table. You can get the queue table from the same query that fetches the source_pk.
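
That is, something along these lines (the ai.vectorizer column names here are recalled from memory and should be treated as an assumption):

-- resolve the queue table and source_pk from the catalog row for one vectorizer
select format('%I.%I', v.queue_schema, v.queue_table)::regclass as queue_table
     , v.source_pk
from ai.vectorizer v
where v.id = 1;  -- the vectorizer_id passed in by the worker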

;
;

create or replace function ai._get_next_queue_batch(
Collaborator

Should this function be prefixed with _? We are calling it from the worker. I thought we named things with _ when we deemed them private, in the sense that only public SQL functions reference them.

Member Author

Good question. The function is not intended to be used by anyone other than our Python code; as such, it is internal/private.

)

@cached_property
def fetch_work_query_with_split_transaction(self) -> sql.Composed:
Collaborator

Not for this PR, but maybe this would be better as a SQL function that takes the vectorizer_id.

)
SELECT s.*, {attempts}
FROM update_locked_rows l
LEFT JOIN LATERAL ( -- NOTE: lateral join forces runtime chunk exclusion
Collaborator

I also know this is old, but maybe add to the NOTE "... chunk exclusion when the source table is a hypertable"

async with conn.transaction():
# Note: We require the following preconditions to ensure that we can
# manually handle transactions in this context.
assert not conn.autocommit
Collaborator

NIT: My only issue with this is that it's hard to see when a transaction ends and another starts. You have to look for the last time commit() was called, then see where the next DB query is made to understand the boundaries between transactions.

Member Author

I agree. In general this change makes the transaction handling more complicated. I saw two available options:

  1. Implement a new Executor subclass for the split-transaction mode
  2. Add the split-transaction mode alongside the two existing modes

Initially I tried 1 because I thought it would (at least locally) result in a cleaner implementation. It was very unwieldy to work with, though, so I abandoned it and went for option 2.

@alejandrodnm
Collaborator

Approved with some comments. None of those are blockers.

@JamesGuthrie JamesGuthrie force-pushed the jg/split-tx-processing branch from 4540ef7 to a59995a June 16, 2025 12:04
@JamesGuthrie JamesGuthrie temporarily deployed to internal-contributors June 16, 2025 12:04 — with GitHub Actions
@JamesGuthrie JamesGuthrie temporarily deployed to internal-contributors June 16, 2025 12:39 — with GitHub Actions
@JamesGuthrie JamesGuthrie force-pushed the jg/split-tx-processing branch from d7bb489 to a59995a June 16, 2025 12:41
@JamesGuthrie JamesGuthrie temporarily deployed to internal-contributors June 16, 2025 12:41 — with GitHub Actions
@JamesGuthrie JamesGuthrie requested a review from jgpruitt June 16, 2025 12:48
except BaseException:
logger.warn(f"Tokenizer for model '{model}' not found")
except Exception as e:
logger.warn(f"Tokenizer for model '{model}' not found: {e}")
Contributor

I think, generally speaking, simply using exc_info=True is going to look nicer than printing the stringified version. It also shows the full stack and the chain of exceptions if there is one.

query,
(self._batch_size,),
)
elif self.features.loading_retries:
Contributor

The double backwards compatibility makes my brain hurt, but I guess as long as we don't have a way to force upgrades, there is no way to simplify this.

Comment on lines +1529 to +1562
async with timed_debug_log(f"Pre-processed {len(items)} items"):
    for item in items:
        pk_values = self._get_item_pk_values(item)
        try:
            payload = self.vectorizer.config.loading.load(item)
        except Exception as e:
            if self.features.loading_retries:
                errors.append((item, LoadingError(e=e)))
                continue

        try:
            payload = self.vectorizer.config.parsing.parse(item, payload)
        except Exception as e:
            if self.features.loading_retries:
                errors.append((item, ParsingError(e=e)))
                continue
            else:
                raise e
        chunks = self.vectorizer.config.chunking.into_chunks(item, payload)
        try:
            for chunk_id, chunk in enumerate(chunks, 0):
                formatted = self.vectorizer.config.formatting.format(
                    chunk, item
                )
                records_without_embeddings.append(
                    pk_values + [chunk_id, formatted]
                )
                documents.append(formatted)
        except Exception as e:
            if self.features.loading_retries:
                errors.append((item, FormattingError(e=e)))
                continue
            else:
                raise e
Contributor

I somehow thought the goal was still to do this all within the original transaction, taking exactly enough items to fill an embedding batch? Right now we still use the configured batch size. Are you planning to do a separate PR for this, or are you just not doing it for now?

from pg_catalog.jsonb_to_recordset(source_pk) as (attnum int, attname text)
) as _;

-- TODO: for very small batch sizes (<10), an array _may_ be faster
Collaborator

I do have a small reservation about the potential catalog bloat that using a temp table might cause; hopefully, though, this function is executed infrequently enough for that to be tolerable.

@alejandrodnm alejandrodnm force-pushed the main branch 2 times, most recently from 35e80a5 to f15011e October 14, 2025 14:27