Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crewai_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
CodeDocsSearchTool,
CodeInterpreterTool,
ComposioTool,
CouchbaseFTSVectorSearchTool,
CrewaiEnterpriseTools,
CSVSearchTool,
DallETool,
Expand Down
1 change: 1 addition & 0 deletions crewai_tools/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .code_docs_search_tool.code_docs_search_tool import CodeDocsSearchTool
from .code_interpreter_tool.code_interpreter_tool import CodeInterpreterTool
from .composio_tool.composio_tool import ComposioTool
from .couchbase_tool.couchbase_tool import CouchbaseFTSVectorSearchTool
from .crewai_enterprise_tools.crewai_enterprise_tools import CrewaiEnterpriseTools
from .csv_search_tool.csv_search_tool import CSVSearchTool
from .dalle_tool.dalle_tool import DallETool
Expand Down
62 changes: 62 additions & 0 deletions crewai_tools/tools/couchbase_tool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# CouchbaseFTSVectorSearchTool
## Description
Couchbase is a NoSQL database with vector search capabilities. Users can store and query vector embeddings. You can learn more about Couchbase vector search here: https://docs.couchbase.com/cloud/vector-search/vector-search.html

This tool is specifically crafted for performing semantic search using Couchbase. Use this tool to find semantically similar docs to a given query.

## Installation
Install the crewai_tools package by executing the following command in your terminal:

```shell
uv pip install 'crewai[tools]'
```

