Skip to content

Commit a118380

Browse files
authored
aws opensearch added ondisk mode and binary quantization 32x compression (#625)
* aws opensearch support on disk * add binary quantization 32x compression * modified readme for aws opensearch * format refine
1 parent cdd2a1e commit a118380

File tree

4 files changed

+100
-55
lines changed

4 files changed

+100
-55
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,12 @@ Options:
207207
# Memory Management
208208
--cb-threshold TEXT k-NN Memory circuit breaker threshold
209209
210+
--on-disk Enable on-disk vector storage mode with binary quantization (32x compression)
211+
--oversample-factor FLOAT Oversample factor for vector search; passed as the rescore oversample_factor in the k-NN query (default 1.0)
212+
213+
210214
# Quantization Type
211-
--quantization-type TEXT which type of quantization to use valid values [fp32, fp16]
215+
--quantization-type TEXT Which type of quantization to use; valid values: [fp32, fp16, bq]
212216
--help Show this message and exit.
213217
```
214218
### Run OceanBase from command line

vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py

Lines changed: 56 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -65,87 +65,104 @@ def __init__(
6565
self._load_graphs_to_memory(client)
6666

6767
def _create_index(self, client: OpenSearch) -> None:
68-
ef_search_value = self.case_config.ef_search
69-
log.info(f"Creating index with ef_search: {ef_search_value}")
68+
self._log_index_creation_info()
69+
self._configure_cluster_settings(client)
70+
settings = self._build_index_settings()
71+
vector_field_config = self._build_vector_field_config()
72+
mappings = self._build_mappings(vector_field_config)
73+
self._create_opensearch_index(client, settings, mappings)
74+
75+
def _log_index_creation_info(self) -> None:
76+
log.info(f"Creating index with ef_search: {self.case_config.ef_search}")
7077
log.info(f"Creating index with number_of_replicas: {self.case_config.number_of_replicas}")
71-
7278
log.info(f"Creating index with engine: {self.case_config.engine}")
7379
log.info(f"Creating index with metric type: {self.case_config.metric_type_name}")
7480
log.info(f"All case_config parameters: {self.case_config.__dict__}")
7581

82+
def _configure_cluster_settings(self, client: OpenSearch) -> None:
7683
cluster_settings_body = {
7784
"persistent": {
7885
"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty,
7986
"knn.memory.circuit_breaker.limit": self.case_config.cb_threshold,
8087
}
8188
}
8289
client.cluster.put_settings(body=cluster_settings_body)
83-
settings = {
90+
91+
def _build_index_settings(self) -> dict:
92+
return {
8493
"index": {
8594
"knn": True,
8695
"number_of_shards": self.case_config.number_of_shards,
8796
"number_of_replicas": self.case_config.number_of_replicas,
8897
"translog.flush_threshold_size": self.case_config.flush_threshold_size,
8998
"knn.advanced.approximate_threshold": "-1",
99+
"knn.algo_param.ef_search": self.case_config.ef_search,
90100
},
91101
"refresh_interval": self.case_config.refresh_interval,
92102
}
93-
settings["index"]["knn.algo_param.ef_search"] = ef_search_value
94103

95-
# Get method configuration and log it for debugging
104+
def _build_vector_field_config(self) -> dict:
96105
method_config = self.case_config.index_param()
97106
log.info(f"Raw method config from index_param(): {method_config}")
98107

99-
# For s3vector engine, ensure method only contains engine field
100108
if self.case_config.engine == AWSOS_Engine.s3vector:
101109
method_config = {"engine": "s3vector"}
102-
log.info(f"Cleaned method config for s3vector: {method_config}")
103110

104-
# Prepare vector field configuration
105-
vector_field_config = {
106-
"type": "knn_vector",
107-
"dimension": self.dim,
108-
"method": method_config,
109-
}
111+
if self.case_config.on_disk:
112+
space_type = self.case_config.parse_metric()
113+
vector_field_config = {
114+
"type": "knn_vector",
115+
"dimension": self.dim,
116+
"space_type": space_type,
117+
"data_type": "float",
118+
"mode": "on_disk",
119+
"compression_level": "32x",
120+
}
121+
log.info("Using on-disk vector configuration with compression_level: 32x")
122+
else:
123+
vector_field_config = {
124+
"type": "knn_vector",
125+
"dimension": self.dim,
126+
"method": method_config,
127+
}
110128

111-
# For s3vector engine, space_type should be set at the vector field level
112-
if self.case_config.engine == AWSOS_Engine.s3vector:
129+
if self.case_config.on_disk:
130+
log.info(f"Final on-disk vector field config: {vector_field_config}")
131+
elif self.case_config.engine == AWSOS_Engine.s3vector:
113132
space_type = self.case_config.parse_metric()
114133
vector_field_config["space_type"] = space_type
115-
116-
# Ensure method config is absolutely clean for s3vector - remove any potential extra fields
117134
vector_field_config["method"] = {"engine": "s3vector"}
118-
119-
log.info(f"Setting space_type '{space_type}' at vector field level for s3vector engine")
120135
log.info(f"Final vector field config for s3vector: {vector_field_config}")
136+
else:
137+
log.info(f"Standard vector field config: {vector_field_config}")
121138

122-
# Configure mappings based on engine type
139+
return vector_field_config
140+
141+
def _build_mappings(self, vector_field_config: dict) -> dict:
123142
if self.case_config.engine == AWSOS_Engine.s3vector:
124-
# For s3vector engine, use simplified mappings without _source configuration
125143
mappings = {
126144
"properties": {
127-
# self.id_col_name: {"type": "integer", "store": True},
128145
self.label_col_name: {"type": "keyword"},
129146
self.vector_col_name: vector_field_config,
130147
},
131148
}
132149
log.info("Using simplified mappings for s3vector engine (no _source configuration)")
133150
else:
134-
# For other engines (faiss, lucene), use standard mappings with _source configuration
135151
mappings = {
136152
"_source": {"excludes": [self.vector_col_name], "recovery_source_excludes": [self.vector_col_name]},
137153
"properties": {
138-
# self.id_col_name: {"type": "integer", "store": True},
139154
self.label_col_name: {"type": "keyword"},
140155
self.vector_col_name: vector_field_config,
141156
},
142157
}
143158
log.info("Using standard mappings with _source configuration for non-s3vector engines")
159+
return mappings
160+
161+
def _create_opensearch_index(self, client: OpenSearch, settings: dict, mappings: dict) -> None:
144162
try:
145163
log.info(f"Creating index with settings: {settings}")
146164
log.info(f"Creating index with mappings: {mappings}")
147165

148-
# Additional logging for s3vector to confirm method config before sending
149166
if self.case_config.engine == AWSOS_Engine.s3vector:
150167
method_in_mappings = mappings["properties"][self.vector_col_name]["method"]
151168
log.info(f"Final method config being sent to OpenSearch: {method_in_mappings}")
@@ -155,22 +172,21 @@ def _create_index(self, client: OpenSearch) -> None:
155172
body={"settings": settings, "mappings": mappings},
156173
)
157174

158-
# For s3vector, verify the actual index configuration after creation
159175
if self.case_config.engine == AWSOS_Engine.s3vector:
160-
try:
161-
actual_mapping = client.indices.get_mapping(index=self.index_name)
162-
actual_method = actual_mapping[self.index_name]["mappings"]["properties"][self.vector_col_name][
163-
"method"
164-
]
165-
log.info(f"Actual method config in created index: {actual_method}")
166-
167-
except Exception as e:
168-
log.warning(f"Failed to verify index configuration: {e}")
176+
self._verify_s3vector_index_config(client)
169177

170178
except Exception as e:
171179
log.warning(f"Failed to create index: {self.index_name} error: {e!s}")
172180
raise e from None
173181

182+
def _verify_s3vector_index_config(self, client: OpenSearch) -> None:
183+
try:
184+
actual_mapping = client.indices.get_mapping(index=self.index_name)
185+
actual_method = actual_mapping[self.index_name]["mappings"]["properties"][self.vector_col_name]["method"]
186+
log.info(f"Actual method config in created index: {actual_method}")
187+
except Exception as e:
188+
log.warning(f"Failed to verify index configuration: {e}")
189+
174190
@contextmanager
175191
def init(self) -> None:
176192
"""connect to opensearch"""
@@ -366,11 +382,10 @@ def search_embedding(
366382
"k": k,
367383
"method_parameters": self.case_config.search_param(),
368384
**({"filter": self.filter} if self.filter else {}),
369-
**(
370-
{"rescore": {"oversample_factor": self.case_config.oversample_factor}}
371-
if self.case_config.use_quant
372-
else {}
373-
),
385+
"rescore": {"oversample_factor": self.case_config.oversample_factor}
386+
# if self.case_config.use_quant
387+
# else {}
388+
,
374389
}
375390
log.debug("Using standard knn query with method_parameters for non-s3vector engines")
376391

vectordb_bench/backend/clients/aws_opensearch/cli.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,33 @@ class AWSOpenSearchTypedDict(TypedDict):
131131
str | None,
132132
click.option(
133133
"--quantization-type",
134-
type=click.Choice(["fp32", "fp16"]),
134+
type=click.Choice(["fp32", "fp16", "bq"]),
135135
help="quantization type for vectors (in index)",
136136
default="fp32",
137137
required=False,
138138
),
139139
]
140140

141+
oversample_factor: Annotated[
142+
float,
143+
click.option(
144+
"--oversample-factor",
145+
type=float,
146+
help="Oversample factor for vector search",
147+
default=1.0,
148+
),
149+
]
150+
151+
on_disk: Annotated[
152+
bool,
153+
click.option(
154+
"--on-disk",
155+
is_flag=True,
156+
help="Enable on-disk vector storage mode",
157+
default=False,
158+
),
159+
]
160+
141161

142162
class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFlavor1): ...
143163

@@ -187,6 +207,8 @@ def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]):
187207
engine=engine,
188208
quantization_type=AWSOSQuantization(parameters["quantization_type"]),
189209
metric_type_name=parameters["metric_type"],
210+
on_disk=parameters["on_disk"],
211+
oversample_factor=parameters["oversample_factor"],
190212
),
191213
**parameters,
192214
)

vectordb_bench/backend/clients/aws_opensearch/config.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class AWSOS_Engine(Enum):
4040
class AWSOSQuantization(Enum):
4141
fp32 = "fp32"
4242
fp16 = "fp16"
43+
bq = "bq"
4344

4445

4546
class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
@@ -63,6 +64,7 @@ class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
6364
use_routing: bool = False # for label-filter cases
6465
oversample_factor: float = 1.0
6566
quantization_type: AWSOSQuantization = AWSOSQuantization.fp32
67+
on_disk: bool = False
6668

6769
def __eq__(self, obj: any):
6870
return (
@@ -74,6 +76,7 @@ def __eq__(self, obj: any):
7476
and self.number_of_segments == obj.number_of_segments
7577
and self.use_routing == obj.use_routing
7678
and self.quantization_type == obj.quantization_type
79+
and self.on_disk == obj.on_disk
7780
)
7881

7982
def __hash__(self) -> int:
@@ -87,6 +90,7 @@ def __hash__(self) -> int:
8790
self.number_of_segments,
8891
self.use_routing,
8992
self.quantization_type,
93+
self.on_disk,
9094
)
9195
)
9296

@@ -116,6 +120,7 @@ def use_quant(self) -> bool:
116120
def index_param(self) -> dict:
117121
log.info(f"Using engine: {self.engine} for index creation")
118122
log.info(f"Using metric_type: {self.metric_type_name} for index creation")
123+
log.info(f"Using on_disk mode: {self.on_disk} for index creation")
119124
space_type = self.parse_metric()
120125
log.info(f"Resulting space_type: {space_type} for index creation")
121126

@@ -124,26 +129,25 @@ def index_param(self) -> dict:
124129
if self.engine == AWSOS_Engine.s3vector:
125130
return {"engine": "s3vector"}
126131

132+
# For on-disk mode, return empty dict as no method config is needed
133+
if self.on_disk:
134+
return {}
135+
127136
parameters = {"ef_construction": self.efConstruction, "m": self.M}
128137

129-
if self.engine == AWSOS_Engine.faiss and self.quantization_type == AWSOSQuantization.fp16:
130-
parameters["encoder"] = {"name": "sq", "parameters": {"type": "fp16"}}
138+
# Add encoder configuration based on quantization type
139+
if self.engine == AWSOS_Engine.faiss and self.use_quant:
140+
if self.quantization_type == AWSOSQuantization.fp16:
141+
parameters["encoder"] = {"name": "sq", "parameters": {"type": "fp16"}}
142+
elif self.quantization_type == AWSOSQuantization.bq:
143+
parameters["encoder"] = {"name": "binary", "parameters": {"bits": 1}}
131144

132145
# For other engines (faiss, lucene), space_type is set at method level
133146
return {
134147
"name": "hnsw",
135148
"engine": self.engine.value,
136149
"space_type": space_type,
137-
"parameters": {
138-
"ef_construction": self.efConstruction,
139-
"m": self.M,
140-
"ef_search": self.ef_search,
141-
**(
142-
{"encoder": {"name": "sq", "parameters": {"type": self.quantization_type.fp16.value}}}
143-
if self.use_quant
144-
else {}
145-
),
146-
},
150+
"parameters": parameters,
147151
}
148152

149153
def search_param(self) -> dict:

0 commit comments

Comments
 (0)