2 changes: 2 additions & 0 deletions Cargo.toml
@@ -109,3 +109,5 @@ rand = "0.9.0"
indoc = "2.0.6"
owo-colors = "4.2.0"
json5 = "0.4.1"
aws-config = "1.6.2"
aws-sdk-s3 = "1.85.0"
6 changes: 6 additions & 0 deletions examples/amazon_s3_text_embedding/.env.example
@@ -0,0 +1,6 @@
# Postgres database address for cocoindex
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

# Amazon S3 Configuration
AMAZON_S3_BUCKET_NAME=your-bucket-name
AMAZON_S3_PREFIX=optional/prefix/path
1 change: 1 addition & 0 deletions examples/amazon_s3_text_embedding/.gitignore
@@ -0,0 +1 @@
.env
104 changes: 104 additions & 0 deletions examples/amazon_s3_text_embedding/README.md
@@ -0,0 +1,104 @@
This example builds an embedding index based on files stored in an Amazon S3 bucket.
It continuously updates the index as files are added, updated, or deleted in the source bucket,
keeping the index in sync with Amazon S3 effortlessly.

## Prerequisites

Before running the example, you need to:

1. [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one.

2. Prepare for Amazon S3:

    - **Create an Amazon S3 bucket:**
      - Go to the [AWS S3 Console](https://s3.console.aws.amazon.com/s3/home) and click **Create bucket**. Give it a unique name and choose a region.
      - Or, use the AWS CLI:

        ```sh
        aws s3 mb s3://your-s3-bucket-name
        ```

    - **Upload your files to the bucket:**
      - In the AWS Console, click your bucket, then click **Upload** and add your `.md`, `.txt`, `.docx`, or other files.
      - Or, use the AWS CLI:

        ```sh
        aws s3 cp localfile.txt s3://your-s3-bucket-name/
        aws s3 cp your-folder/ s3://your-s3-bucket-name/ --recursive
        ```

    - **Set up AWS credentials:**
      - The easiest way is to run:

        ```sh
        aws configure
        ```

        Enter your AWS Access Key ID, Secret Access Key, default region (e.g., `us-east-1`), and output format (`json`).
      - This creates a credentials file at `~/.aws/credentials` and a config file at `~/.aws/config`.
      - Alternatively, you can set environment variables:

        ```sh
        export AWS_ACCESS_KEY_ID=your-access-key-id
        export AWS_SECRET_ACCESS_KEY=your-secret-access-key
        export AWS_DEFAULT_REGION=us-east-1
        ```

      - If running on AWS EC2 or Lambda, you can use an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html) with S3 read permissions instead.

    - **(Optional) Specify a prefix** to restrict indexing to a subfolder of the bucket by setting `AMAZON_S3_PREFIX` in your `.env`.

    See the [AWS S3 documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) for more details.

3. Create a `.env` file with your Amazon S3 bucket name and (optionally) prefix.
   Start by copying `.env.example`, then edit it to fill in your bucket name and prefix.

   ```sh
   cp .env.example .env
   $EDITOR .env
   ```

   Example `.env` file:

   ```
   # Postgres database address for cocoindex
   COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

   # Amazon S3 Configuration
   AMAZON_S3_BUCKET_NAME=your-bucket-name
   AMAZON_S3_PREFIX=optional/prefix/path
   ```

## Run

Install dependencies:

```sh
uv pip install -r requirements.txt
```
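
Optionally, sanity-check your AWS credentials and bucket settings before starting the flow. Below is a minimal sketch using `boto3` (already in `requirements.txt`); it reads the same `.env` entries as the example and only assumes a standard S3 listing call:

```python
import os

import boto3
from dotenv import load_dotenv

load_dotenv()

# List a few objects to confirm credentials and bucket access work.
s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket=os.environ["AMAZON_S3_BUCKET_NAME"],
    Prefix=os.environ.get("AMAZON_S3_PREFIX", ""),
    MaxKeys=5,
)
for obj in resp.get("Contents", []):
    print(obj["Key"])
```

If this prints a few keys (or at least runs without an access error), the flow should be able to read the bucket.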

Setup:

```sh
uv run main.py cocoindex setup
```

Run:

```sh
uv run main.py
```

While running, it keeps polling the Amazon S3 bucket for changes (at the `refresh_interval` set in the flow, one minute in this example) and updates the index automatically.
At the same time, it accepts queries from the terminal and performs searches on top of the up-to-date index.


## CocoInsight
CocoInsight is in Early Access now (Free) 😊 You found us! A quick 3-minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9).

Run CocoInsight to understand your RAG data pipeline:

```sh
uv run main.py cocoindex server -ci
```

You can also add the `-L` flag to have the server keep updating the index to reflect source changes at the same time:

```sh
uv run main.py cocoindex server -ci -L
```

Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight).
78 changes: 78 additions & 0 deletions examples/amazon_s3_text_embedding/main.py
@@ -0,0 +1,78 @@
import datetime
import os

import cocoindex
from dotenv import load_dotenv


@cocoindex.flow_def(name="AmazonS3TextEmbedding")
def amazon_s3_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    """
    Define an example flow that embeds text from Amazon S3 into a vector database.
    """
    bucket_name = os.environ["AMAZON_S3_BUCKET_NAME"]
    prefix = os.environ.get("AMAZON_S3_PREFIX", None)

    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.AmazonS3(
            bucket_name=bucket_name,
            prefix=prefix,
            included_patterns=["*.md", "*.txt", "*.docx"],
            binary=False),
        # Poll the bucket for changes once a minute.
        refresh_interval=datetime.timedelta(minutes=1))

    doc_embeddings = data_scope.add_collector()

    with data_scope["documents"].row() as doc:
        # Split each document into overlapping chunks suitable for embedding.
        doc["chunks"] = doc["content"].transform(
            cocoindex.functions.SplitRecursively(),
            language="markdown", chunk_size=2000, chunk_overlap=500)

        with doc["chunks"].row() as chunk:
            chunk["embedding"] = chunk["text"].transform(
                cocoindex.functions.SentenceTransformerEmbed(
                    model="sentence-transformers/all-MiniLM-L6-v2"))
            doc_embeddings.collect(filename=doc["filename"], location=chunk["location"],
                                   text=chunk["text"], embedding=chunk["embedding"])

    doc_embeddings.export(
        "doc_embeddings",
        cocoindex.storages.Postgres(),
        primary_key_fields=["filename", "location"],
        vector_indexes=[
            cocoindex.VectorIndexDef(
                field_name="embedding",
                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)])


query_handler = cocoindex.query.SimpleSemanticsQueryHandler(
    name="SemanticsSearch",
    flow=amazon_s3_text_embedding_flow,
    target_name="doc_embeddings",
    query_transform_flow=lambda text: text.transform(
        cocoindex.functions.SentenceTransformerEmbed(
            model="sentence-transformers/all-MiniLM-L6-v2")),
    default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)


@cocoindex.main_fn()
def _run():
    # Use a `FlowLiveUpdater` to keep the flow data updated while serving queries.
    with cocoindex.FlowLiveUpdater(amazon_s3_text_embedding_flow):
        # Run queries in a loop to demonstrate the query capabilities.
        while True:
            try:
                query = input("Enter search query (or Enter to quit): ")
                if query == '':
                    break
                results, _ = query_handler.search(query, 10)
                print("\nSearch results:")
                for result in results:
                    print(f"[{result.score:.3f}] {result.data['filename']}")
                    print(f"    {result.data['text']}")
                    print("---")
                print()
            except KeyboardInterrupt:
                break


if __name__ == "__main__":
    load_dotenv(override=True)
    _run()
3 changes: 3 additions & 0 deletions examples/amazon_s3_text_embedding/requirements.txt
@@ -0,0 +1,3 @@
cocoindex
python-dotenv
boto3
12 changes: 12 additions & 0 deletions python/cocoindex/sources.py
@@ -28,3 +28,15 @@ class GoogleDrive(op.SourceSpec):
    root_folder_ids: list[str]
    binary: bool = False
    recent_changes_poll_interval: datetime.timedelta | None = None


class AmazonS3(op.SourceSpec):
    """Import data from an Amazon S3 bucket. Supports optional prefix and file filtering by glob patterns."""

    _op_category = op.OpCategory.SOURCE

    bucket_name: str
    prefix: str | None = None
    binary: bool = False
    included_patterns: list[str] | None = None
    excluded_patterns: list[str] | None = None
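
For context, here is a minimal sketch of how this new spec plugs into a flow, mirroring the example's `main.py` (the bucket name, prefix, and patterns below are placeholders):

```python
import cocoindex

@cocoindex.flow_def(name="S3SourceSketch")
def s3_source_sketch(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Read matching non-binary files from the bucket, optionally under a prefix.
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.AmazonS3(
            bucket_name="your-bucket-name",   # placeholder
            prefix="optional/prefix/path",    # placeholder
            binary=False,
            included_patterns=["*.md", "*.txt"]))
```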
1 change: 1 addition & 0 deletions src/ops/registration.rs
@@ -8,6 +8,7 @@ use std::sync::{Arc, LazyLock, RwLock, RwLockReadGuard};
fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result<()> {
    sources::local_file::Factory.register(registry)?;
    sources::google_drive::Factory.register(registry)?;
    sources::amazon_s3::Factory.register(registry)?;

    functions::parse_json::Factory.register(registry)?;
    functions::split_recursively::Factory.register(registry)?;