---
title: Transform Data From Structured Source in PostgreSQL
description: Transform data from a PostgreSQL table as a source, with both AI models and non-AI data mappings, and write the results into PostgreSQL/PgVector for semantic + structured search.
sidebar_class_name: hidden
slug: /examples/postgres_source
canonicalUrl: '/examples/postgres_source'
sidebar_custom_props:
  image: /img/examples/postgres_source/cover.png
tags: [data-mapping, vector-index, postgres]
---
import { GitHubButton, YouTubeButton, DocumentationButton } from '../../../src/components/GitHubButton';

<GitHubButton url="https://github.com/cocoindex-io/cocoindex/tree/main/examples/postgres_source" margin="0 0 24px 0" />

![PostgreSQL Product Indexing Flow](/img/examples/postgres_source/cover.png)

[CocoIndex](https://github.com/cocoindex-io/cocoindex) is a framework for building **incremental** data flows across **structured and unstructured** sources. This tutorial shows how to read data from a PostgreSQL table as a source, transform it with both AI and non-AI data mappings, and write the results into a new PostgreSQL table with PgVector for semantic + structured search.
## PostgreSQL Product Indexing Flow

![PostgreSQL Product Indexing Flow](/img/examples/postgres_source/flow.png)

The flow:

- Reads data from a PostgreSQL table `source_products`.
- Computes additional fields (`total_value`, `full_description`).
- Generates embeddings for semantic search.
- Stores the results in another PostgreSQL table with a vector index using pgvector.
### Connect to source

`flow_builder.add_source` reads rows from `source_products`.

```python
import os

import cocoindex


@cocoindex.flow_def(name="PostgresProductIndexing")
def postgres_product_indexing_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    data_scope["products"] = flow_builder.add_source(
        cocoindex.sources.Postgres(
            table_name="source_products",
            # Optional. Uses the default CocoIndex database if not specified.
            database=cocoindex.add_transient_auth_entry(
                cocoindex.DatabaseConnectionSpec(
                    url=os.environ["SOURCE_DATABASE_URL"],
                )
            ),
            # Optional.
            ordinal_column="modified_time",
            notification=cocoindex.sources.PostgresNotification(),
        ),
    )
```
This step adds source data from the PostgreSQL table `source_products` to the flow as a `KTable`.

![Add PostgreSQL Source](/img/examples/postgres_source/source.png)

- Incremental sync: when new or updated rows are found, only those rows are run through the pipeline, so downstream indexes and search results reflect the latest data while unchanged rows are untouched.
- `ordinal_column` is recommended for change detection, so the pipeline only processes rows that have changed.
- `notification`: when present, enables change capture based on Postgres LISTEN/NOTIFY.

Check [Postgres source](https://cocoindex.io/docs/ops/sources#postgres) for more details.

If you use a Postgres database hosted by Supabase, click Connect on your project dashboard and find the URL there. Check [DatabaseConnectionSpec](https://cocoindex.io/docs/core/settings#databaseconnectionspec) for more details.
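The ordinal-column mechanics can be illustrated with a small, self-contained sketch. This is plain Python with made-up rows, not CocoIndex's actual implementation: only rows whose `modified_time` is newer than the last processed ordinal get reprocessed.

```python
# Plain-Python sketch of ordinal-based change detection (illustration only;
# CocoIndex handles this internally when ordinal_column is set).
# The rows below are made-up sample data.
rows = [
    {"product_name": "pen", "modified_time": 1},
    {"product_name": "notebook", "modified_time": 5},
    {"product_name": "mug", "modified_time": 9},
]

def changed_rows(rows, last_seen_ordinal):
    """Return only rows modified after the last processed ordinal."""
    return [r for r in rows if r["modified_time"] > last_seen_ordinal]

# On the first run everything is new; afterwards only newer rows are reprocessed.
first_run = changed_rows(rows, last_seen_ordinal=0)
later_run = changed_rows(rows, last_seen_ordinal=5)
```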
## Simple Data Mapping / Transformation

Create a simple transformation to calculate the total value.

```python
@cocoindex.op.function()
def calculate_total_value(price: float, amount: int) -> float:
    """Compute total value for each product."""
    return price * amount
```

Plug it into the flow:

```python
with data_scope["products"].row() as product:
    # Compute total value
    product["total_value"] = flow_builder.transform(
        calculate_total_value,
        product["price"],
        product["amount"],
    )
```

![Calculate Total Value](/img/examples/postgres_source/price.png)
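Under the decorator, the op is ordinary Python. As a quick sanity check of the logic with made-up values (an undecorated copy, for illustration only):

```python
# Undecorated copy of calculate_total_value, for illustration only.
def calculate_total_value(price: float, amount: int) -> float:
    """Total inventory value for a product row."""
    return price * amount

# Hypothetical price and amount.
total = calculate_total_value(4.99, 3)
```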
### Data Transformation & AI Transformation

Create a custom function that builds a `full_description` field by combining the product's category, name, and description.

```python
@cocoindex.op.function()
def make_full_description(category: str, name: str, description: str) -> str:
    """Create a detailed product description for embedding."""
    return f"Category: {category}\nName: {name}\n\n{description}"
```

Embeddings often perform better with more context. By combining fields into a single text string, we ensure that the semantic meaning of the product is captured fully.
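The string assembly itself can be sanity-checked as plain Python outside the flow (an undecorated copy; the sample product values are made up):

```python
# Undecorated copy of make_full_description, for illustration only.
def make_full_description(category: str, name: str, description: str) -> str:
    """Combine category, name, and description into one embedding-ready text."""
    return f"Category: {category}\nName: {name}\n\n{description}"

# Hypothetical sample product values.
text = make_full_description("Stationery", "Gel Pen", "Smooth-writing 0.5mm pen.")
```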
Now plug it into the flow:

```python
# Declare a collector for the output rows.
indexed_product = data_scope.add_collector()

with data_scope["products"].row() as product:
    # ... other transformations

    # Compute full description
    product["full_description"] = flow_builder.transform(
        make_full_description,
        product["product_category"],
        product["product_name"],
        product["description"],
    )

    # Generate embeddings
    product["embedding"] = product["full_description"].transform(
        cocoindex.functions.SentenceTransformerEmbed(
            model="sentence-transformers/all-MiniLM-L6-v2"
        )
    )

    # Collect data
    indexed_product.collect(
        product_category=product["product_category"],
        product_name=product["product_name"],
        description=product["description"],
        price=product["price"],
        amount=product["amount"],
        total_value=product["total_value"],
        embedding=product["embedding"],
    )
```

This takes each product row and does the following:

1. Builds a rich description.

   ![Make Full Description](/img/examples/postgres_source/description.png)

2. Turns it into an embedding.

   ![Embed Full Description](/img/examples/postgres_source/embed.png)

3. Collects the embedding along with structured fields (category, name, price, etc.).

   ![Collect Embedding](/img/examples/postgres_source/collector.png)
## Export

```python
indexed_product.export(
    "output",
    cocoindex.targets.Postgres(),
    primary_key_fields=["product_category", "product_name"],
    vector_indexes=[
        cocoindex.VectorIndexDef(
            field_name="embedding",
            metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
        )
    ],
)
```

All transformed rows are collected and exported to a new PostgreSQL table with a vector index, ready for semantic search.
## Field lineage

When the transform flow starts getting complex, it's hard to understand how each field is derived. CocoIndex can visualize the lineage of each field, making it easier to trace and troubleshoot field origins and downstream dependencies.

For example, the following image shows the lineage of the `embedding` field; you can click from the final output backward all the way to the source fields, step by step.

![Field Lineage](/img/examples/postgres_source/lineage.png)
## Running the Pipeline

1. Set up dependencies:

   ```bash
   pip install -e .
   ```

2. Create the source table with sample data:

   ```bash
   psql "postgres://cocoindex:cocoindex@localhost/cocoindex" -f ./prepare_source_data.sql
   ```

3. Set up tables and update the index:

   ```bash
   cocoindex update --setup main.py
   ```

4. Run CocoInsight:

   ```bash
   cocoindex server -ci main.py
   ```

   You can walk through the project step by step in CocoInsight to see exactly how each field is constructed and what happens behind the scenes. It connects to your local CocoIndex server, with zero pipeline data retention.
## Continuous Updating

For continuous updating when the source changes, add `-L`:

```bash
cocoindex server -ci -L main.py
```

Check [live updates](https://cocoindex.io/docs/tutorials/live_updates) for more details.
## Search and Query the Index

### Query

The `search` function runs a semantic similarity search over the indexed products table and returns the top matches for a given query.

```python
from typing import Any

from pgvector.psycopg import register_vector
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool


def search(pool: ConnectionPool, query: str, top_k: int = 5) -> list[dict[str, Any]]:
    # Get the table name for the "output" export target in postgres_product_indexing_flow above.
    table_name = cocoindex.utils.get_target_default_name(
        postgres_product_indexing_flow, "output"
    )
    # Evaluate the embedding transform flow with the input query to get the embedding.
    query_vector = text_to_embedding.eval(query)
    # Run the query and fetch the results.
    with pool.connection() as conn:
        register_vector(conn)
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute(
                f"""
                SELECT
                    product_category,
                    product_name,
                    description,
                    amount,
                    total_value,
                    (embedding <=> %s) AS distance
                FROM {table_name}
                ORDER BY distance ASC
                LIMIT %s
                """,
                (query_vector, top_k),
            )
            return cur.fetchall()
```

This function:

- Converts the query text into an embedding (`query_vector`).
- Compares it with each product's stored embedding (`embedding`) using vector distance.
- Returns the closest matches, including both metadata and the vector distance (`distance`).
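The `<=>` operator in the query above computes cosine distance. A plain-Python sketch of the same measure (with made-up vectors) shows why smaller distances mean closer matches, and how it relates to the `score = 1.0 - distance` conversion used below:

```python
import math

def cosine_distance(a, b):
    """Plain-Python equivalent of pgvector's <=> operator: 1 - cosine similarity."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm

# Identical vectors have distance 0 (score 1); orthogonal vectors have distance 1.
d_same = cosine_distance([1.0, 2.0, 3.0], [1.0, 2.0, 3.0])
d_orth = cosine_distance([1.0, 0.0], [0.0, 1.0])
```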
### Create a command-line interactive loop

```python
def _main() -> None:
    # Initialize the database connection pool.
    pool = ConnectionPool(os.environ["COCOINDEX_DATABASE_URL"])
    # Run queries in a loop to demonstrate the query capabilities.
    while True:
        query = input("Enter search query (or Enter to quit): ")
        if query == "":
            break
        # Run the query function with the database connection pool and the query.
        results = search(pool, query)
        print("\nSearch results:")
        for result in results:
            score = 1.0 - result["distance"]
            print(
                f"[{score:.3f}] {result['product_category']} | {result['product_name']} | {result['amount']} | {result['total_value']}"
            )
            print(f"    {result['description']}")
            print("---")
        print()

if __name__ == "__main__":
    load_dotenv()
    cocoindex.init()
    _main()
```
### Run as a Service

This [example](https://cocoindex.io/docs/examples/image_search#fast-api-application) runs as a service using FastAPI.

## Why One Framework for Structured + Unstructured?

- One mental model: treat files, APIs, and databases uniformly; AI steps are ordinary ops.
- Incremental by default: use an ordinal column to sync only changes; no fragile glue jobs.
- Consistency: embeddings are always derived from the exact transformed row state.
- Operational simplicity: one deployment, one lineage view, fewer moving parts.