
Commit bf3e373

Authored by georgeh0 and wykrrr
feat(azure-blob): support Azure blob store as a data source (#746)
* add Azure Blob Storage source with full authentication support (#736)
  - Implement Azure Blob Storage source following the S3 pattern
  - Support multiple authentication methods with priority:
    * Connection string (highest priority)
    * SAS token (with proper permissions validation)
    * Account key (full access)
    * Anonymous access (public containers)
  - Include file pattern filtering (include/exclude glob patterns)
* feat(azure-blob): simplify and fix authentication
* docs: revise the docs for Azure Blob source

---------

Co-authored-by: wykrrr <[email protected]>
1 parent 306c45e commit bf3e373

File tree

12 files changed (+1136, -12 lines)

Cargo.lock

Lines changed: 602 additions & 12 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 6 additions & 0 deletions
@@ -114,7 +114,13 @@ json5 = "0.4.1"
 aws-config = "1.6.2"
 aws-sdk-s3 = "1.85.0"
 aws-sdk-sqs = "1.67.0"
+time = { version = "0.3", features = ["macros", "serde"] }
 numpy = "0.25.0"
 infer = "0.19.0"
 serde_with = { version = "3.13.0", features = ["base64"] }
 google-cloud-aiplatform-v1 = "0.4.0"
+
+azure_core = "0.21.0"
+azure_storage = "0.21.0"
+azure_storage_blobs = "0.21.0"
+azure_identity = "0.21.0"

docs/docs/ops/sources.md

Lines changed: 66 additions & 0 deletions
@@ -148,6 +148,72 @@ The spec takes the following fields:
 ### Schema
 
 The output is a [*KTable*](/docs/core/data_types#ktable) with the following sub fields:
+
+* `filename` (*Str*, key): the filename of the file, including the path, relative to the root directory, e.g. `"dir1/file1.md"`.
+* `content` (*Str* if `binary` is `False`, otherwise *Bytes*): the content of the file.
+
+
+## AzureBlob
+
+The `AzureBlob` source imports files from Azure Blob Storage.
+
+### Setup for Azure Blob Storage
+
+#### Get Started
+
+If you don't have experience with Azure Blob Storage, you can refer to the [quickstart](https://learn.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-portal).
+These are the actions you need to take:
+
+* Create a storage account in the [Azure Portal](https://portal.azure.com/).
+* Create a container in the storage account.
+* Upload your files to the container.
+* Grant the user / identity / service principal (depending on your authentication method, see below) access to the container. At minimum, a **Storage Blob Data Reader** role is needed. See [this doc](https://learn.microsoft.com/en-us/azure/storage/blobs/authorize-data-operations-portal) for reference.
+
+#### Authentication
+
+We use Azure’s **Default Credential** system (DefaultAzureCredential) for secure and flexible authentication.
+This allows you to connect to Azure services without putting any secrets in the code or flow spec.
+It automatically chooses the best authentication method based on your environment:
+
+* On your local machine: uses your Azure CLI login (`az login`) or environment variables.
+
+  ```sh
+  az login
+  # Optional: Set a default subscription if you have more than one
+  az account set --subscription "<YOUR_SUBSCRIPTION_NAME_OR_ID>"
+  ```
+* In Azure (VM, App Service, AKS, etc.): uses the resource’s Managed Identity.
+* In automated environments: supports Service Principals via environment variables
+  * `AZURE_CLIENT_ID`
+  * `AZURE_TENANT_ID`
+  * `AZURE_CLIENT_SECRET`
+
+You can refer to [this doc](https://learn.microsoft.com/en-us/azure/developer/python/sdk/authentication/overview) for more details.
+
+### Spec
+
+The spec takes the following fields:
+
+* `account_name` (`str`): the name of the storage account.
+* `container_name` (`str`): the name of the container.
+* `prefix` (`str`, optional): if provided, only files whose paths start with this prefix will be imported.
+* `binary` (`bool`, optional): whether to read files as binary (instead of text).
+* `included_patterns` (`list[str]`, optional): a list of glob patterns to include files, e.g. `["*.txt", "docs/**/*.md"]`.
+  If not specified, all files will be included.
+* `excluded_patterns` (`list[str]`, optional): a list of glob patterns to exclude files, e.g. `["*.tmp", "**/*.log"]`.
+  Any file or directory matching these patterns will be excluded even if they match `included_patterns`.
+  If not specified, no files will be excluded.
+
+:::info
+
+`included_patterns` and `excluded_patterns` use Unix-style glob syntax. See [globset syntax](https://docs.rs/globset/latest/globset/index.html#syntax) for the details.
+
+:::
+
+### Schema
+
+The output is a [*KTable*](/docs/core/data_types#ktable) with the following sub fields:
+
 * `filename` (*Str*, key): the filename of the file, including the path, relative to the root directory, e.g. `"dir1/file1.md"`.
 * `content` (*Str* if `binary` is `False`, otherwise *Bytes*): the content of the file.
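
A quick way to sanity-check the setup and authentication described above, independent of CocoIndex, is to list the container with the Azure Python SDK. This is a rough sketch assuming the `azure-identity` and `azure-storage-blob` packages are installed separately (they are not dependencies of this change); the account, container, and prefix come from the same environment variables the example project below uses.

```python
# Sanity-check sketch: verify DefaultAzureCredential can list blobs in the container.
# Assumes `pip install azure-identity azure-storage-blob`; not part of this commit.
import os

from azure.identity import DefaultAzureCredential
from azure.storage.blob import ContainerClient

account_name = os.environ["AZURE_STORAGE_ACCOUNT_NAME"]
container_name = os.environ["AZURE_BLOB_CONTAINER_NAME"]
prefix = os.environ.get("AZURE_BLOB_PREFIX") or None

client = ContainerClient(
    account_url=f"https://{account_name}.blob.core.windows.net",
    container_name=container_name,
    # Picks up `az login`, Managed Identity, or Service Principal env vars.
    credential=DefaultAzureCredential(),
)

for blob in client.list_blobs(name_starts_with=prefix):
    print(blob.name)
```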

.env.example

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
# Database Configuration
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

# Azure Blob Storage Configuration (Public test container - ready to use!)
AZURE_STORAGE_ACCOUNT_NAME=testnamecocoindex1
AZURE_BLOB_CONTAINER_NAME=testpublic1
AZURE_BLOB_PREFIX=

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
.env

README.md

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
This example builds an embedding index based on files stored in an Azure Blob Storage container.
It continuously updates the index as files are added / updated / deleted in the source container:
it keeps the index in sync with the Azure Blob Storage container effortlessly.

## Prerequisite

Before running the example, you need to:

1. [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one.

2. Prepare for Azure Blob Storage.
   See [Setup for Azure Blob Storage](https://cocoindex.io/docs/ops/sources#setup-for-azure-blob-storage) for more details.

3. Create a `.env` file with your Azure Blob Storage container name and (optionally) prefix.
   Start by copying the `.env.example`, and then edit it to fill in your account name and container name.

   ```bash
   cp .env.example .env
   $EDITOR .env
   ```

   Example `.env` file:
   ```
   # Database Configuration
   COCOINDEX_DATABASE_URL=postgresql://localhost:5432/cocoindex

   # Azure Blob Storage Configuration
   AZURE_STORAGE_ACCOUNT_NAME=your-account-name
   AZURE_BLOB_CONTAINER_NAME=your-container-name
   ```

## Run

Install dependencies:

```sh
pip install -e .
```

Run:

```sh
python main.py
```

While it runs, it keeps observing changes in the Azure Blob Storage container and updates the index automatically.
At the same time, it accepts queries from the terminal and performs search on top of the up-to-date index.


## CocoInsight

CocoInsight is in Early Access now (Free) 😊 You found us! A quick 3-minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9).

Run CocoInsight to understand your RAG data pipeline:

```sh
cocoindex server -ci main.py
```

You can also add a `-L` flag to make the server keep updating the index to reflect source changes at the same time:

```sh
cocoindex server -ci -L main.py
```

Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight).

main.py

Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
from dotenv import load_dotenv
from psycopg_pool import ConnectionPool
import cocoindex
import os
from typing import Any


@cocoindex.transform_flow()
def text_to_embedding(
    text: cocoindex.DataSlice[str],
) -> cocoindex.DataSlice[list[float]]:
    """
    Embed the text using a SentenceTransformer model.
    This is a shared logic between indexing and querying, so extract it as a function.
    """
    return text.transform(
        cocoindex.functions.SentenceTransformerEmbed(
            model="sentence-transformers/all-MiniLM-L6-v2"
        )
    )


@cocoindex.flow_def(name="AzureBlobTextEmbedding")
def azure_blob_text_embedding_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    """
    Define an example flow that embeds text from Azure Blob Storage into a vector database.
    """
    account_name = os.environ["AZURE_STORAGE_ACCOUNT_NAME"]
    container_name = os.environ["AZURE_BLOB_CONTAINER_NAME"]
    prefix = os.environ.get("AZURE_BLOB_PREFIX", None)

    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.AzureBlob(
            account_name=account_name,
            container_name=container_name,
            prefix=prefix,
            included_patterns=["*.md", "*.mdx", "*.txt", "*.docx"],
            binary=False,
        )
    )

    doc_embeddings = data_scope.add_collector()

    with data_scope["documents"].row() as doc:
        doc["chunks"] = doc["content"].transform(
            cocoindex.functions.SplitRecursively(),
            language="markdown",
            chunk_size=2000,
            chunk_overlap=500,
        )

        with doc["chunks"].row() as chunk:
            chunk["embedding"] = text_to_embedding(chunk["text"])
            doc_embeddings.collect(
                filename=doc["filename"],
                location=chunk["location"],
                text=chunk["text"],
                embedding=chunk["embedding"],
            )

    doc_embeddings.export(
        "doc_embeddings",
        cocoindex.targets.Postgres(),
        primary_key_fields=["filename", "location"],
        vector_indexes=[
            cocoindex.VectorIndexDef(
                field_name="embedding",
                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
            )
        ],
    )


def search(pool: ConnectionPool, query: str, top_k: int = 5) -> list[dict[str, Any]]:
    # Get the table name, for the export target in the azure_blob_text_embedding_flow above.
    table_name = cocoindex.utils.get_target_default_name(
        azure_blob_text_embedding_flow, "doc_embeddings"
    )
    # Evaluate the transform flow defined above with the input query, to get the embedding.
    query_vector = text_to_embedding.eval(query)
    # Run the query and get the results.
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT filename, text, embedding <=> %s::vector AS distance
                FROM {table_name} ORDER BY distance LIMIT %s
                """,
                (query_vector, top_k),
            )
            return [
                {"filename": row[0], "text": row[1], "score": 1.0 - row[2]}
                for row in cur.fetchall()
            ]


def _main() -> None:
    # Initialize the database connection pool.
    pool = ConnectionPool(os.getenv("COCOINDEX_DATABASE_URL"))

    azure_blob_text_embedding_flow.setup()
    update_stats = azure_blob_text_embedding_flow.update()
    print(update_stats)

    # Run queries in a loop to demonstrate the query capabilities.
    while True:
        query = input("Enter search query (or Enter to quit): ")
        if query == "":
            break
        # Run the query function with the database connection pool and the query.
        results = search(pool, query)
        print("\nSearch results:")
        for result in results:
            print(f"[{result['score']:.3f}] {result['filename']}")
            print(f"    {result['text']}")
            print("---")
        print()


if __name__ == "__main__":
    load_dotenv()
    cocoindex.init()
    _main()
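
The example performs a one-time `update()` before entering the query loop. To keep the index continuously in sync with the container, as the README describes, the flow can instead be driven by a live updater. A rough sketch, assuming `cocoindex.FlowLiveUpdater` with its documented context-manager usage; treat the exact API surface as an assumption rather than part of this change:

```python
# Sketch: continuous sync instead of a one-time update (assumes cocoindex.FlowLiveUpdater).
# Reuses azure_blob_text_embedding_flow defined in main.py above.
import cocoindex


def _main_live() -> None:
    azure_blob_text_embedding_flow.setup()
    # Watch the Azure Blob container and keep re-indexing added/updated/deleted
    # files until the process is interrupted.
    with cocoindex.FlowLiveUpdater(azure_blob_text_embedding_flow) as updater:
        updater.wait()
```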

pyproject.toml

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
[project]
name = "azure-blob-text-embedding"
version = "0.1.0"
description = "Simple example for cocoindex: build embedding index based on Azure Blob Storage files."
requires-python = ">=3.11"
dependencies = ["cocoindex[embeddings]>=0.1.63", "python-dotenv>=1.0.1"]

[tool.setuptools]
packages = []

python/cocoindex/sources.py

Lines changed: 15 additions & 0 deletions
@@ -43,3 +43,18 @@ class AmazonS3(op.SourceSpec):
     included_patterns: list[str] | None = None
     excluded_patterns: list[str] | None = None
     sqs_queue_url: str | None = None
+
+
+class AzureBlob(op.SourceSpec):
+    """
+    Import data from an Azure Blob Storage container. Supports optional prefix and file filtering by glob patterns.
+    """
+
+    _op_category = op.OpCategory.SOURCE
+
+    account_name: str
+    container_name: str
+    prefix: str | None = None
+    binary: bool = False
+    included_patterns: list[str] | None = None
+    excluded_patterns: list[str] | None = None
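
The `included_patterns` / `excluded_patterns` fields follow the semantics documented in `docs/docs/ops/sources.md` above: exclusions win over inclusions, and omitting `included_patterns` includes everything. The actual matching is done in Rust with `globset`; the snippet below only illustrates the documented precedence, using Python's `fnmatch` as a rough stand-in (it does not reproduce globset syntax such as `**`).

```python
# Illustration only: approximate the documented include/exclude semantics with fnmatch.
# The real source uses the Rust `globset` crate, whose syntax (e.g. `**`) is richer.
from fnmatch import fnmatch


def is_selected(
    filename: str,
    included_patterns: list[str] | None,
    excluded_patterns: list[str] | None,
) -> bool:
    # An exclusion match always wins, even if an include pattern also matches.
    if excluded_patterns and any(fnmatch(filename, p) for p in excluded_patterns):
        return False
    # No include patterns means every (non-excluded) file is included.
    if not included_patterns:
        return True
    return any(fnmatch(filename, p) for p in included_patterns)


print(is_selected("notes/readme.md", ["*.md"], ["*.tmp"]))  # True
print(is_selected("draft.md.tmp", ["*.md*"], ["*.tmp"]))    # False: exclusion wins
```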

src/ops/registration.rs

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result
     sources::local_file::Factory.register(registry)?;
     sources::google_drive::Factory.register(registry)?;
     sources::amazon_s3::Factory.register(registry)?;
+    sources::azure_blob::Factory.register(registry)?;
 
     functions::parse_json::Factory.register(registry)?;
     functions::split_recursively::register(registry)?;
