Commit ecc9377

feat: implement dynamic PostgreSQL source with composite key support (#910)
* feat: implement dynamic PostgreSQL source with composite key support
  - Add PostgreSQL table source with dynamic schema generation
  - Support both single and composite primary keys with proper KTable structure
  - Implement type-safe column access using name lookup instead of indices
  - Add comprehensive PostgreSQL to CocoIndex type mapping
  - Support KeyValue::Struct for composite keys with individual field access
  - Update example with environment variable validation for key configuration
* style: format
* chore: undo changes unrelated to this PR
* chore: undo more unrelated format changes
* fix: fix `list()` interface and use `Postgres` instead of `PostgresDb`
* refactor: create a consolidated `get_db_pool()` shared by source/target
* directly keep cocoindex types in `Executor`
* simplify conversion from pg->coco values, no detour
* simplify key decoding logic
* reduce code copy-pastes
* fix: clarify error message when table not found
* fix: support ordinal correctly and avoid per-row column name search
* reorganize the example to make it simpler
* fix: correctly support types like `integer`
* example: finalize the example and `README`
* cleanup: remove unused files
* example: cleanup pyproject, rename
* example: update dependency version
1 parent 26f1b5f commit ecc9377
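
The commit message mentions name-based column lookup, avoiding a per-row column name search, and composite keys with individual field access. The Rust implementation is not shown on this page, so the following is only a minimal Python sketch of that idea under stated assumptions: `build_column_index` and `split_row` are hypothetical helper names, and the row layout is borrowed from the `source_products` table in the example.

```python
from typing import Any, Sequence


def build_column_index(column_names: Sequence[str]) -> dict[str, int]:
    """Map each column name to its position, computed once per query."""
    return {name: position for position, name in enumerate(column_names)}


def split_row(
    row: Sequence[Any],
    column_index: dict[str, int],
    key_columns: Sequence[str],
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Split one row into (key fields, value fields) using the prebuilt index."""
    # Key fields are looked up by name, so column order in the table does not matter.
    key = {name: row[column_index[name]] for name in key_columns}
    # Everything that is not part of the key becomes a value field.
    value = {
        name: row[position]
        for name, position in column_index.items()
        if name not in key
    }
    return key, value


# Hypothetical usage with the columns of `source_products`.
columns = ["product_category", "product_name", "description", "price", "amount"]
index = build_column_index(columns)  # built once, reused for every row
row = ("Electronics", "Laptop", "High-performance laptop for work and gaming", 1299.99, 15)
key, value = split_row(row, index, ["product_category", "product_name"])
print(key)
```

Building the index once and reusing it for each row is what keeps the per-row cost to plain positional reads. A single-column key is the same call with a one-element `key_columns` list.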

16 files changed: +1041 -75 lines changed

examples/paper_metadata/README.md

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ We appreciate a star ⭐ at [CocoIndex Github](https://github.com/cocoindex-io/c
 
 1. [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one.
 
-2. dependencies:
+2. Install dependencies:
 
     ```bash
     pip install -e .

examples/postgres_source/.env

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+# Database Configuration
+
+# CocoIndex Database, for CocoIndex internal storage and target
+COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
+
+# Source Database, for data source
+SOURCE_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
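
Both URLs above point at the same database. In the example, leaving out the `database` argument of the source is what selects the default CocoIndex database. As a rough illustration only, the sketch below does a similar fallback in plain Python and extracts the database name from a URL. `resolve_source_url` and `database_name` are hypothetical helpers, not part of CocoIndex.

```python
from typing import Mapping
from urllib.parse import urlsplit


def resolve_source_url(env: Mapping[str, str]) -> str:
    """Prefer SOURCE_DATABASE_URL; fall back to COCOINDEX_DATABASE_URL."""
    url = env.get("SOURCE_DATABASE_URL") or env.get("COCOINDEX_DATABASE_URL")
    if not url:
        raise ValueError(
            "Set SOURCE_DATABASE_URL or COCOINDEX_DATABASE_URL in the environment"
        )
    return url


def database_name(url: str) -> str:
    """Return the database name, i.e. the URL path without its leading slash."""
    return urlsplit(url).path.lstrip("/")


# Sample mappings standing in for os.environ.
same_db = {"COCOINDEX_DATABASE_URL": "postgres://cocoindex:cocoindex@localhost/cocoindex"}
separate_db = {
    "COCOINDEX_DATABASE_URL": "postgres://cocoindex:cocoindex@localhost/cocoindex",
    "SOURCE_DATABASE_URL": "postgres://cocoindex:cocoindex@localhost/source_data",
}
print(database_name(resolve_source_url(same_db)))      # cocoindex
print(database_name(resolve_source_url(separate_db)))  # source_data
```

Passing a mapping instead of reading `os.environ` directly keeps the helper easy to test. In a real script the call would be `resolve_source_url(os.environ)`.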

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+# Database Configuration
+# CocoIndex Database (for storing embeddings)
+COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
+
+# Database URLs
+SOURCE_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/source_data
+
+# ========================================
+# Configuration for test_simple table
+# ========================================
+TABLE_NAME=test_simple
+KEY_COLUMN_FOR_SINGLE_KEY=id
+INDEXING_COLUMN=message
+ORDINAL_COLUMN=created_at
+
+# ========================================
+# Configuration for test_multiple table
+# ========================================
+TABLE_NAME=test_multiple
+KEY_COLUMNS_FOR_MULTIPLE_KEYS=product_category,product_name
+INDEXING_COLUMN=description
+ORDINAL_COLUMN=modified_time
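
The file above configures the key either through `KEY_COLUMN_FOR_SINGLE_KEY` or through `KEY_COLUMNS_FOR_MULTIPLE_KEYS`, and the commit message mentions environment variable validation for key configuration. The `main.py` in this commit hardcodes its table and column names, so the validation code itself is not visible here. The following is a minimal sketch of how such a check could look, assuming the two variables are mutually exclusive. `resolve_key_columns` is a hypothetical name.

```python
from typing import Mapping


def resolve_key_columns(env: Mapping[str, str]) -> list[str]:
    """Return the key column names, accepting exactly one of the two variables."""
    single = env.get("KEY_COLUMN_FOR_SINGLE_KEY", "").strip()
    multiple = env.get("KEY_COLUMNS_FOR_MULTIPLE_KEYS", "").strip()

    if single and multiple:
        raise ValueError(
            "Set only one of KEY_COLUMN_FOR_SINGLE_KEY and KEY_COLUMNS_FOR_MULTIPLE_KEYS"
        )
    if single:
        return [single]

    # Comma-separated list; ignore surrounding whitespace and empty entries.
    columns = [name.strip() for name in multiple.split(",") if name.strip()]
    if not columns:
        raise ValueError(
            "Set KEY_COLUMN_FOR_SINGLE_KEY or KEY_COLUMNS_FOR_MULTIPLE_KEYS"
        )
    return columns


# Sample mappings mirroring the two configuration blocks.
simple_config = {"TABLE_NAME": "test_simple", "KEY_COLUMN_FOR_SINGLE_KEY": "id"}
multiple_config = {
    "TABLE_NAME": "test_multiple",
    "KEY_COLUMNS_FOR_MULTIPLE_KEYS": "product_category,product_name",
}
print(resolve_key_columns(simple_config))    # ['id']
print(resolve_key_columns(multiple_config))  # ['product_category', 'product_name']
```

Returning a list in both cases lets downstream code treat a single key as a one-column composite key.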

examples/postgres_source/README.md

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+# PostgreSQL Source Example 🗄️
+
+[![GitHub](https://img.shields.io/github/stars/cocoindex-io/cocoindex?color=5B5BD6)](https://github.com/cocoindex-io/cocoindex)
+
+This example demonstrates how to use Postgres tables as the source for CocoIndex.
+It reads structured data from existing PostgreSQL tables, performs calculations, generates embeddings, and stores them in a separate CocoIndex table.
+
+We appreciate a star ⭐ at [CocoIndex Github](https://github.com/cocoindex-io/cocoindex) if this is helpful.
+
+This example contains two flows:
+
+1. `postgres_message_indexing_flow`: Read from a simpler table `source_messages` (single primary key), and generate embeddings for the `message` column.
+2. `postgres_product_indexing_flow`: Read from a more complex table `source_products` (composite primary key), compute additional fields, and generate embeddings.
+
+
+## Prerequisites
+
+Before running the example, you need to:
+
+1. Install dependencies:
+
+    ```bash
+    pip install -e .
+    ```
+
+2. Follow the [CocoIndex PostgreSQL setup guide](https://cocoindex.io/docs/getting_started/quickstart) to install and configure PostgreSQL with the pgvector extension.
+
+3. Create source tables `source_messages` and `source_products` with sample data:
+
+    ```bash
+    $ psql "postgres://cocoindex:cocoindex@localhost/cocoindex" -f ./prepare_source_data.sql
+    ```
+
+    For simplicity, we use the same database for source and target. You can also set up a separate Postgres database to use as the source database.
+    Remember to update the `SOURCE_DATABASE_URL` in the `.env` file if you use a separate database.
+
+## Run
+
+Update the index, which will also set up the tables the first time:
+
+```bash
+cocoindex update --setup main.py
+```
+
+## CocoInsight
+CocoInsight is in Early Access now (Free) 😊 You found us! A quick 3 minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9).
+
+Run CocoInsight to understand your RAG data pipeline:
+
+```sh
+cocoindex server -ci main.py
+```
+
+You can also add a `-L` flag to make the server keep updating the index to reflect source changes at the same time:
+
+```sh
+cocoindex server -ci -L main.py
+```
+
+Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight).

examples/postgres_source/main.py

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
+import cocoindex
+import os
+
+
+@cocoindex.flow_def(name="PostgresMessageIndexing")
+def postgres_message_indexing_flow(
+    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
+) -> None:
+    """
+    Define a flow that reads data from a PostgreSQL table, generates embeddings,
+    and stores them in another PostgreSQL table with pgvector.
+    """
+
+    data_scope["messages"] = flow_builder.add_source(
+        cocoindex.sources.Postgres(
+            table_name="source_messages",
+            # Optional. Use the default CocoIndex database if not specified.
+            database=cocoindex.add_transient_auth_entry(
+                cocoindex.sources.DatabaseConnectionSpec(
+                    url=os.getenv("SOURCE_DATABASE_URL"),
+                )
+            ),
+            # Optional.
+            ordinal_column="created_at",
+        )
+    )
+
+    indexed_messages = data_scope.add_collector()
+    with data_scope["messages"].row() as message_row:
+        # Use the indexing column for embedding generation
+        message_row["embedding"] = message_row["message"].transform(
+            cocoindex.functions.SentenceTransformerEmbed(
+                model="sentence-transformers/all-MiniLM-L6-v2"
+            )
+        )
+        # Collect the data - include key columns and content
+        indexed_messages.collect(
+            id=message_row["id"],
+            author=message_row["author"],
+            message=message_row["message"],
+            embedding=message_row["embedding"],
+        )
+
+    indexed_messages.export(
+        "output",
+        cocoindex.targets.Postgres(),
+        primary_key_fields=["id"],
+        vector_indexes=[
+            cocoindex.VectorIndexDef(
+                field_name="embedding",
+                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
+            )
+        ],
+    )
+
+
+@cocoindex.op.function()
+def calculate_total_value(
+    price: float,
+    amount: int,
+) -> float:
+    return price * amount
+
+
+@cocoindex.op.function()
+def make_full_description(
+    category: str,
+    name: str,
+    description: str,
+) -> str:
+    return f"Category: {category}\nName: {name}\n\n{description}"
+
+
+@cocoindex.flow_def(name="PostgresProductIndexing")
+def postgres_product_indexing_flow(
+    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
+) -> None:
+    """
+    Define a flow that reads data from a PostgreSQL table, generates embeddings,
+    and stores them in another PostgreSQL table with pgvector.
+    """
+    data_scope["products"] = flow_builder.add_source(
+        cocoindex.sources.Postgres(
+            table_name="source_products",
+            # Optional. Use the default CocoIndex database if not specified.
+            database=cocoindex.add_transient_auth_entry(
+                cocoindex.sources.DatabaseConnectionSpec(
+                    url=os.getenv("SOURCE_DATABASE_URL"),
+                )
+            ),
+            # Optional.
+            ordinal_column="modified_time",
+        )
+    )
+
+    indexed_product = data_scope.add_collector()
+    with data_scope["products"].row() as product:
+        product["full_description"] = flow_builder.transform(
+            make_full_description,
+            product["_key"]["product_category"],
+            product["_key"]["product_name"],
+            product["description"],
+        )
+        product["total_value"] = flow_builder.transform(
+            calculate_total_value,
+            product["price"],
+            product["amount"],
+        )
+        product["embedding"] = product["full_description"].transform(
+            cocoindex.functions.SentenceTransformerEmbed(
+                model="sentence-transformers/all-MiniLM-L6-v2"
+            )
+        )
+        indexed_product.collect(
+            product_category=product["_key"]["product_category"],
+            product_name=product["_key"]["product_name"],
+            description=product["description"],
+            price=product["price"],
+            amount=product["amount"],
+            total_value=product["total_value"],
+            embedding=product["embedding"],
+        )
+
+    indexed_product.export(
+        "output",
+        cocoindex.targets.Postgres(),
+        primary_key_fields=["product_category", "product_name"],
+        vector_indexes=[
+            cocoindex.VectorIndexDef(
+                field_name="embedding",
+                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
+            )
+        ],
+    )
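
The product flow derives two fields per row before embedding: a combined description and a total value. To make the intermediate values concrete, the sketch below repeats the two helper functions as plain Python, without the `@cocoindex.op.function()` decorator (an intentional simplification so it runs without CocoIndex installed), and applies them to the "Running Shoes" sample row from the source data.

```python
def calculate_total_value(price: float, amount: int) -> float:
    """Total inventory value for one product row."""
    return price * amount


def make_full_description(category: str, name: str, description: str) -> str:
    """Combine both key fields and the description into the text to embed."""
    return f"Category: {category}\nName: {name}\n\n{description}"


# Values taken from the 'Running Shoes' row of source_products.
full_description = make_full_description(
    "Sports", "Running Shoes", "Lightweight running shoes for daily training"
)
total_value = calculate_total_value(129.5, 60)

print(full_description)
print(total_value)  # 7770.0
```

Note that the category and name come from `product["_key"]` in the flow, since both columns belong to the composite primary key, while `description`, `price`, and `amount` are read as ordinary value fields.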

Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
+-- Usage: run with psql from your shell, for example:
+-- $ psql "postgres://cocoindex:cocoindex@localhost/cocoindex" -f ./prepare_source_data.sql
+-- ========================================
+-- Simple schema: source_messages (single primary key)
+-- ========================================
+DROP TABLE IF EXISTS source_messages CASCADE;
+CREATE TABLE source_messages (
+    id uuid NOT NULL PRIMARY KEY DEFAULT gen_random_uuid(),
+    author text NOT NULL,
+    message text NOT NULL,
+    created_at timestamp DEFAULT CURRENT_TIMESTAMP
+);
+INSERT INTO source_messages (author, message)
+VALUES (
+        'Jane Smith',
+        'Hello world! This is a test message.'
+    ),
+    (
+        'John Doe',
+        'PostgreSQL source integration is working great!'
+    ),
+    (
+        'Jane Smith',
+        'CocoIndex makes database processing so much easier.'
+    ),
+    (
+        'John Doe',
+        'Embeddings and vector search are powerful tools.'
+    ),
+    (
+        'John Doe',
+        'Natural language processing meets database technology.'
+    ) ON CONFLICT DO NOTHING;
+-- ========================================
+-- Multiple schema: source_products (composite primary key)
+-- ========================================
+DROP TABLE IF EXISTS source_products CASCADE;
+CREATE TABLE source_products (
+    product_category text NOT NULL,
+    product_name text NOT NULL,
+    description text,
+    price double precision,
+    amount integer,
+    modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (product_category, product_name)
+);
+INSERT INTO source_products (
+        product_category,
+        product_name,
+        description,
+        price,
+        amount,
+        modified_time
+    )
+VALUES (
+        'Electronics',
+        'Wireless Headphones',
+        'High-quality wireless headphones with noise cancellation',
+        199.99,
+        50,
+        NOW() - INTERVAL '3 days'
+    ),
+    (
+        'Electronics',
+        'Smartphone',
+        'Latest flagship smartphone with advanced camera',
+        899.99,
+        25,
+        NOW() - INTERVAL '12 days'
+    ),
+    (
+        'Electronics',
+        'Laptop',
+        'High-performance laptop for work and gaming',
+        1299.99,
+        15,
+        NOW() - INTERVAL '20 days'
+    ),
+    (
+        'Appliances',
+        'Coffee Maker',
+        'Programmable coffee maker with 12-cup capacity',
+        89.99,
+        30,
+        NOW() - INTERVAL '5 days'
+    ),
+    (
+        'Sports',
+        'Running Shoes',
+        'Lightweight running shoes for daily training',
+        129.5,
+        60,
+        NOW() - INTERVAL '1 day'
+    ) ON CONFLICT (product_category, product_name) DO NOTHING;

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+[project]
+name = "postgres-source"
+version = "0.1.0"
+description = "Demonstrate how to use Postgres tables as the source for CocoIndex."
+requires-python = ">=3.11"
+dependencies = ["cocoindex[embeddings]>=0.1.83"]
+
+[tool.setuptools]
+packages = []

python/cocoindex/sources.py

Lines changed: 20 additions & 0 deletions
@@ -2,6 +2,7 @@
 
 from . import op
 from .auth_registry import TransientAuthEntryReference
+from .setting import DatabaseConnectionSpec
 import datetime
 
 
@@ -67,3 +68,22 @@ class AzureBlob(op.SourceSpec):
 
     sas_token: TransientAuthEntryReference[str] | None = None
     account_access_key: TransientAuthEntryReference[str] | None = None
+
+
+class Postgres(op.SourceSpec):
+    """Import data from a PostgreSQL table."""
+
+    _op_category = op.OpCategory.SOURCE
+
+    # Table name to read from (required)
+    table_name: str
+
+    # Database connection reference (optional - uses default if not provided)
+    database: TransientAuthEntryReference[DatabaseConnectionSpec] | None = None
+
+    # Optional: specific columns to include (if None, includes all columns)
+    included_columns: list[str] | None = None
+
+    # Optional: column name to use for ordinal tracking (for incremental updates)
+    # Should be a timestamp, serial, or other incrementing column
+    ordinal_column: str | None = None
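
The comments on `ordinal_column` say it is used for ordinal tracking in incremental updates and should be a timestamp, serial, or other incrementing column. How CocoIndex uses the ordinal internally is not shown in this diff. As a generic illustration of the idea only, the sketch below compares the ordinals seen in a previous run with the current ones to decide which keys need work. `keys_to_reprocess` and `keys_to_delete` are hypothetical names.

```python
from datetime import datetime
from typing import Hashable, Mapping


def keys_to_reprocess(
    previous: Mapping[Hashable, datetime],
    current: Mapping[Hashable, datetime],
) -> set[Hashable]:
    """Keys that are new, or whose ordinal moved forward since the last run."""
    return {
        key
        for key, ordinal in current.items()
        if key not in previous or ordinal > previous[key]
    }


def keys_to_delete(
    previous: Mapping[Hashable, datetime],
    current: Mapping[Hashable, datetime],
) -> set[Hashable]:
    """Keys that were present in the last run but no longer exist in the source."""
    return set(previous) - set(current)


# Composite keys are represented as (product_category, product_name) tuples.
previous_run = {
    ("Electronics", "Laptop"): datetime(2025, 1, 1),
    ("Sports", "Running Shoes"): datetime(2025, 1, 2),
    ("Appliances", "Coffee Maker"): datetime(2025, 1, 3),
}
current_run = {
    ("Electronics", "Laptop"): datetime(2025, 1, 1),      # unchanged
    ("Sports", "Running Shoes"): datetime(2025, 1, 9),    # modified
    ("Electronics", "Smartphone"): datetime(2025, 1, 9),  # new
}
print(sorted(keys_to_reprocess(previous_run, current_run)))
print(sorted(keys_to_delete(previous_run, current_run)))
```

This only works if the ordinal never moves backwards for a given row, which is why the spec recommends a timestamp or otherwise incrementing column.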

src/ops/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ pub mod registry;
 // All operations
 mod factory_bases;
 mod functions;
+mod shared;
 mod sources;
 mod targets;
 

src/ops/registration.rs

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result
     sources::google_drive::Factory.register(registry)?;
     sources::amazon_s3::Factory.register(registry)?;
     sources::azure_blob::Factory.register(registry)?;
+    sources::postgres::Factory.register(registry)?;
 
     functions::parse_json::Factory.register(registry)?;
     functions::split_recursively::register(registry)?;
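
The one-line change above makes the new source discoverable by registering its factory alongside the existing ones. The Rust registry itself is not part of this diff. The following is a loose Python analogy of the pattern, not a port: `SourceRegistry`, its methods, and the string names are all hypothetical.

```python
from typing import Callable


class SourceRegistry:
    """Hypothetical name-to-factory registry with duplicate detection."""

    def __init__(self) -> None:
        self._factories: dict[str, Callable[[], object]] = {}

    def register(self, name: str, factory: Callable[[], object]) -> None:
        # Registering the same name twice is treated as an error,
        # so a later registration cannot silently replace an earlier one.
        if name in self._factories:
            raise ValueError(f"Source already registered: {name}")
        self._factories[name] = factory

    def create(self, name: str) -> object:
        try:
            factory = self._factories[name]
        except KeyError:
            raise KeyError(f"Unknown source: {name}") from None
        return factory()

    def names(self) -> list[str]:
        return sorted(self._factories)


registry = SourceRegistry()
# Placeholder factories; a real factory would build an executor.
for source_name in ("GoogleDrive", "AmazonS3", "AzureBlob", "Postgres"):
    registry.register(source_name, lambda source_name=source_name: {"source": source_name})

print(registry.names())
print(registry.create("Postgres"))
```

Keeping registration in one function, as the Rust code does, gives a single place to see every available source.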