## Setup
Before instantiating the tool, you need a Couchbase cluster. Use one of the following options:
- Create a cluster on [Couchbase Capella](https://docs.couchbase.com/cloud/get-started/create-account.html), Couchbase's cloud database solution.
- Create a [local Couchbase server](https://docs.couchbase.com/server/current/getting-started/start-here.html).

You will need to create a bucket, scope and collection on the cluster. Then, [follow this guide](https://docs.couchbase.com/python-sdk/current/hello-world/start-using-sdk.html) to create a Couchbase Cluster object and load documents into your collection.

Follow the docs below to create a vector search index on Couchbase.
- [Create a vector search index on Couchbase Capella.](https://docs.couchbase.com/cloud/vector-search/create-vector-search-index-ui.html)
- [Create a vector search index on your local Couchbase server.](https://docs.couchbase.com/server/current/vector-search/create-vector-search-index-ui.html)

Ensure that the `Dimension` field in the index matches the output size of your embedding model. For example, OpenAI's `text-embedding-3-small` model produces 1536-dimensional embeddings, so the `Dimension` field in the index must be 1536.

## Example
To use the CouchbaseFTSVectorSearchTool, follow this example:

```python
from crewai_tools import CouchbaseFTSVectorSearchTool

# Instantiate a Couchbase Cluster object from the Couchbase SDK

tool = CouchbaseFTSVectorSearchTool(
cluster=cluster,
collection_name="collection",
scope_name="scope",
bucket_name="bucket",
index_name="index",
embedding_function=embed_fn
)

# Adding the tool to an agent
rag_agent = Agent(
name="rag_agent",
role="You are a helpful assistant that can answer questions with the help of the CouchbaseFTSVectorSearchTool.",
llm="gpt-4o-mini",
tools=[tool],
)
```

## Arguments
- `cluster`: An initialized Couchbase `Cluster` instance.
- `bucket_name`: The name of the Couchbase bucket.
- `scope_name`: The name of the scope within the bucket.
- `collection_name`: The name of the collection within the scope.
- `index_name`: The name of the search index (vector index).
- `embedding_function`: A function that takes a string and returns its embedding (list of floats).
- `embedding_key`: Name of the field in the search index storing the vector. (Optional, defaults to 'embedding')
- `scoped_index`: Whether the index is scoped (True) or cluster-level (False). (Optional, defaults to True)
- `limit`: The maximum number of search results to return. (Optional, defaults to 3)
254 changes: 254 additions & 0 deletions crewai_tools/tools/couchbase_tool/couchbase_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
import json
import os
from typing import Any, Optional, Type, List, Dict, Callable

try:
import couchbase.search as search
from couchbase.cluster import Cluster
from couchbase.options import SearchOptions
from couchbase.vector_search import VectorQuery, VectorSearch

COUCHBASE_AVAILABLE = True
except ImportError:
COUCHBASE_AVAILABLE = False
search = Any
Cluster = Any
SearchOptions = Any
VectorQuery = Any
VectorSearch = Any

from crewai.tools import BaseTool
from pydantic import BaseModel, Field, SkipValidation


class CouchbaseToolSchema(BaseModel):
    """Argument schema for the Couchbase search tool: a single required query string."""

    # Required field (Ellipsis default); the description is surfaced to the caller.
    query: str = Field(
        default=...,
        description="The query to search retrieve relevant information from the Couchbase database. Pass only the query, not the question.",
    )

class CouchbaseFTSVectorSearchTool(BaseTool):
    """Tool that runs a vector search against a Couchbase search index.

    The query text is embedded with ``embedding_function`` and the resulting
    vector is searched in the index named ``index_name``, either through the
    configured scope (``scoped_index=True``) or through the cluster
    (``scoped_index=False``).
    """

    model_config = {"arbitrary_types_allowed": True}
    name: str = "CouchbaseFTSVectorSearchTool"
    description: str = "A tool to search the Couchbase database for relevant information on internal documents."
    args_schema: Type[BaseModel] = CouchbaseToolSchema
    cluster: SkipValidation[Optional[Cluster]] = None
    # NOTE: these defaults must not end with a trailing comma. ``= None,`` makes
    # the default the one-element tuple ``(None,)``, which is truthy and slips
    # past the "must be provided" checks in ``__init__``.
    collection_name: Optional[str] = None
    scope_name: Optional[str] = None
    bucket_name: Optional[str] = None
    index_name: Optional[str] = None
    embedding_key: Optional[str] = Field(
        default="embedding",
        description="Name of the field in the search index that stores the vector",
    )
    scoped_index: Optional[bool] = Field(
        default=True,
        description="Specify whether the index is scoped. Is True by default.",
    )
    limit: Optional[int] = Field(default=3)
    embedding_function: SkipValidation[Callable[[str], List[float]]] = Field(
        default=None,
        description="A function that takes a string and returns a list of floats. This is used to embed the query before searching the database.",
    )

    def _check_bucket_exists(self) -> bool:
        """Return True if ``bucket_name`` can be fetched from the cluster.

        Any exception raised by the bucket manager lookup is treated as
        "bucket not found" and reported as False.
        """
        bucket_manager = self.cluster.buckets()
        try:
            bucket_manager.get_bucket(self.bucket_name)
            return True
        except Exception:
            return False

    def _check_scope_and_collection_exists(self) -> bool:
        """Verify that the configured scope and collection exist in the bucket.

        Returns:
            True when both the scope and the collection are found.

        Raises:
            ValueError: If the scope is missing from the bucket, or the
                collection is missing from the scope.
        """
        # Map every scope name in the bucket to the names of its collections.
        scope_collection_map: Dict[str, List[str]] = {
            scope.name: [collection.name for collection in scope.collections]
            for scope in self._bucket.collections().get_all_scopes()
        }

        if self.scope_name not in scope_collection_map:
            raise ValueError(
                f"Scope {self.scope_name} not found in Couchbase "
                f"bucket {self.bucket_name}"
            )

        if self.collection_name not in scope_collection_map[self.scope_name]:
            raise ValueError(
                f"Collection {self.collection_name} not found in scope "
                f"{self.scope_name} in Couchbase bucket {self.bucket_name}"
            )

        return True

    def _check_index_exists(self) -> bool:
        """Verify that the configured search index exists.

        The index list is read from the scope when ``scoped_index`` is truthy
        and from the cluster otherwise.

        Returns:
            True when the index is found.

        Raises:
            ValueError: If no index named ``index_name`` is listed.
        """
        index_owner = self._scope if self.scoped_index else self.cluster
        index_names = {
            index.name for index in index_owner.search_indexes().get_all_indexes()
        }
        if self.index_name not in index_names:
            raise ValueError(
                f"Index {self.index_name} does not exist. "
                " Please create the index before searching."
            )

        return True

    def __init__(self, **kwargs):
        """Initialize the CouchbaseFTSVectorSearchTool.

        Args:
            **kwargs: Keyword arguments to pass to the BaseTool constructor and
                to configure the Couchbase connection and search parameters.
                Requires 'cluster', 'bucket_name', 'scope_name',
                'collection_name', 'index_name', and 'embedding_function'.

        Raises:
            ValueError: If required parameters are missing, the Couchbase cluster
                cannot be reached, or the specified bucket, scope,
                collection, or index does not exist.
            ImportError: If the 'couchbase' package is not installed and the user
                chooses not to install it.
        """
        super().__init__(**kwargs)

        if not COUCHBASE_AVAILABLE:
            import click

            if click.confirm(
                "The 'couchbase' package is required to use the CouchbaseFTSVectorSearchTool. "
                "Would you like to install it?"
            ):
                import subprocess

                subprocess.run(["uv", "add", "couchbase"], check=True)
            else:
                raise ImportError(
                    "The 'couchbase' package is required to use the CouchbaseFTSVectorSearchTool. "
                    "Please install it with: uv add couchbase"
                )
            # NOTE(review): COUCHBASE_AVAILABLE is only computed at module import,
            # so even after a successful install this instance has no bucket/scope
            # handles and _run will still raise ImportError in this process.
            return

        # Validate required parameters before touching the cluster, so a missing
        # argument is reported with its own message rather than being masked by
        # the generic connection error below.
        required_params = (
            (self.cluster, "Cluster instance must be provided"),
            (self.bucket_name, "Bucket name must be provided"),
            (self.scope_name, "Scope name must be provided"),
            (self.collection_name, "Collection name must be provided"),
            (self.index_name, "Index name must be provided"),
            (self.embedding_function, "Embedding function must be provided"),
        )
        for value, message in required_params:
            if not value:
                raise ValueError(message)

        try:
            self._bucket = self.cluster.bucket(self.bucket_name)
            self._scope = self._bucket.scope(self.scope_name)
            self._collection = self._scope.collection(self.collection_name)
        except Exception as e:
            raise ValueError(
                "Error connecting to couchbase. "
                "Please check the connection and credentials"
            ) from e

        # check if bucket exists
        if not self._check_bucket_exists():
            raise ValueError(
                f"Bucket {self.bucket_name} does not exist. "
                " Please create the bucket before searching."
            )

        self._check_scope_and_collection_exists()
        self._check_index_exists()

    def _run(self, query: str) -> str:
        """Execute a vector search query against the Couchbase index.

        Args:
            query: The search query string.

        Returns:
            A JSON string holding a list with the ``fields`` of every returned
            row (an empty list when the search returns no rows).

        Raises:
            ImportError: If the 'couchbase' package is not installed.
            ValueError: If the search query fails or returns results without fields.
        """
        if not COUCHBASE_AVAILABLE:
            raise ImportError(
                "You are missing the 'couchbase' package. Would you like to install it?"
            )

        query_embedding = self.embedding_function(query)
        fields = ["*"]

        search_req = search.SearchRequest.create(
            VectorSearch.from_vector_query(
                VectorQuery(
                    self.embedding_key,
                    query_embedding,
                    self.limit,
                )
            )
        )

        # Scoped indexes are searched through the scope, others through the cluster.
        search_target = self._scope if self.scoped_index else self.cluster

        try:
            search_iter = search_target.search(
                self.index_name,
                search_req,
                SearchOptions(
                    limit=self.limit,
                    fields=fields,
                ),
            )

            # Collect rows and serialize once so that multiple hits form a single
            # valid JSON document instead of back-to-back JSON objects.
            results: List[Any] = []
            for row in search_iter.rows():
                if not row.fields:
                    raise ValueError(
                        "Search results do not contain the fields from the document."
                    )
                results.append(row.fields)
        except Exception as e:
            raise ValueError(f"Search failed with error: {e}") from e

        return json.dumps(results, indent=2)
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ apify = [
databricks-sdk = [
"databricks-sdk>=0.46.0",
]
couchbase = [
"couchbase>=4.3.5",
]
mcp = [
"mcp>=1.6.0",
"mcpadapt>=0.1.3",
Expand Down
Loading