Merged
Changes from 3 commits
9 changes: 4 additions & 5 deletions examples/postgres_source/README.md
@@ -3,14 +3,13 @@
[![GitHub](https://img.shields.io/github/stars/cocoindex-io/cocoindex?color=5B5BD6)](https://github.com/cocoindex-io/cocoindex)

This example demonstrates how to use Postgres tables as the source for CocoIndex.
-It reads structured data from existing PostgreSQL tables, performs calculations, generates embeddings, and stores them in a separate CocoIndex table.
+It reads structured product data from existing PostgreSQL tables, performs calculations, generates embeddings, and stores them in a separate CocoIndex table.

We appreciate a star ⭐ at [CocoIndex Github](https://github.com/cocoindex-io/cocoindex) if this is helpful.

-This example contains two flows:
+This example contains one flow:

-1. `postgres_message_indexing_flow`: Read from a simpler table `source_messages` (single primary key), and generate embeddings for the `message` column.
-2. `postgres_product_indexing_flow`: Read from a more complex table `source_products` (composite primary key), compute additional fields and generates embeddings.
+`postgres_product_indexing_flow`: Read from a table `source_products` (composite primary key), compute additional fields like total value and full description, then generate embeddings for semantic search.
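
Reviewer note: the "compute additional fields" step described above can be pictured with plain Python, independent of CocoIndex. This is only a sketch under assumptions: the `ProductRow` columns, the second parameter of `calculate_total_value`, and the helper `make_full_description` are hypothetical names chosen for illustration and are not taken from this diff, which shows only the first parameter (`price: float`).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProductRow:
    # Hypothetical columns; the real `source_products` schema may differ.
    product_category: str
    product_name: str
    description: str
    price: float
    amount: int


def calculate_total_value(price: float, amount: int) -> float:
    """Total value of one row: unit price times quantity (assumed formula)."""
    return price * amount


def make_full_description(category: str, name: str, description: str) -> str:
    """Join the text columns into one string suitable as embedding input."""
    return f"Category: {category}\nName: {name}\n\n{description}"


row = ProductRow(
    product_category="Electronics",
    product_name="Wireless Headphones",
    description="Noise-cancelling, 30h battery.",
    price=199.5,
    amount=4,
)
total_value = calculate_total_value(row.price, row.amount)
full_description = make_full_description(
    row.product_category, row.product_name, row.description
)
```

In the actual flow, functions like these would presumably be registered with `@cocoindex.op.function()` and the combined text passed to the embedding step; the sketch only shows the per-row arithmetic and string assembly.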


## Prerequisites
@@ -25,7 +24,7 @@ Before running the example, you need to:

2. Follow the [CocoIndex PostgreSQL setup guide](https://cocoindex.io/docs/getting_started/quickstart) to install and configure PostgreSQL with pgvector extension.

-3. Create source tables `source_messages` and `source_products` with sample data:
+3. Create source table `source_products` with sample data:

```bash
$ psql "postgres://cocoindex:cocoindex@localhost/cocoindex" -f ./prepare_source_data.sql
54 changes: 1 addition & 53 deletions examples/postgres_source/main.py
@@ -2,58 +2,6 @@
import os


-@cocoindex.flow_def(name="PostgresMessageIndexing")
-def postgres_message_indexing_flow(
-    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
-) -> None:
-    """
-    Define a flow that reads data from a PostgreSQL table, generates embeddings,
-    and stores them in another PostgreSQL table with pgvector.
-    """
-
-    data_scope["messages"] = flow_builder.add_source(
-        cocoindex.sources.Postgres(
-            table_name="source_messages",
-            # Optional. Use the default CocoIndex database if not specified.
-            database=cocoindex.add_transient_auth_entry(
-                cocoindex.sources.DatabaseConnectionSpec(
-                    url=os.getenv("SOURCE_DATABASE_URL"),
-                )
-            ),
-            # Optional.
-            ordinal_column="created_at",
-        )
-    )
-
-    indexed_messages = data_scope.add_collector()
-    with data_scope["messages"].row() as message_row:
-        # Use the indexing column for embedding generation
-        message_row["embedding"] = message_row["message"].transform(
-            cocoindex.functions.SentenceTransformerEmbed(
-                model="sentence-transformers/all-MiniLM-L6-v2"
-            )
-        )
-        # Collect the data - include key columns and content
-        indexed_messages.collect(
-            id=message_row["id"],
-            author=message_row["author"],
-            message=message_row["message"],
-            embedding=message_row["embedding"],
-        )
-
-    indexed_messages.export(
-        "output",
-        cocoindex.targets.Postgres(),
-        primary_key_fields=["id"],
-        vector_indexes=[
-            cocoindex.VectorIndexDef(
-                field_name="embedding",
-                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
-            )
-        ],
-    )
-
-
@cocoindex.op.function()
def calculate_total_value(
    price: float,
@@ -76,7 +24,7 @@ def postgres_product_indexing_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    """
-    Define a flow that reads data from a PostgreSQL table, generates embeddings,
+    Define a flow that reads product data from a PostgreSQL table, generates embeddings,
    and stores them in another PostgreSQL table with pgvector.
    """
    data_scope["products"] = flow_builder.add_source(
33 changes: 1 addition & 32 deletions examples/postgres_source/prepare_source_data.sql
@@ -1,38 +1,7 @@
-- Usage: run with psql from your shell, for example:
-- $ psql "postgres://cocoindex:cocoindex@localhost/cocoindex" -f ./prepare_source_data.sql
-- ========================================
--- Simple schema: source_messages (single primary key)
--- ========================================
-DROP TABLE IF EXISTS source_messages CASCADE;
-CREATE TABLE source_messages (
-    id uuid NOT NULL PRIMARY KEY DEFAULT gen_random_uuid(),
-    author text NOT NULL,
-    message text NOT NULL,
-    created_at timestamp DEFAULT CURRENT_TIMESTAMP
-);
-INSERT INTO source_messages (author, message)
-VALUES (
-        'Jane Smith',
-        'Hello world! This is a test message.'
-    ),
-    (
-        'John Doe',
-        'PostgreSQL source integration is working great!'
-    ),
-    (
-        'Jane Smith',
-        'CocoIndex makes database processing so much easier.'
-    ),
-    (
-        'John Doe',
-        'Embeddings and vector search are powerful tools.'
-    ),
-    (
-        'John Doe',
-        'Natural language processing meets database technology.'
-    ) ON CONFLICT DO NOTHING;
--- ========================================
--- Multiple schema: source_products (composite primary key)
+-- Product schema: source_products (composite primary key)
-- ========================================
DROP TABLE IF EXISTS source_products CASCADE;
CREATE TABLE source_products (