Skip to content

Commit f5f43a4

Browse files
committed
Initial implementation of Cassandra engine
1 parent 9849594 commit f5f43a4

File tree

13 files changed

+740
-23
lines changed

13 files changed

+740
-23
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ NOTES.md
66

77
results/*
88
tools/custom/data.json
9+
10+
.venv
11+
*.DS_Store

engine/clients/cassandra/README.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Apache Cassandra®
2+
3+
## Setup
4+
### Pre-requisites
5+
Adjust the configuration file [`cassandra-single-node.json`](../../../experiments/configurations/cassandra-single-node.json) to your needs. The default configuration is set to use the default Apache Cassandra® settings.
6+
### Start up the server
7+
Run the following command to start the server (alternatively, run `docker compose up -d` to run in detached mode):
8+
```bash
9+
$ docker compose up
10+
11+
[+] Running 1/1
12+
✔ cassandra Pulled 1.4s
13+
[+] Running 1/1
14+
✔ Container cassandra-benchmark Recreated 0.1s
15+
Attaching to cassandra-benchmark
16+
cassandra-benchmark | CompileCommand: dontinline org/apache/cassandra/db/Columns$Serializer.deserializeLargeSubset(Lorg/apache/cassandra/io/util/DataInputPlus;Lorg/apache/cassandra/db/Columns;I)Lorg/apache/cassandra/db/Columns; bool dontinline = true
17+
...
18+
cassandra-benchmark | INFO [main] 2025-04-04 22:27:38,592 StorageService.java:957 - Cassandra version: 5.0.3
19+
cassandra-benchmark | INFO [main] 2025-04-04 22:27:38,592 StorageService.java:958 - Git SHA: b0226c8ea122c3e5ea8680efb0744d33924fd732
20+
cassandra-benchmark | INFO [main] 2025-04-04 22:27:38,592 StorageService.java:959 - CQL version: 3.4.7
21+
...
22+
cassandra-benchmark | INFO [main] 2025-04-04 22:28:25,091 StorageService.java:3262 - Node /172.18.0.2:7000 state jump to NORMAL
23+
```
24+
> [!TIP]
25+
> Other helpful commands:
26+
> - Run `docker exec -it cassandra-benchmark cqlsh` to access the Cassandra shell.
27+
> - Run `docker compose logs --follow --tail 10` to view & follow the logs of the container running Cassandra.
28+
29+
### Start up the client benchmark
30+
Run the following command to start the client benchmark using `glove-25-angular` dataset as an example:
31+
```bash
32+
% python3 -m run --engines cassandra-single-node --datasets glove-25-angular
33+
```
34+
and you'll see the following output:
35+
```bash
36+
Running experiment: cassandra-single-node - glove-25-angular
37+
Downloading http://ann-benchmarks.com/glove-25-angular.hdf5...
38+
...
39+
Experiment stage: Configure
40+
Experiment stage: Upload
41+
1183514it [mm:ss, <...>it/s]
42+
Upload time: <...>
43+
Index is not ready yet, sleeping for 2 minutes...
44+
...
45+
Total import time: <...>
46+
Experiment stage: Search
47+
10000it [mm:ss, <...>it/s]
48+
10000it [mm:ss, <...>it/s]
49+
Experiment stage: Done
50+
Results saved to: /path/to/repository/results
51+
...
52+
```
53+
54+
> [!TIP]
55+
> If you want to see the detailed results, set environment variable `DETAILED_RESULTS=1` before running the benchmark.

engine/clients/cassandra/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from engine.clients.cassandra.configure import CassandraConfigurator
2+
from engine.clients.cassandra.search import CassandraSearcher
3+
from engine.clients.cassandra.upload import CassandraUploader
4+
5+
__all__ = [
6+
"CassandraConfigurator",
7+
"CassandraSearcher",
8+
"CassandraUploader"
9+
]

engine/clients/cassandra/config.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import os
2+
3+
CASSANDRA_KEYSPACE = os.getenv("CASSANDRA_KEYSPACE", "benchmark")
4+
CASSANDRA_TABLE = os.getenv("CASSANDRA_TABLE", "vectors")
5+
ASTRA_API_ENDPOINT = os.getenv("ASTRA_API_ENDPOINT", None)
6+
ASTRA_API_KEY = os.getenv("ASTRA_API_KEY", None)
7+
ASTRA_SCB_PATH = os.getenv("ASTRA_SCB_PATH", None)

engine/clients/cassandra/configure.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
2+
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy, ExponentialReconnectionPolicy
3+
from cassandra import ConsistencyLevel, ProtocolVersion
4+
5+
from benchmark.dataset import Dataset
6+
from engine.base_client.configure import BaseConfigurator
7+
from engine.base_client.distances import Distance
8+
from engine.clients.cassandra.config import CASSANDRA_KEYSPACE, CASSANDRA_TABLE
9+
10+
11+
class CassandraConfigurator(BaseConfigurator):
12+
SPARSE_VECTOR_SUPPORT = False
13+
DISTANCE_MAPPING = {
14+
Distance.L2: "euclidean",
15+
Distance.COSINE: "cosine",
16+
Distance.DOT: "dot_product"
17+
}
18+
19+
def __init__(self, host, collection_params: dict, connection_params: dict):
20+
super().__init__(host, collection_params, connection_params)
21+
22+
# Set up execution profiles for consistency and performance
23+
profile = ExecutionProfile(
24+
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
25+
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
26+
request_timeout=60
27+
)
28+
29+
# Initialize Cassandra cluster connection
30+
self.cluster = Cluster(
31+
contact_points=[host],
32+
execution_profiles={EXEC_PROFILE_DEFAULT: profile},
33+
protocol_version=ProtocolVersion.V4,
34+
reconnection_policy=ExponentialReconnectionPolicy(base_delay=1, max_delay=60),
35+
**connection_params
36+
)
37+
self.session = self.cluster.connect()
38+
39+
def clean(self):
40+
"""Drop the keyspace if it exists"""
41+
self.session.execute(f"DROP KEYSPACE IF EXISTS {CASSANDRA_KEYSPACE}")
42+
43+
def recreate(self, dataset: Dataset, collection_params):
44+
"""Create keyspace and table for vector search"""
45+
# Create keyspace if not exists
46+
self.session.execute(
47+
f"""CREATE KEYSPACE IF NOT EXISTS {CASSANDRA_KEYSPACE}
48+
WITH REPLICATION = {{ 'class': 'SimpleStrategy', 'replication_factor': 1 }}"""
49+
)
50+
51+
# Use the keyspace
52+
self.session.execute(f"USE {CASSANDRA_KEYSPACE}")
53+
54+
# Get the distance metric
55+
distance_metric = self.DISTANCE_MAPPING.get(dataset.config.distance)
56+
vector_size = dataset.config.vector_size
57+
58+
# Create vector table
59+
# Using a simple schema that supports vector similarity search
60+
self.session.execute(
61+
f"""CREATE TABLE IF NOT EXISTS {CASSANDRA_TABLE} (
62+
id int PRIMARY KEY,
63+
embedding vector<float, {vector_size}>,
64+
metadata map<text, text>
65+
)"""
66+
)
67+
68+
# Create vector index using the appropriate distance metric
69+
self.session.execute(
70+
f"""CREATE CUSTOM INDEX IF NOT EXISTS vector_index ON {CASSANDRA_TABLE}(embedding)
71+
USING 'StorageAttachedIndex'
72+
WITH OPTIONS = {{ 'similarity_function': '{distance_metric}' }}"""
73+
)
74+
75+
# Add additional schema fields based on collection_params if needed
76+
for field_name, field_type in dataset.config.schema.items():
77+
if field_type in ["keyword", "text"]:
78+
# For text fields, we would typically add them to metadata
79+
pass
80+
elif field_type in ["int", "float"]:
81+
# For numeric fields that need separate indexing
82+
# In a real implementation, we might alter the table to add these columns
83+
pass
84+
85+
return collection_params
86+
87+
def execution_params(self, distance, vector_size) -> dict:
88+
"""Return any execution parameters needed for the dataset"""
89+
return {"normalize": distance == Distance.COSINE}
90+
91+
def delete_client(self):
92+
"""Close the Cassandra connection"""
93+
if hasattr(self, 'session') and self.session:
94+
self.session.shutdown()
95+
if hasattr(self, 'cluster') and self.cluster:
96+
self.cluster.shutdown()

engine/clients/cassandra/parser.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
from typing import Any, List, Optional
2+
3+
from engine.base_client.parser import BaseConditionParser, FieldValue
4+
5+
6+
class CassandraConditionParser(BaseConditionParser):
7+
def build_condition(
8+
self, and_subfilters: Optional[List[Any]], or_subfilters: Optional[List[Any]]
9+
) -> Optional[Any]:
10+
"""
11+
Build a CQL condition expression that combines AND and OR subfilters
12+
"""
13+
conditions = []
14+
15+
# Add AND conditions
16+
if and_subfilters and len(and_subfilters) > 0:
17+
and_conds = " AND ".join([f"({cond})" for cond in and_subfilters if cond])
18+
if and_conds:
19+
conditions.append(f"({and_conds})")
20+
21+
# Add OR conditions
22+
if or_subfilters and len(or_subfilters) > 0:
23+
or_conds = " OR ".join([f"({cond})" for cond in or_subfilters if cond])
24+
if or_conds:
25+
conditions.append(f"({or_conds})")
26+
27+
# Combine all conditions
28+
if not conditions:
29+
return None
30+
31+
return " AND ".join(conditions)
32+
33+
def build_exact_match_filter(self, field_name: str, value: FieldValue) -> Any:
34+
"""
35+
Build a CQL exact match condition for metadata fields
36+
For Cassandra, we format metadata as a map with string values
37+
"""
38+
if isinstance(value, str):
39+
return f"metadata['{field_name}'] = '{value}'"
40+
else:
41+
return f"metadata['{field_name}'] = '{str(value)}'"
42+
43+
def build_range_filter(
44+
self,
45+
field_name: str,
46+
lt: Optional[FieldValue],
47+
gt: Optional[FieldValue],
48+
lte: Optional[FieldValue],
49+
gte: Optional[FieldValue],
50+
) -> Any:
51+
"""
52+
Build a CQL range filter condition
53+
"""
54+
conditions = []
55+
56+
if lt is not None:
57+
if isinstance(lt, str):
58+
conditions.append(f"metadata['{field_name}'] < '{lt}'")
59+
else:
60+
conditions.append(f"metadata['{field_name}'] < '{str(lt)}'")
61+
62+
if gt is not None:
63+
if isinstance(gt, str):
64+
conditions.append(f"metadata['{field_name}'] > '{gt}'")
65+
else:
66+
conditions.append(f"metadata['{field_name}'] > '{str(gt)}'")
67+
68+
if lte is not None:
69+
if isinstance(lte, str):
70+
conditions.append(f"metadata['{field_name}'] <= '{lte}'")
71+
else:
72+
conditions.append(f"metadata['{field_name}'] <= '{str(lte)}'")
73+
74+
if gte is not None:
75+
if isinstance(gte, str):
76+
conditions.append(f"metadata['{field_name}'] >= '{gte}'")
77+
else:
78+
conditions.append(f"metadata['{field_name}'] >= '{str(gte)}'")
79+
80+
return " AND ".join(conditions)
81+
82+
def build_geo_filter(
83+
self, field_name: str, lat: float, lon: float, radius: float
84+
) -> Any:
85+
"""
86+
Build a CQL geo filter condition
87+
Note: Basic Cassandra doesn't have built-in geo filtering.
88+
This is a simplified approach that won't actually work without extensions.
89+
"""
90+
# In a real implementation with a geo extension, we'd implement proper geo filtering
91+
# For this benchmark, we'll return a placeholder condition that doesn't filter
92+
return "1=1" # Always true condition as a placeholder

engine/clients/cassandra/search.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import multiprocessing as mp
2+
from typing import List, Tuple
3+
4+
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
5+
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy, ExponentialReconnectionPolicy
6+
from cassandra import ConsistencyLevel, ProtocolVersion
7+
8+
from dataset_reader.base_reader import Query
9+
from engine.base_client.distances import Distance
10+
from engine.base_client.search import BaseSearcher
11+
from engine.clients.cassandra.config import CASSANDRA_KEYSPACE, CASSANDRA_TABLE
12+
from engine.clients.cassandra.parser import CassandraConditionParser
13+
14+
15+
class CassandraSearcher(BaseSearcher):
16+
search_params = {}
17+
session = None
18+
cluster = None
19+
parser = CassandraConditionParser()
20+
21+
@classmethod
22+
def init_client(cls, host, distance, connection_params: dict, search_params: dict):
23+
# Set up execution profiles for consistency and performance
24+
profile = ExecutionProfile(
25+
load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
26+
consistency_level=ConsistencyLevel.LOCAL_ONE, # Use LOCAL_ONE for faster reads
27+
request_timeout=60
28+
)
29+
30+
# Initialize Cassandra cluster connection
31+
cls.cluster = Cluster(
32+
contact_points=[host],
33+
execution_profiles={EXEC_PROFILE_DEFAULT: profile},
34+
reconnection_policy=ExponentialReconnectionPolicy(base_delay=1, max_delay=60),
35+
protocol_version=ProtocolVersion.V4,
36+
**connection_params
37+
)
38+
cls.session = cls.cluster.connect(CASSANDRA_KEYSPACE)
39+
cls.search_params = search_params
40+
41+
# Update prepared statements with current search parameters
42+
cls.update_prepared_statements(distance)
43+
44+
@classmethod
45+
def get_mp_start_method(cls):
46+
return "fork" if "fork" in mp.get_all_start_methods() else "spawn"
47+
48+
@classmethod
49+
def update_prepared_statements(cls, distance):
50+
"""Create prepared statements for vector searches"""
51+
# Prepare a vector similarity search query
52+
limit = cls.search_params.get("top", 10)
53+
54+
if distance == Distance.COSINE:
55+
SIMILARITY_FUNC = "similarity_cosine"
56+
elif distance == Distance.L2:
57+
SIMILARITY_FUNC = "similarity_euclidean"
58+
elif distance == Distance.DOT:
59+
SIMILARITY_FUNC = "similarity_dot_product"
60+
else:
61+
raise ValueError(f"Unsupported distance metric: {distance}")
62+
63+
cls.ann_search_stmt = cls.session.prepare(
64+
f"""SELECT id, {SIMILARITY_FUNC}(embedding, ?) as distance
65+
FROM {CASSANDRA_TABLE}
66+
ORDER BY embedding ANN OF ?
67+
LIMIT {limit}"""
68+
)
69+
70+
# Prepare a statement for filtered vector search
71+
cls.filtered_search_query_template = (
72+
f"""SELECT id, {SIMILARITY_FUNC}(embedding, ?) as distance
73+
FROM {CASSANDRA_TABLE}
74+
WHERE {{conditions}}
75+
ORDER BY embedding ANN OF ?
76+
LIMIT {limit}"""
77+
)
78+
79+
@classmethod
80+
def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
81+
"""Execute a vector similarity search with optional filters"""
82+
# Convert query vector to a format Cassandra can use
83+
query_vector = query.vector.tolist() if hasattr(query.vector, 'tolist') else query.vector
84+
85+
# Generate filter conditions if metadata conditions exist
86+
filter_conditions = cls.parser.parse(query.meta_conditions)
87+
88+
try:
89+
if filter_conditions:
90+
# Use the filtered search query
91+
query_with_conditions = cls.filtered_search_query_template.format(conditions=filter_conditions)
92+
results = cls.session.execute(
93+
cls.session.prepare(query_with_conditions),
94+
(query_vector, query_vector)
95+
)
96+
else:
97+
# Use the basic ANN search query
98+
results = cls.session.execute(
99+
cls.ann_search_stmt,
100+
(query_vector, query_vector)
101+
)
102+
103+
# Extract and return results
104+
return [(row.id, row.distance) for row in results]
105+
106+
except Exception as ex:
107+
print(f"Error during Cassandra vector search: {ex}")
108+
raise ex
109+
110+
@classmethod
111+
def delete_client(cls):
112+
"""Close the Cassandra connection"""
113+
if cls.session:
114+
cls.session.shutdown()
115+
if cls.cluster:
116+
cls.cluster.shutdown()

0 commit comments

Comments
 (0)