Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
*
* <ul>
* <li>{@link #SEMANTIC}: Use dense vector embeddings and similarity search.
* <li>{@link #KEYWORD}: Use keyword or lexical search when supported by the store.
* <li>{@link #HYBRID}: Combine semantic and keyword search strategies.
* </ul>
*/
public enum VectorStoreQueryMode {
/** Semantic similarity search using embeddings. */
SEMANTIC,
/** Keyword/lexical search (store dependent). */
KEYWORD,
/** Hybrid search combining semantic and keyword results. */
HYBRID;
/** Keyword/lexical search (store dependent). TODO: term-based retrieval */
// KEYWORD,
/** Hybrid search combining semantic and keyword results. TODO: semantic + keyword retrieval */
// HYBRID;
}
246 changes: 234 additions & 12 deletions docs/content/docs/development/vector_stores.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ under the License.

# Vector Stores

{{< hint info >}}
Vector stores are currently supported in the Python API only. Java API support is planned for future releases.
{{< /hint >}}

{{< hint info >}}
This page covers semantic search using vector stores. Additional query modes (keyword, hybrid) are planned for future releases.
{{< /hint >}}
Expand All @@ -50,43 +46,99 @@ To use vector stores in your agents, you need to configure both a vector store a

Flink Agents provides decorators to simplify vector store setup within agents:

{{< tabs "Resource Decorators" >}}

{{< tab "Python" >}}

#### @vector_store

The `@vector_store` decorator marks a method that creates a vector store. Vector stores automatically integrate with embedding models for text-based search.

{{< /tab >}}

{{< tab "Java" >}}

#### @VectorStore

The `@VectorStore` annotation marks a method that creates a vector store.

{{< /tab >}}

{{< /tabs >}}

### Query Objects

Vector stores use structured query objects for consistent interfaces:

{{< tabs "Query Objects" >}}

{{< tab "Python" >}}

```python
# Create a semantic search query
query = VectorStoreQuery(
mode=VectorStoreQueryMode.SEMANTIC,
query_text="What is Apache Flink Agents?",
limit=3
)
```

### Query Results
{{< /tab >}}

When you execute a query, you receive a `VectorStoreQueryResult` object that contains the search results:
{{< tab "Java" >}}

```python
# Execute the query
result = vector_store.query(query)
```java
// Create a semantic search query
VectorStoreQuery query = new VectorStoreQuery(
"What is Apache Flink Agents?", // query text
3 // limit
);
```

{{< /tab >}}

{{< /tabs >}}

### Query Results

When you execute a query, you receive a `VectorStoreQueryResult` object that contains the search results:

The `VectorStoreQueryResult` contains:
- **documents**: A list of `Document` objects representing the retrieved results
- Each `Document` has:
- **content**: The actual text content of the document
- **metadata**: Associated metadata (source, category, timestamp, etc.)
- **id**: Unique identifier of the document (if available)

{{< tabs "Query Results" >}}

{{< tab "Python" >}}

```python
# Execute the query
result = vector_store.query(query)
```

{{< /tab >}}

{{< tab "Java" >}}

```java
// Execute the query
VectorStoreQueryResult result = vectorStore.query(query);
```

{{< /tab >}}

{{< /tabs >}}

### Usage Example

Here's how to define and use vector stores in your agent:

{{< tabs "Usage Example" >}}

{{< tab "Python" >}}

```python
class MyAgent(Agent):

Expand Down Expand Up @@ -127,7 +179,6 @@ class MyAgent(Agent):
# Create a semantic search query
user_query = str(event.input)
query = VectorStoreQuery(
mode=VectorStoreQueryMode.SEMANTIC,
query_text=user_query,
limit=3
)
Expand All @@ -139,12 +190,73 @@ class MyAgent(Agent):
# Process the retrieved context as needed for your use case
```

{{< /tab >}}

{{< tab "Java" >}}

```java
public class MyAgent extends Agent {

@EmbeddingModelConnection
public static ResourceDescriptor embeddingConnection() {
return ResourceDescriptor.Builder.newBuilder(OpenAIEmbeddingModelConnection.class.getName())
.addInitialArgument("api_key", "your-api-key-here")
.build();
}

@EmbeddingModelSetup
public static ResourceDescriptor embeddingModel() {
return ResourceDescriptor.Builder.newBuilder(OpenAIEmbeddingModelSetup.class.getName())
.addInitialArgument("connection", "embeddingConnection")
.addInitialArgument("model", "text-embedding-3-small")
.build();
}

@VectorStore
public static ResourceDescriptor vectorStore() {
return ResourceDescriptor.Builder.newBuilder(ElasticsearchVectorStore.class.getName())
.addInitialArgument("embedding_model", "embeddingModel")
.addInitialArgument("host", "http://localhost:9200")
.addInitialArgument("index", "my_documents")
.addInitialArgument("vector_field", "content_vector")
.addInitialArgument("dims", 1536)
.build();
}

@Action(listenEvents = InputEvent.class)
public static void searchDocuments(InputEvent event, RunnerContext ctx) {
// Option 1: Manual search via the vector store
VectorStore vectorStore = (VectorStore) ctx.getResource("vectorStore", ResourceType.VECTOR_STORE);
String queryText = (String) event.getInput();
VectorStoreQuery query = new VectorStoreQuery(queryText, 3);
VectorStoreQueryResult result = vectorStore.query(query);

// Option 2: Request context retrieval via built-in events
ctx.sendEvent(new ContextRetrievalRequestEvent(queryText, "vectorStore"));
}

@Action(listenEvents = ContextRetrievalResponseEvent.class)
public static void onSearchResponse(ContextRetrievalResponseEvent event, RunnerContext ctx) {
List<Document> documents = event.getDocuments();
// Process the retrieved documents...
}
}
```

{{< /tab >}}

{{< /tabs >}}

## Built-in Providers

### Chroma

[Chroma](https://www.trychroma.com/home) is an open-source vector database that provides efficient storage and querying of embeddings with support for multiple deployment modes.

{{< hint info >}}
Chroma is currently supported in the Python API only.
{{< /hint >}}

#### Prerequisites

1. Install ChromaDB: `pip install chromadb`
Expand All @@ -169,6 +281,10 @@ class MyAgent(Agent):

#### Usage Example

{{< tabs "Chroma Usage Example" >}}

{{< tab "Python" >}}

```python
class MyAgent(Agent):

Expand Down Expand Up @@ -208,6 +324,10 @@ class MyAgent(Agent):
...
```

{{< /tab >}}

{{< /tabs >}}

#### Deployment Modes

ChromaDB supports multiple deployment modes:
Expand Down Expand Up @@ -265,6 +385,66 @@ def chroma_store() -> ResourceDescriptor:
)
```

### Elasticsearch

[Elasticsearch](https://www.elastic.co/elasticsearch/) is a distributed, RESTful search and analytics engine that supports vector search through dense vector fields and K-Nearest Neighbors (KNN).

{{< hint info >}}
Elasticsearch is currently supported in the Java API only.
{{< /hint >}}

#### Prerequisites

1. An Elasticsearch cluster (version 8.0 or later for KNN support).
2. An index with a `dense_vector` field.

#### ElasticsearchVectorStore Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `embedding_model` | str | Required | Reference to embedding model resource name |
| `index` | str | Required | Target Elasticsearch index name |
| `vector_field` | str | Required | Name of the dense vector field used for KNN |
| `dims` | int | `768` | Vector dimensionality |
| `k` | int | None | Number of nearest neighbors to return; can be overridden per query |
| `num_candidates` | int | None | Candidate set size for ANN search; can be overridden per query |
| `filter_query` | str | None | Raw JSON Elasticsearch filter query (DSL) applied as a post-filter |
| `host` | str | `"http://localhost:9200"` | Elasticsearch endpoint |
| `hosts` | str | None | Comma-separated list of Elasticsearch endpoints |
| `username` | str | None | Username for basic authentication |
| `password` | str | None | Password for basic authentication |
| `api_key_base64` | str | None | Base64-encoded API key for authentication |
| `api_key_id` | str | None | API key ID for authentication |
| `api_key_secret` | str | None | API key secret for authentication |

#### Usage Example

{{< tabs "Elasticsearch Usage Example" >}}

{{< tab "Java" >}}

Here's how to define an Elasticsearch vector store in your Java agent:

```java
@VectorStore
public static ResourceDescriptor vectorStore() {
return ResourceDescriptor.Builder.newBuilder(ElasticsearchVectorStore.class.getName())
.addInitialArgument("embedding_model", "embeddingModel")
.addInitialArgument("host", "http://localhost:9200")
.addInitialArgument("index", "my_documents")
.addInitialArgument("vector_field", "content_vector")
.addInitialArgument("dims", 1536)
// Optional authentication
// .addInitialArgument("username", "elastic")
// .addInitialArgument("password", "secret")
.build();
}
```

{{< /tab >}}

{{< /tabs >}}

## Custom Providers

{{< hint warning >}}
Expand All @@ -277,6 +457,10 @@ If you want to use vector stores not offered by the built-in providers, you can

The base class handles text-to-vector conversion and provides the high-level query interface. You only need to implement the core vector search functionality.

{{< tabs "Custom Vector Store" >}}

{{< tab "Python" >}}

```python
class MyVectorStore(BaseVectorStore):
# Add your custom configuration fields here
Expand All @@ -294,4 +478,42 @@ class MyVectorStore(BaseVectorStore):
# - kwargs: Vector store-specific parameters
# - Returns: List of Document objects matching the search criteria
pass
```
```

{{< /tab >}}

{{< tab "Java" >}}

```java
public class MyVectorStore extends BaseVectorStore {

public MyVectorStore(
ResourceDescriptor descriptor,
BiFunction<String, ResourceType, Resource> getResource) {
super(descriptor, getResource);
}

@Override
public Map<String, Object> getStoreKwargs() {
// Return vector store-specific configuration
// These parameters are merged with query-specific parameters
Map<String, Object> kwargs = new HashMap<>();
kwargs.put("index", "my_index");
return kwargs;
}

@Override
public List<Document> queryEmbedding(float[] embedding, int limit, Map<String, Object> args) {
// Core method: perform vector search using pre-computed embedding
// - embedding: Pre-computed embedding vector for semantic search
// - limit: Maximum number of results to return
// - args: Vector store-specific parameters
// - Returns: List of Document objects matching the search criteria
return null;
}
}
```

{{< /tab >}}

{{< /tabs >}}