9 changes: 7 additions & 2 deletions .github/workflows/ci.yml
@@ -159,14 +159,19 @@ jobs:
build-pgai-docker-image:
needs: authorize
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
# in a pull_request_target event, the ref is the `main` branch not the PR branch
# so we need to tell checkout to use the head.sha instead.
ref: ${{ github.event.pull_request.head.sha || github.ref }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Build the Docker image
uses: docker/build-push-action@v6
with:
-          context: "{{defaultContext}}:projects/pgai"
+          context: "./projects/pgai"
push: false # we don't push the image, we just build it to validate it
platforms: linux/amd64,linux/arm64

7 changes: 6 additions & 1 deletion README.md
@@ -43,7 +43,7 @@ The key strength of this architecture lies in its resilience: data modifications

First, install the pgai package.

- ```
+ ```bash
pip install pgai
```

@@ -57,6 +57,11 @@ import pgai
pgai.install(DB_URL)
```

If you are not on Timescale Cloud, you also need to run the pgai vectorizer worker. Install its dependencies via:
```bash
pip install pgai[vectorizer-worker]
```


# Quick Start

2 changes: 1 addition & 1 deletion docs/semantic_catalog/quickstart.md
@@ -92,7 +92,7 @@ Once the semantic catalog is loaded with embedded descriptions, you start genera
```bash
git clone https://github.com/timescale/pgai.git -b jgpruitt/semantic-catalog
cd pgai/projects/pgai
- uv sync
+ uv sync --extra semantic-catalog
source .venv/bin/activate
pgai --version
```
7 changes: 7 additions & 0 deletions docs/vectorizer/alembic-integration.md
@@ -4,6 +4,13 @@ Alembic is a database migration tool that allows you to manage your database sch

We first cover how to create vectorizers using the Alembic operations. Then, we cover how to exclude the tables created and managed by pgai Vectorizer from the autogenerate process.

## Installation
To use the Alembic operations, you need to install pgai with the `sqlalchemy` extra:

```bash
pip install pgai[sqlalchemy]
```

## Creating vectorizers
pgai provides native Alembic operations for managing vectorizers. For them to work, you need to call `register_operations` in your `env.py` file, which registers the pgai operations under the global `op` context.
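
A minimal sketch of that registration (the `pgai.alembic` import path is an assumption based on pgai's Alembic integration; the call itself is all that is required):

```python
# env.py -- minimal sketch; assumes pgai was installed with the sqlalchemy extra
from pgai.alembic import register_operations  # assumed import path

# Register pgai's vectorizer operations on Alembic's global `op` context so
# migration scripts can call them alongside the built-in operations.
register_operations()
```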

7 changes: 6 additions & 1 deletion docs/vectorizer/python-integration.md
@@ -61,7 +61,12 @@ with psycopg.connect(conn_string) as conn:

# Running the vectorizer worker

You can then run the vectorizer worker using the CLI tool or the `Worker` class discussed in the [vectorizer worker documentation](/docs/vectorizer/worker.md).
Note that you will need to install pgai with the `vectorizer-worker` extra to run the worker yourself:

```bash
pip install pgai[vectorizer-worker]
```
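
For orientation, here is a minimal sketch of the in-process option (the `Worker` import path and entry point are assumptions; the worker documentation linked above is authoritative):

```python
# run_worker.py -- hypothetical sketch of running the vectorizer worker in-process
from pgai import Worker  # assumed import path; see the worker documentation

# The connection string is illustrative; point it at the database where the
# pgai catalog was installed.
worker = Worker("postgresql://postgres:password@localhost:5432/postgres")
worker.run()  # assumed blocking entry point that polls the vectorizer queues
```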

# Related integrations

4 changes: 2 additions & 2 deletions docs/vectorizer/worker.md
@@ -36,7 +36,7 @@ When you use pgai vectorizers on a self-hosted Postgres installation or another
1. **Install [pgai](https://pypi.org/project/pgai/) from PyPI**

```shell
- pip install pgai
+ pip install pgai[vectorizer-worker]
```

The `pgai` command line tool should now be in your `$PATH`.
@@ -69,7 +69,7 @@ For more configuration options, see [Advanced configuration options](#advanced-c
1. Add the pgai package dependency to your project

```shell
- pip install pgai
+ pip install pgai[vectorizer-worker]
```
or add `pgai[vectorizer-worker]` to the dependencies in your `requirements.txt`, `pyproject.toml`, or similar configuration file.

2 changes: 1 addition & 1 deletion projects/extension/tests/test_litellm.py
@@ -37,7 +37,7 @@ def cur() -> psycopg.Cursor:
"name": "cohere/embed-english-v3.0",
"dimensions": 1024,
"api_key_name": "COHERE_API_KEY",
"exception": """CohereException - {"message":"no api key supplied"}""",
"exception": '"message":"no api key supplied"',
"extra_options": {},
"input_types": [
"search_query",
1 change: 0 additions & 1 deletion projects/pgai/.gitattributes

This file was deleted.

2 changes: 1 addition & 1 deletion projects/pgai/Dockerfile
@@ -22,7 +22,7 @@ url = "https://download.pytorch.org/whl/cpu"
explicit = true
EOF
RUN uv lock # update lockfile, switches to cpu-only torch
- RUN uv sync --frozen --no-install-project --no-dev
+ RUN uv sync --frozen --no-install-project --no-dev --extra vectorizer-worker

# Note: some dependencies are not supported on 3.13, do not bump until that is sorted out
FROM python:3.12-slim
1 change: 1 addition & 0 deletions projects/pgai/benchmark/.gitignore
@@ -0,0 +1 @@
cassettes/*
3 changes: 0 additions & 3 deletions projects/pgai/benchmark/cassettes/wiki_openai_500

This file was deleted.

94 changes: 91 additions & 3 deletions projects/pgai/db/sql/idempotent/011-vectorizer-int.sql
@@ -424,8 +424,8 @@ begin
create table %I.%I
( %s
, queued_at pg_catalog.timestamptz not null default now()
-    , loading_retries pg_catalog.int4 not null default 0
-    , loading_retry_after pg_catalog.timestamptz
+    , attempts pg_catalog.int4 not null default 0
+    , retry_after pg_catalog.timestamptz
)
$sql$
, queue_schema, queue_table
@@ -507,6 +507,7 @@ begin
( %s
, created_at pg_catalog.timestamptz not null default now()
, failure_step pg_catalog.text not null default ''
, attempts pg_catalog.int4 not null default 0
)
$sql$
, queue_schema, queue_failed_table
@@ -585,6 +586,7 @@ declare
_delete_statement pg_catalog.text;
_pk_columns pg_catalog.text;
_pk_values pg_catalog.text;
_old_pk_values pg_catalog.text;
_func_def pg_catalog.text;
_relevant_columns_check pg_catalog.text;
_truncate_statement pg_catalog.text;
@@ -598,6 +600,10 @@ begin
into strict _pk_values
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

select pg_catalog.string_agg(pg_catalog.format('old.%I', x.attname), ', ' order by x.attnum)
into strict _old_pk_values
from pg_catalog.jsonb_to_recordset(source_pk) x(attnum int, attname name);

if target_schema is not null and target_table is not null then
-- Create delete statement for deleted rows
_delete_statement := format('delete from %I.%I where %s', target_schema, target_table,
@@ -636,6 +642,8 @@ begin
if (TG_LEVEL = 'ROW') then
if (TG_OP = 'DELETE') then
$DELETE_STATEMENT$;
insert into $QUEUE_SCHEMA$.$QUEUE_TABLE$ ($PK_COLUMNS$)
values ($OLD_PK_VALUES$);
elsif (TG_OP = 'UPDATE') then
-- Check if the primary key has changed and queue the update
if $PK_CHANGE_CHECK$ then
@@ -673,6 +681,7 @@ begin
_func_def := replace(_func_def, '$QUEUE_TABLE$', quote_ident(queue_table));
_func_def := replace(_func_def, '$PK_COLUMNS$', _pk_columns);
_func_def := replace(_func_def, '$PK_VALUES$', _pk_values);
_func_def := replace(_func_def, '$OLD_PK_VALUES$', _old_pk_values);
_func_def := replace(_func_def, '$TARGET_SCHEMA$', quote_ident(target_schema));
_func_def := replace(_func_def, '$TARGET_TABLE$', quote_ident(target_table));
_func_def := replace(_func_def, '$RELEVANT_COLUMNS_CHECK$', _relevant_columns_check);
@@ -1266,4 +1275,83 @@ end
$func$
language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

create or replace function ai._get_next_queue_batch(
queue_table pg_catalog.regclass,
batch_size pg_catalog.int4
) returns setof record AS $$
declare
source_pk pg_catalog.jsonb;
lock_id_string pg_catalog.text;
query pg_catalog.text;
lock_count pg_catalog.int4 := 0;
row record;
begin
-- get the source_pk for this queue table
select v.source_pk
into source_pk
from ai.vectorizer v
where pg_catalog.to_regclass(pg_catalog.format('%I.%I', v.queue_schema, v.queue_table)) operator(pg_catalog.=) _get_next_queue_batch.queue_table;

-- construct the "lock id string"
-- this is a string of all pk column names and their values, e.g. for a
-- two-column pk consisting of 'time' and 'url' this will generate:
-- hashtext(format('time|%s|url|%s', time, url))
select pg_catalog.format($fmt$pg_catalog.hashtext(pg_catalog.format('%s', %s))$fmt$, format_string, format_args)
into lock_id_string
from (
select
pg_catalog.string_agg(pg_catalog.format('%s|%%s', attname), '|' order by attnum) as format_string
, pg_catalog.string_agg(attname, ', ' order by attnum) as format_args
from pg_catalog.jsonb_to_recordset(source_pk) as (attnum int, attname text)
) as _;

-- TODO: for very small batch sizes (<10), an array _may_ be faster
drop table if exists seen_lock_ids;
create temporary table seen_lock_ids (lock_id bigint);
create index on seen_lock_ids (lock_id);

-- construct a query for all queue items that are due for processing
query := pg_catalog.format($sql$
select
q.ctid as _ctid
, %s as _lock_id
, q.*
from %s as q
where (retry_after is null or retry_after <= now())
and %s not in (
-- exclude all locks that we already hold
select objid::int
from pg_locks
where locktype = 'advisory'
and pid = pg_catalog.pg_backend_pid()
and classid = %s
)
$sql$, lock_id_string, _get_next_queue_batch.queue_table, lock_id_string, _get_next_queue_batch.queue_table::pg_catalog.oid);

for row in execute query
loop
if lock_count operator(pg_catalog.>=) batch_size then
exit;
end if;

if exists(select 1 from pg_temp.seen_lock_ids WHERE lock_id operator(pg_catalog.=) row._lock_id) then
continue;
end if;

insert into pg_temp.seen_lock_ids (lock_id) values (row._lock_id);

if pg_catalog.pg_try_advisory_lock(queue_table::pg_catalog.oid::int, row._lock_id) then
lock_count := lock_count operator(pg_catalog.+) 1;
return next row;
end if;
end loop;

drop table seen_lock_ids;

return;
end;
$$ language plpgsql
set search_path to pg_catalog, pg_temp
;
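
From a worker's perspective, claiming a batch comes down to calling this function with a column definition list (it returns `setof record`) and releasing the session-level advisory locks once the batch has been processed. A hypothetical sketch, with the queue table name and columns taken from the golden files below:

```python
# claim_batch.py -- hypothetical sketch; the vectorizer worker does this internally
import psycopg

with psycopg.connect("postgresql://localhost/mydb") as conn:
    rows = conn.execute(
        """
        select * from ai._get_next_queue_batch('ai._vectorizer_q_1'::regclass, 10)
        as t(_ctid tid, _lock_id int4, title text, published timestamptz,
             queued_at timestamptz, attempts int4, retry_after timestamptz)
        """
    ).fetchall()
    # ... load, chunk, and embed the claimed rows here ...
    # advisory locks are session-scoped; release them once the batch is done
    conn.execute("select pg_advisory_unlock_all()")
```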
@@ -0,0 +1,16 @@
-- rename loading_retries and loading_retry_after for all existing queue tables
do language plpgsql $block$
declare
_vectorizer record;
begin
for _vectorizer in select queue_schema, queue_table from ai.vectorizer
loop
execute format('alter table %I.%I rename column loading_retries to attempts', _vectorizer.queue_schema, _vectorizer.queue_table);
execute format('alter table %I.%I rename column loading_retry_after to retry_after', _vectorizer.queue_schema, _vectorizer.queue_table);
end loop;
for _vectorizer in select queue_schema, queue_failed_table from ai.vectorizer
loop
execute format('alter table %I.%I add column attempts pg_catalog.int4 not null default 0', _vectorizer.queue_schema, _vectorizer.queue_failed_table);
end loop;
end;
$block$;
11 changes: 11 additions & 0 deletions projects/pgai/db/tests/golden/failed-queue-table-16.expected
@@ -0,0 +1,11 @@
Table "ai._vectorizer_q_failed_1"
Column       | Type                     | Collation | Nullable | Default  | Storage  | Compression | Stats target | Description
-------------+--------------------------+-----------+----------+----------+----------+-------------+--------------+-------------
title        | text                     |           | not null |          | extended |             |              |
published    | timestamp with time zone |           | not null |          | plain    |             |              |
created_at   | timestamp with time zone |           | not null | now()    | plain    |             |              |
failure_step | text                     |           | not null | ''::text | extended |             |              |
attempts     | integer                  |           | not null | 0        | plain    |             |              |
Indexes:
"_vectorizer_q_failed_1_title_published_idx" btree (title, published)
Access method: heap
11 changes: 11 additions & 0 deletions projects/pgai/db/tests/golden/failed-queue-table-17.expected
@@ -0,0 +1,11 @@
Table "ai._vectorizer_q_failed_1"
Column       | Type                     | Collation | Nullable | Default  | Storage  | Compression | Stats target | Description
-------------+--------------------------+-----------+----------+----------+----------+-------------+--------------+-------------
title        | text                     |           | not null |          | extended |             |              |
published    | timestamp with time zone |           | not null |          | plain    |             |              |
created_at   | timestamp with time zone |           | not null | now()    | plain    |             |              |
failure_step | text                     |           | not null | ''::text | extended |             |              |
attempts     | integer                  |           | not null | 0        | plain    |             |              |
Indexes:
"_vectorizer_q_failed_1_title_published_idx" btree (title, published)
Access method: heap
14 changes: 7 additions & 7 deletions projects/pgai/db/tests/golden/queue-table-16.expected
@@ -1,11 +1,11 @@
Table "ai._vectorizer_q_1"
- Column              | Type                     | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
- --------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
- title               | text                     |           | not null |         | extended |             |              |
- published           | timestamp with time zone |           | not null |         | plain    |             |              |
- queued_at           | timestamp with time zone |           | not null | now()   | plain    |             |              |
- loading_retries     | integer                  |           | not null | 0       | plain    |             |              |
- loading_retry_after | timestamp with time zone |           |          |         | plain    |             |              |
+ Column      | Type                     | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
+ ------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
+ title       | text                     |           | not null |         | extended |             |              |
+ published   | timestamp with time zone |           | not null |         | plain    |             |              |
+ queued_at   | timestamp with time zone |           | not null | now()   | plain    |             |              |
+ attempts    | integer                  |           | not null | 0       | plain    |             |              |
+ retry_after | timestamp with time zone |           |          |         | plain    |             |              |
Indexes:
"_vectorizer_q_1_title_published_idx" btree (title, published)
Access method: heap
14 changes: 7 additions & 7 deletions projects/pgai/db/tests/golden/queue-table-17.expected
@@ -1,11 +1,11 @@
Table "ai._vectorizer_q_1"
- Column              | Type                     | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
- --------------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
- title               | text                     |           | not null |         | extended |             |              |
- published           | timestamp with time zone |           | not null |         | plain    |             |              |
- queued_at           | timestamp with time zone |           | not null | now()   | plain    |             |              |
- loading_retries     | integer                  |           | not null | 0       | plain    |             |              |
- loading_retry_after | timestamp with time zone |           |          |         | plain    |             |              |
+ Column      | Type                     | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
+ ------------+--------------------------+-----------+----------+---------+----------+-------------+--------------+-------------
+ title       | text                     |           | not null |         | extended |             |              |
+ published   | timestamp with time zone |           | not null |         | plain    |             |              |
+ queued_at   | timestamp with time zone |           | not null | now()   | plain    |             |              |
+ attempts    | integer                  |           | not null | 0       | plain    |             |              |
+ retry_after | timestamp with time zone |           |          |         | plain    |             |              |
Indexes:
"_vectorizer_q_1_title_published_idx" btree (title, published)
Access method: heap
4 changes: 4 additions & 0 deletions projects/pgai/db/tests/vectorizer/test_vectorizer.py
@@ -524,6 +524,10 @@ def test_vectorizer_timescaledb():
actual = psql_cmd(r"\d+ ai._vectorizer_q_1")
golden_check("queue-table", actual)

# does the queue failed table look right?
actual = psql_cmd(r"\d+ ai._vectorizer_q_failed_1")
golden_check("failed-queue-table", actual)

# does the view look right?
actual = psql_cmd(r"\d+ website.blog_embedding")
golden_check("view", actual)
5 changes: 4 additions & 1 deletion projects/pgai/pgai/cli.py
@@ -1347,7 +1347,10 @@ async def do() -> GenerateSQLResponse:
console.print(Syntax(resp.sql_statement, "sql", word_wrap=True))

if save_final_prompt:
-        save_final_prompt.expanduser().resolve().write_text(resp.final_prompt)
+        # The final prompt is the user prompt of the last message request we made.
+        save_final_prompt.expanduser().resolve().write_text(
+            str(resp.messages[-1][0].parts[-1].content)
+        )


@semantic_catalog.command()