diff --git a/Cargo.toml b/Cargo.toml
index 3bec286a3..82c5e8e30 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -109,3 +109,5 @@ rand = "0.9.0"
 indoc = "2.0.6"
 owo-colors = "4.2.0"
 json5 = "0.4.1"
+aws-config = "1.6.2"
+aws-sdk-s3 = "1.85.0"
diff --git a/examples/amazon_s3_text_embedding/.env.example b/examples/amazon_s3_text_embedding/.env.example
new file mode 100644
index 000000000..e199b294e
--- /dev/null
+++ b/examples/amazon_s3_text_embedding/.env.example
@@ -0,0 +1,6 @@
+# Postgres database address for cocoindex
+COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
+
+# Amazon S3 Configuration
+AMAZON_S3_BUCKET_NAME=your-bucket-name
+AMAZON_S3_PREFIX=optional/prefix/path
\ No newline at end of file
diff --git a/examples/amazon_s3_text_embedding/.gitignore b/examples/amazon_s3_text_embedding/.gitignore
new file mode 100644
index 000000000..2eea525d8
--- /dev/null
+++ b/examples/amazon_s3_text_embedding/.gitignore
@@ -0,0 +1 @@
+.env
\ No newline at end of file
diff --git a/examples/amazon_s3_text_embedding/README.md b/examples/amazon_s3_text_embedding/README.md
new file mode 100644
index 000000000..930af08b6
--- /dev/null
+++ b/examples/amazon_s3_text_embedding/README.md
@@ -0,0 +1,104 @@
+This example builds an embedding index based on files stored in an Amazon S3 bucket.
+It continuously updates the index as files are added, updated, or deleted in the source bucket:
+it keeps the index in sync with the Amazon S3 bucket effortlessly.
+
+## Prerequisite
+
+Before running the example, you need to:
+
+1. [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one.
+
+2. Prepare for Amazon S3:
+
+   - **Create an Amazon S3 bucket:**
+     - Go to the [AWS S3 Console](https://s3.console.aws.amazon.com/s3/home) and click **Create bucket**. Give it a unique name and choose a region.
+     - Or, use the AWS CLI:
+       ```sh
+       aws s3 mb s3://your-s3-bucket-name
+       ```
+
+   - **Upload your files to the bucket:**
+     - In the AWS Console, click your bucket, then click **Upload** and add your `.md`, `.txt`, `.docx`, or other files.
+     - Or, use the AWS CLI:
+       ```sh
+       aws s3 cp localfile.txt s3://your-s3-bucket-name/
+       aws s3 cp your-folder/ s3://your-s3-bucket-name/ --recursive
+       ```
+
+   - **Set up AWS credentials:**
+     - The easiest way is to run:
+       ```sh
+       aws configure
+       ```
+       Enter your AWS Access Key ID, Secret Access Key, region (e.g., `us-east-1`), and output format (`json`).
+     - This creates a credentials file at `~/.aws/credentials` and a config file at `~/.aws/config`.
+     - Alternatively, you can set environment variables:
+       ```sh
+       export AWS_ACCESS_KEY_ID=your-access-key-id
+       export AWS_SECRET_ACCESS_KEY=your-secret-access-key
+       export AWS_DEFAULT_REGION=us-east-1
+       ```
+     - If running on AWS EC2 or Lambda, you can use an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html) with S3 read permissions.
+
+   - **(Optional) Specify a prefix** to restrict indexing to a subfolder of the bucket by setting `AMAZON_S3_PREFIX` in your `.env`.
+
+   See the [AWS S3 documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html) for more details.
+
+3. Create a `.env` file with your Amazon S3 bucket name and (optionally) prefix.
+   Start by copying `.env.example`, then edit it to fill in your bucket name and prefix.
+
+   ```bash
+   cp .env.example .env
+   $EDITOR .env
+   ```
+
+   Example `.env` file:
+   ```
+   # Postgres database address for cocoindex
+   COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
+
+   # Amazon S3 Configuration
+   AMAZON_S3_BUCKET_NAME=your-bucket-name
+   AMAZON_S3_PREFIX=optional/prefix/path
+   ```
+
+## Run
+
+Install dependencies:
+
+```sh
+uv pip install -r requirements.txt
+```
+
+Setup:
+
+```sh
+uv run main.py cocoindex setup
+```
+
+Run:
+
+```sh
+uv run main.py
+```
+
+While running, it keeps observing changes in the Amazon S3 bucket and updates the index automatically.
+At the same time, it accepts queries from the terminal and performs searches on top of the up-to-date index.
+
+
+## CocoInsight
+CocoInsight is in Early Access now (Free) 😊 You found us! A quick 3-minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9).
+
+Run CocoInsight to understand your RAG data pipeline:
+
+```sh
+uv run main.py cocoindex server -ci
+```
+
+You can also add a `-L` flag to make the server keep updating the index to reflect source changes at the same time:
+
+```sh
+uv run main.py cocoindex server -ci -L
+```
+
+Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight).
\ No newline at end of file
diff --git a/examples/amazon_s3_text_embedding/main.py b/examples/amazon_s3_text_embedding/main.py
new file mode 100644
index 000000000..0a94e02aa
--- /dev/null
+++ b/examples/amazon_s3_text_embedding/main.py
@@ -0,0 +1,78 @@
+from dotenv import load_dotenv
+
+import asyncio
+import cocoindex
+import datetime
+import os
+
+@cocoindex.flow_def(name="AmazonS3TextEmbedding")
+def amazon_s3_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
+    """
+    Define an example flow that embeds text from Amazon S3 into a vector database.
+    """
+    bucket_name = os.environ["AMAZON_S3_BUCKET_NAME"]
+    prefix = os.environ.get("AMAZON_S3_PREFIX", None)
+
+    data_scope["documents"] = flow_builder.add_source(
+        cocoindex.sources.AmazonS3(
+            bucket_name=bucket_name,
+            prefix=prefix,
+            included_patterns=["*.md", "*.txt", "*.docx"],
+            binary=False),
+        refresh_interval=datetime.timedelta(minutes=1))
+
+    doc_embeddings = data_scope.add_collector()
+
+    with data_scope["documents"].row() as doc:
+        doc["chunks"] = doc["content"].transform(
+            cocoindex.functions.SplitRecursively(),
+            language="markdown", chunk_size=2000, chunk_overlap=500)
+
+        with doc["chunks"].row() as chunk:
+            chunk["embedding"] = chunk["text"].transform(
+                cocoindex.functions.SentenceTransformerEmbed(
+                    model="sentence-transformers/all-MiniLM-L6-v2"))
+            doc_embeddings.collect(filename=doc["filename"], location=chunk["location"],
+                                   text=chunk["text"], embedding=chunk["embedding"])
+
+    doc_embeddings.export(
+        "doc_embeddings",
+        cocoindex.storages.Postgres(),
+        primary_key_fields=["filename", "location"],
+        vector_indexes=[
+            cocoindex.VectorIndexDef(
+                field_name="embedding",
+                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)])
+
+query_handler = cocoindex.query.SimpleSemanticsQueryHandler(
+    name="SemanticsSearch",
+    flow=amazon_s3_text_embedding_flow,
+    target_name="doc_embeddings",
+    query_transform_flow=lambda text: text.transform(
+        cocoindex.functions.SentenceTransformerEmbed(
+            model="sentence-transformers/all-MiniLM-L6-v2")),
+    default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)
+
+@cocoindex.main_fn()
+def _run():
+    # Use a `FlowLiveUpdater` to keep the flow data updated.
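+    # While the updater is active, it keeps watching the Amazon S3 source (per the
+    # flow's `refresh_interval`) and updates the index in the background, so the
+    # query loop below always searches an up-to-date index.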
+    with cocoindex.FlowLiveUpdater(amazon_s3_text_embedding_flow):
+        # Run queries in a loop to demonstrate the query capabilities.
+        while True:
+            try:
+                query = input("Enter search query (or Enter to quit): ")
+                if query == '':
+                    break
+                results, _ = query_handler.search(query, 10)
+                print("\nSearch results:")
+                for result in results:
+                    print(f"[{result.score:.3f}] {result.data['filename']}")
+                    print(f"    {result.data['text']}")
+                    print("---")
+                print()
+            except KeyboardInterrupt:
+                break
+
+if __name__ == "__main__":
+    load_dotenv(override=True)
+    _run()
diff --git a/examples/amazon_s3_text_embedding/requirements.txt b/examples/amazon_s3_text_embedding/requirements.txt
new file mode 100644
index 000000000..d4e76dff7
--- /dev/null
+++ b/examples/amazon_s3_text_embedding/requirements.txt
@@ -0,0 +1,3 @@
+cocoindex
+python-dotenv
+boto3
\ No newline at end of file
diff --git a/python/cocoindex/sources.py b/python/cocoindex/sources.py
index c76fa2ecf..c12eac806 100644
--- a/python/cocoindex/sources.py
+++ b/python/cocoindex/sources.py
@@ -28,3 +28,15 @@ class GoogleDrive(op.SourceSpec):
     root_folder_ids: list[str]
     binary: bool = False
     recent_changes_poll_interval: datetime.timedelta | None = None
+
+
+class AmazonS3(op.SourceSpec):
+    """Import data from an Amazon S3 bucket. Supports optional prefix and file filtering by glob patterns."""
+
+    _op_category = op.OpCategory.SOURCE
+
+    bucket_name: str
+    prefix: str | None = None
+    binary: bool = False
+    included_patterns: list[str] | None = None
+    excluded_patterns: list[str] | None = None
diff --git a/src/ops/registration.rs b/src/ops/registration.rs
index 50c086d6f..059bde595 100644
--- a/src/ops/registration.rs
+++ b/src/ops/registration.rs
@@ -8,6 +8,7 @@ use std::sync::{Arc, LazyLock, RwLock, RwLockReadGuard};
 fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result<()> {
     sources::local_file::Factory.register(registry)?;
     sources::google_drive::Factory.register(registry)?;
+    sources::amazon_s3::Factory.register(registry)?;
 
     functions::parse_json::Factory.register(registry)?;
     functions::split_recursively::Factory.register(registry)?;
diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs
new file mode 100644
index 000000000..615b42f5e
--- /dev/null
+++ b/src/ops/sources/amazon_s3.rs
@@ -0,0 +1,214 @@
+use aws_config::meta::region::RegionProviderChain;
+use aws_sdk_s3::Client;
+use aws_config::Region;
+use async_stream::try_stream;
+use globset::{Glob, GlobSet, GlobSetBuilder};
+use log::warn;
+use std::sync::Arc;
+use crate::fields_value;
+
+use crate::base::field_attrs;
+use crate::ops::sdk::*;
+
+#[derive(Debug, Deserialize)]
+pub struct Spec {
+    bucket_name: String,
+    prefix: Option<String>,
+    binary: bool,
+    included_patterns: Option<Vec<String>>,
+    excluded_patterns: Option<Vec<String>>,
+}
+
+struct Executor {
+    client: Client,
+    bucket_name: String,
+    prefix: Option<String>,
+    binary: bool,
+    included_glob_set: Option<GlobSet>,
+    excluded_glob_set: Option<GlobSet>,
+}
+
+impl Executor {
+    fn is_excluded(&self, key: &str) -> bool {
+        self.excluded_glob_set
+            .as_ref()
+            .is_some_and(|glob_set| glob_set.is_match(key))
+    }
+
+    fn is_file_included(&self, key: &str) -> bool {
+        self.included_glob_set
+            .as_ref()
+            .is_none_or(|glob_set| glob_set.is_match(key))
+            && !self.is_excluded(key)
+    }
+}
+
+#[async_trait]
+impl SourceExecutor for Executor {
+    fn list<'a>(
+        &'a self,
+        _options: &'a SourceExecutorListOptions,
+    ) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
+        let client = &self.client;
+        let bucket = &self.bucket_name;
+        let prefix = &self.prefix;
+        let included_glob_set = &self.included_glob_set;
+        let excluded_glob_set = &self.excluded_glob_set;
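+        // The stream below pages through `list_objects_v2` results using continuation
+        // tokens, yielding one batch of row metadata per page; keys ending in '/'
+        // (folder placeholders) or rejected by the include/exclude globs are skipped.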
+        try_stream! {
+            let mut continuation_token = None;
+            loop {
+                let mut req = client
+                    .list_objects_v2()
+                    .bucket(bucket);
+                if let Some(ref p) = prefix {
+                    req = req.prefix(p);
+                }
+                if let Some(ref token) = continuation_token {
+                    req = req.continuation_token(token);
+                }
+                let resp = req.send().await?;
+                if let Some(contents) = &resp.contents {
+                    let mut batch = Vec::new();
+                    for obj in contents {
+                        if let Some(key) = obj.key() {
+                            // Only include files (not folders)
+                            if key.ends_with('/') { continue; }
+                            let include = included_glob_set
+                                .as_ref()
+                                .map(|gs| gs.is_match(key))
+                                .unwrap_or(true);
+                            let exclude = excluded_glob_set
+                                .as_ref()
+                                .map(|gs| gs.is_match(key))
+                                .unwrap_or(false);
+                            if include && !exclude {
+                                batch.push(SourceRowMetadata {
+                                    key: KeyValue::Str(key.to_string().into()),
+                                    ordinal: obj.last_modified().map(|dt| Ordinal(dt.secs() as i64)),
+                                });
+                            }
+                        }
+                    }
+                    if !batch.is_empty() {
+                        yield batch;
+                    }
+                }
+                if resp.is_truncated == Some(true) {
+                    continuation_token = resp.next_continuation_token.clone().map(|s| s.to_string());
+                } else {
+                    break;
+                }
+            }
+        }.boxed()
+    }
+
+    async fn get_value(
+        &self,
+        key: &KeyValue,
+        options: &SourceExecutorGetOptions,
+    ) -> Result<Option<SourceValue>> {
+        let key_str = key.str_value()?;
+        if !self.is_file_included(key_str) {
+            return Ok(None);
+        }
+        let resp = self.client
+            .get_object()
+            .bucket(&self.bucket_name)
+            .key(key_str.as_ref())
+            .send()
+            .await;
+        let obj = match resp {
+            Ok(o) => o,
+            Err(e) => {
+                warn!("Failed to fetch S3 object {}: {}", key_str, e);
+                return Ok(None);
+            }
+        };
+        let bytes = obj.body.collect().await?.into_bytes();
+        let value = if options.include_value {
+            Some(if self.binary {
+                fields_value!(bytes.to_vec())
+            } else {
+                match String::from_utf8(bytes.to_vec()) {
+                    Ok(s) => fields_value!(s),
+                    Err(e) => {
+                        warn!("Failed to decode S3 object {} as UTF-8: {}", key_str, e);
+                        return Ok(None);
+                    }
+                }
+            })
+        } else {
+            None
+        };
+        Ok(Some(SourceValue { value, ordinal: None }))
+    }
+}
+
+pub struct Factory;
+
+#[async_trait]
+impl SourceFactoryBase for Factory {
+    type Spec = Spec;
+
+    fn name(&self) -> &str {
+        "AmazonS3"
+    }
+
+    fn get_output_schema(
+        &self,
+        spec: &Spec,
+        _context: &FlowInstanceContext,
+    ) -> Result<EnrichedValueType> {
+        let mut struct_schema = StructSchema::default();
+        let mut schema_builder = StructSchemaBuilder::new(&mut struct_schema);
+        let filename_field = schema_builder.add_field(FieldSchema::new(
+            "filename",
+            make_output_type(BasicValueType::Str),
+        ));
+        schema_builder.add_field(FieldSchema::new(
+            "content",
+            make_output_type(if spec.binary {
+                BasicValueType::Bytes
+            } else {
+                BasicValueType::Str
+            })
+            .with_attr(
+                field_attrs::CONTENT_FILENAME,
+                serde_json::to_value(filename_field.to_field_ref())?,
+            ),
+        ));
+        Ok(make_output_type(TableSchema::new(
+            TableKind::KTable,
+            struct_schema,
+        )))
+    }
+
+    async fn build_executor(
+        self: Arc<Self>,
+        spec: Spec,
+        _context: Arc<FlowInstanceContext>,
+    ) -> Result<Box<dyn SourceExecutor>> {
+        let region_provider = RegionProviderChain::default_provider().or_else(Region::new("us-east-1"));
+        let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
+            .region(region_provider)
+            .load()
+            .await;
+        let client = Client::new(&config);
+        Ok(Box::new(Executor {
+            client,
+            bucket_name: spec.bucket_name,
+            prefix: spec.prefix,
+            binary: spec.binary,
+            included_glob_set: spec.included_patterns.map(build_glob_set).transpose()?,
+            excluded_glob_set: spec.excluded_patterns.map(build_glob_set).transpose()?,
+        }))
+    }
+}
+
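+/// Compiles the user-provided glob patterns into a single `GlobSet`,
+/// used by the executor above for include/exclude filtering of object keys.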
+fn build_glob_set(patterns: Vec<String>) -> Result<GlobSet> {
+    let mut builder = GlobSetBuilder::new();
+    for pattern in patterns {
+        builder.add(Glob::new(pattern.as_str())?);
+    }
+    Ok(builder.build()?)
+}
\ No newline at end of file
diff --git a/src/ops/sources/mod.rs b/src/ops/sources/mod.rs
index d5b45e352..0e1341c73 100644
--- a/src/ops/sources/mod.rs
+++ b/src/ops/sources/mod.rs
@@ -1,2 +1,3 @@
 pub mod google_drive;
 pub mod local_file;
+pub mod amazon_s3;
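A note on the filtering semantics in `amazon_s3.rs`: an object key is listed only when it matches at least one `included_patterns` glob (or when no include list is given) and matches none of the `excluded_patterns` globs. The sketch below is not part of the patch; it is a minimal, standalone illustration of that rule using `globset` directly, with hypothetical patterns mirroring the example flow's configuration.

```rust
use globset::{Glob, GlobSet, GlobSetBuilder};

// Compile a list of patterns into one GlobSet, the same way build_glob_set in the patch does.
fn build_glob_set(patterns: &[&str]) -> Result<GlobSet, globset::Error> {
    let mut builder = GlobSetBuilder::new();
    for &pattern in patterns {
        builder.add(Glob::new(pattern)?);
    }
    builder.build()
}

fn main() -> Result<(), globset::Error> {
    // Hypothetical patterns; the example flow passes included_patterns=["*.md", "*.txt", "*.docx"].
    let included = build_glob_set(&["*.md", "*.txt", "*.docx"])?;
    let excluded = build_glob_set(&["*.tmp"])?;

    for key in ["readme.md", "notes.txt", "draft.tmp", "image.png"] {
        // Same rule as Executor::is_file_included: in the include set (if any) and not excluded.
        let keep = included.is_match(key) && !excluded.is_match(key);
        println!("{key}: {}", if keep { "indexed" } else { "skipped" });
    }
    Ok(())
}
```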