Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def _build_vector_field_config(self) -> dict:
"data_type": "float",
"mode": "on_disk",
"compression_level": "32x",
"method": method_config,
}
log.info("Using on-disk vector configuration with compression_level: 32x")
else:
Expand Down Expand Up @@ -283,13 +284,19 @@ def insert_chunk(client_idx: int, chunk_idx: int):
other_data[self.label_col_name] = chunk_labels_data[i]
insert_data.append(other_data)

try:
resp = client.bulk(body=insert_data)
log.info(f"Client {client_idx} added {len(resp['items'])} documents")
return len(chunk_embeddings), None
except Exception as e:
log.warning(f"Client {client_idx} failed to insert data: {e!s}")
return 0, e
max_retries = 10
for attempt in range(max_retries):
try:
client.bulk(body=insert_data)
return len(chunk_embeddings), None
except Exception as e:
if "429" in str(e) and attempt < max_retries - 1:
log.warning(f"Client {client_idx} got 429 error, retry {attempt + 1}/{max_retries} after 10s")
time.sleep(10)
else:
log.warning(f"Client {client_idx} failed to insert data: {e!s}")
return 0, e
return 0, Exception("Max retries exceeded")

results = []
with ThreadPoolExecutor(max_workers=len(clients)) as executor:
Expand Down
4 changes: 0 additions & 4 deletions vectordb_bench/backend/clients/aws_opensearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,6 @@ def index_param(self) -> dict:
if self.engine == AWSOS_Engine.s3vector:
return {"engine": "s3vector"}

# For on-disk mode, return empty dict as no method config is needed
if self.on_disk:
return {}

parameters = {"ef_construction": self.efConstruction, "m": self.M}

# Add encoder configuration based on quantization type
Expand Down