Conversation


@OVI3D0 OVI3D0 commented Jan 22, 2026

Description

Sets up an abstraction layer in OSB and moves the OpenSearch client/runner logic into appropriate folders.

The goal is to restructure OSB so that it can support any DB that offers similar operations. This PR creates the package structure, defines interfaces and contracts, refactors the existing OS code into the new structure, and updates imports. There are no functional changes to OSB here, just a reorganization of the existing code.

Here we introduce:
  • registry.py, which defines the pattern for managing different DBs and their factories.
  • factory.py, the interface for creating DB clients without needing to know which implementation to use.
  • A couple of classes in clients/opensearch/opensearch.py that wrap the current opensearchpy client so it conforms to the new DatabaseClient interface, plus an OpenSearchClientFactory class to create the OS clients.

This PR also switches around a few related things: it checks the DB type when loading config, selects the client factory (defaulting to the OS client), removes hardcoded use of the OS client, and skips telemetry for non-OS clients.
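The registry/factory pattern described above can be sketched roughly as follows. This is a hypothetical simplification, not the PR's actual code: the real DatabaseType, register_database, get_client_factory, and create_client_factory live in osbenchmark/database/ and may differ in signature and error handling.

```python
from enum import Enum


class DatabaseType(Enum):
    OPENSEARCH = "opensearch"
    MILVUS = "milvus"
    VESPA = "vespa"


# registry.py: maps each DatabaseType to a client-factory class
_CLIENT_FACTORIES = {}


def register_database(db_type, factory_cls):
    _CLIENT_FACTORIES[db_type] = factory_cls


def get_client_factory(db_type):
    # Raises KeyError when no factory is registered for db_type
    return _CLIENT_FACTORIES[db_type]


# factory.py: instantiates a backend-specific factory, defaulting to OpenSearch
def create_client_factory(hosts, client_options, database_type=None):
    db_enum = DatabaseType(database_type.lower()) if database_type else DatabaseType.OPENSEARCH
    factory_cls = get_client_factory(db_enum)
    return factory_cls(hosts, client_options)
```

With this shape, adding a new backend is just a new factory class plus one register_database call; callers never import a concrete client directly.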

Issues Resolved

#1006

Testing

  • New functionality includes testing

Tested with big5 workload to ensure no breaking changes


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.


coderabbitai bot commented Jan 22, 2026

📝 Walkthrough


Introduces a database abstraction layer with interface definitions, registry, and factory pattern to support multiple database backends. Implements comprehensive OpenSearch client integration with REST and gRPC support. Adds placeholder implementations for Milvus and Vespa. Updates worker coordinator to dynamically select database clients. Rebinds import paths across the codebase to use new database module structure.

Changes

Cohort / File(s) Summary
Database Core Abstraction
osbenchmark/database/interface.py, osbenchmark/database/registry.py, osbenchmark/database/factory.py, osbenchmark/database/__init__.py
New abstract DatabaseClient interface with IndicesNamespace, ClusterNamespace, TransportNamespace, NodesNamespace; DatabaseType enum (OPENSEARCH, MILVUS, VESPA) with registry; DatabaseClientFactory to instantiate backend-specific factories with database type detection and validation.
OpenSearch Client Implementation
osbenchmark/database/clients/opensearch/opensearch.py, osbenchmark/database/clients/opensearch/__init__.py
Comprehensive OpenSearch client with UnifiedClient aggregating REST and gRPC stubs, OpenSearchDatabaseClient conforming to DatabaseClient interface, OpenSearchClientFactory supporting sync/async creation, SSL/TLS/auth handling, namespace wrappers (Indices, Cluster, Transport, Nodes), gRPC factory, and Kafka message producer factory.
Database Client Package Scaffolding
osbenchmark/database/clients/__init__.py, osbenchmark/database/clients/milvus/__init__.py, osbenchmark/database/clients/milvus/milvus.py, osbenchmark/database/clients/vespa/__init__.py, osbenchmark/database/clients/vespa/vespa.py
License headers and placeholder module docstrings for Milvus and Vespa client implementations (future development).
Import Path Updates - Public Exports
it/__init__.py, osbenchmark/builder/builder.py
Rebind client symbol from generic osbenchmark.client to opensearch-specific import (osbenchmark.database.clients.opensearch.opensearch as client).
Import Path Updates - Worker Coordination
osbenchmark/worker_coordinator/runner.py, osbenchmark/worker_coordinator/worker_coordinator.py
Add DatabaseClientFactory, DatabaseType, and get_client_factory imports; extend worker coordinator with multi-database support via factory pattern, dynamic database type detection from config, conditional telemetry setup (OpenSearch-only), generalized REST API readiness checks.
Import Path Updates - Other
osbenchmark/workload_generator/workload_generator.py
Update OsClientFactory import path from osbenchmark.client to osbenchmark.database.clients.opensearch.opensearch.
Test Import Updates
tests/client_test.py, tests/kafka_client_test.py, tests/worker_coordinator/runner_test.py, tests/worker_coordinator/worker_coordinator_test.py
Rebind client and factory imports to new module paths (osbenchmark.database.clients.opensearch.opensearch); update logger references and patch paths accordingly.

Sequence Diagrams

sequenceDiagram
    participant PrepareBenchmark
    participant Registry as Database Registry
    participant Factory as DatabaseClientFactory
    participant OSFactory as OpenSearchClientFactory
    participant Client as OpenSearchDatabaseClient

    PrepareBenchmark->>Registry: get_client_factory(database_type)
    Registry-->>PrepareBenchmark: OpenSearchClientFactory
    PrepareBenchmark->>Factory: create_client_factory(database_type, hosts, options)
    Factory->>Registry: get_client_factory(database_type)
    Registry-->>Factory: OpenSearchClientFactory
    Factory->>OSFactory: __init__(hosts, client_options)
    OSFactory-->>Factory: factory instance
    Factory-->>PrepareBenchmark: OpenSearchClientFactory instance
    PrepareBenchmark->>OSFactory: create()
    OSFactory->>Client: __init__(opensearch_client, namespaces)
    Client-->>OSFactory: initialized
    OSFactory-->>PrepareBenchmark: OpenSearchDatabaseClient
    PrepareBenchmark->>Client: bulk/index/search/health operations
sequenceDiagram
    participant App as Application
    participant Interface as DatabaseClient Interface
    participant OS as OpenSearchDatabaseClient
    participant REST as OpenSearch REST
    participant gRPC as gRPC Stubs

    App->>OS: indices.create(index)
    OS->>Interface: delegate to indices property
    Interface->>OS: OpenSearchIndicesNamespace
    OS->>REST: perform create request
    REST-->>OS: response
    OS-->>App: Dict result

    App->>OS: search(index, body)
    OS->>REST: perform search
    REST-->>OS: response
    OS-->>App: Dict result

    App->>OS: document_service (via UnifiedClient)
    OS->>gRPC: return document service stub
    gRPC-->>App: gRPC stub

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 51.02%, below the required 80.00% threshold. | Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title 'Add database abstraction layer for multi-database support' clearly and concisely summarizes the main objective of the PR: introducing a database abstraction layer to enable multi-database support. This directly reflects the primary change described in the summary.
Description check | ✅ Passed | The description is comprehensive and directly related to the changeset. It explains the motivation (supporting any DB with similar operations), what is being introduced (registry, factory, interface classes, and wrappers), refactoring details, and confirms no functional changes. It also references issue #1006, testing approach, and Apache 2.0 license compliance.



@OVI3D0 OVI3D0 force-pushed the feature/database-abstraction branch from e5038b9 to 7e0ceea on January 22, 2026 at 20:39

OVI3D0 commented Jan 22, 2026

@coderabbitai review


coderabbitai bot commented Jan 22, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

🤖 Fix all issues with AI agents
In `@osbenchmark/database/clients/opensearch/opensearch.py`:
- Around line 349-363: The methods document_service and search_service assume
self._grpc_stubs is iterable and check "if cluster_name in self._grpc_stubs",
which will raise TypeError when _grpc_stubs is None; add a null/emptiness check
first (e.g., if not self._grpc_stubs: raise the same SystemSetupError) or ensure
_grpc_stubs is a dict before using membership, then proceed to check
cluster_name in self._grpc_stubs and return the stub or raise SystemSetupError;
apply this change in both document_service and search_service.
- Around line 234-235: The debug statement inside the retry loop is misleading
because it logs success before any check; update the logger.debug call inside
the for attempt in range(max_attempts) loop (referencing variables attempt,
max_attempts and logger.debug) to indicate it's an attempt/health-check (e.g.,
"Attempt %s/%s: checking REST API availability") and consider using attempt+1
for human-readable attempt counts so the message reflects an in-progress check
rather than a successful result.
- Around line 365-373: The __del__ method can raise AttributeError when
self._grpc_stubs is None; update __del__ in opensearch.py to first check whether
self._grpc_stubs is truthy (or is a dict) before iterating
self._grpc_stubs.values(), e.g., skip the loop if _grpc_stubs is None, then
proceed to safely close any '_channel' in cluster_stubs and call
self._opensearch.close() (keeping the existing try/except around
channel.close()), so __del__ never assumes _grpc_stubs is a dict and avoids
iterating None.
- Around line 318-322: The gRPC client currently uses
grpc.aio.insecure_channel(…) which sends unencrypted traffic; update the code
around grpc.aio.insecure_channel, grpc_addr and self.grpc_channel_options to
support TLS by creating grpc.ssl_channel_credentials (using server CA cert,
optional client cert/key and hostname checking) and using
grpc.aio.secure_channel when TLS config is provided, while preserving the
existing insecure_channel fallback when no TLS files/config are supplied; add
configuration parsing to accept certificate paths and apply them to channel
creation so gRPC mirrors the REST client's certificate verification and
client-auth behavior.

In `@osbenchmark/database/factory.py`:
- Around line 71-78: The except block handling database_type should suppress
exception chaining by re-raising the new ValueError using "from None"; update
the raise in the except that currently constructs the message with DatabaseType
and valid_types so it becomes raise ValueError(...) from None to avoid including
the original ValueError traceback when creating db_enum from
database_type.lower().
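A minimal illustration of the `from None` change the prompt describes (function and message shapes here are hypothetical, mirroring the prompt rather than the actual factory.py):

```python
from enum import Enum


class DatabaseType(Enum):
    OPENSEARCH = "opensearch"
    MILVUS = "milvus"


def parse_database_type(database_type: str) -> DatabaseType:
    try:
        return DatabaseType(database_type.lower())
    except ValueError:
        valid_types = [t.value for t in DatabaseType]
        # "from None" suppresses the enum's own ValueError traceback,
        # so users see only the friendlier message below
        raise ValueError(
            f"Unknown database type '{database_type}'. Valid types: {valid_types}"
        ) from None
```

Without `from None`, the re-raised error would carry a "During handling of the above exception, another exception occurred" preamble showing the internal enum lookup failure.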

In `@osbenchmark/worker_coordinator/worker_coordinator.py`:
- Around line 49-54: The import line that forces side-effect registration uses
an unnecessary `noqa: F401` comment; edit the `import osbenchmark.database  #
noqa: F401  # pylint: disable=unused-import` statement in worker_coordinator.py
to remove only the `noqa: F401` portion while keeping the `# pylint:
disable=unused-import` comment so the side-effect import remains and pylint
suppression stays intact.
- Around line 1020-1027: Remove the unused database parameter from the
wait_for_rest_api method signature and from all its call sites: change "def
wait_for_rest_api(self, database)" to "def wait_for_rest_api(self)" and ensure
callers no longer pass a database argument; keep the internal logic that uses
self._default_client_factory.wait_for_rest_layer() and preserve the existing
logging and exception behavior in wait_for_rest_api.
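As a rough illustration of the null-safety guards requested above for document_service, search_service, and __del__ (hypothetical class shape; the real code raises SystemSetupError and also closes the underlying opensearchpy client):

```python
class UnifiedClient:
    """Sketch of a client whose gRPC stubs may legitimately be absent (REST-only)."""

    def __init__(self, grpc_stubs=None):
        self._grpc_stubs = grpc_stubs  # dict of {cluster_name: stubs} or None

    def document_service(self, cluster_name="default"):
        # Guard first: "cluster_name in None" would raise TypeError
        if not self._grpc_stubs or cluster_name not in self._grpc_stubs:
            raise RuntimeError(f"No gRPC stubs configured for cluster [{cluster_name}]")
        return self._grpc_stubs[cluster_name]["document"]

    def __del__(self):
        # Skip the loop entirely when gRPC was never configured,
        # so __del__ never iterates None
        for cluster_stubs in (self._grpc_stubs or {}).values():
            channel = cluster_stubs.get("_channel")
            if channel is not None:
                try:
                    channel.close()
                except Exception:
                    pass  # best-effort cleanup during interpreter teardown
```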
🧹 Nitpick comments (5)
osbenchmark/database/clients/vespa/__init__.py (1)

1-5: Consider adding a module docstring for consistency.

Other __init__.py files in this PR include a docstring (e.g., """Database client implementations for OpenSearch Benchmark."""). Adding one here would maintain consistency across the package structure.

 # SPDX-License-Identifier: Apache-2.0
 #
 # The OpenSearch Contributors require contributions made to
 # this file be licensed under the Apache-2.0 license or a
 # compatible open source license.
+
+"""Vespa client implementation (placeholder for future development)."""
osbenchmark/database/__init__.py (1)

68-71: Consider sorting __all__ for consistency.

Static analysis flags that __all__ is not sorted. This is a minor style consideration.

Suggested fix
 __all__ = [
-    'DatabaseType',
     'DatabaseClientFactory',
+    'DatabaseType',
 ]
osbenchmark/database/clients/opensearch/opensearch.py (3)

33-35: Consider lazy-loading gRPC dependencies.

These gRPC imports at module level will cause an ImportError if the grpc or opensearch.protobufs packages are not installed, preventing the entire module from loading—even for users who only need REST client functionality.

Consider lazy-loading these imports inside GrpcClientFactory or guarding with a try/except to make gRPC truly optional.

♻️ Suggested approach
-import grpc
-from opensearch.protobufs.services.document_service_pb2_grpc import DocumentServiceStub
-from opensearch.protobufs.services.search_service_pb2_grpc import SearchServiceStub
+# gRPC imports are lazy-loaded in GrpcClientFactory to keep them optional

Then in GrpcClientFactory.create_grpc_stubs():

def create_grpc_stubs(self):
    # pylint: disable=import-outside-toplevel
    import grpc
    from opensearch.protobufs.services.document_service_pb2_grpc import DocumentServiceStub
    from opensearch.protobufs.services.search_service_pb2_grpc import SearchServiceStub
    # ... rest of the method

243-265: Improve exception chaining and re-raising.

Several exception handling patterns can be improved:

  1. Line 245: Use raise ... from e to preserve the exception chain.
  2. Lines 262 and 265: Use bare raise instead of raise e to preserve the original traceback.
♻️ Proposed fix
         except opensearchpy.ConnectionError as e:
             if "SSL: UNKNOWN_PROTOCOL" in str(e):
-                raise exceptions.SystemSetupError("Could not connect to cluster via https. Is this an https endpoint?", e)
+                raise exceptions.SystemSetupError(
+                    "Could not connect to cluster via https. Is this an https endpoint?"
+                ) from e
             else:
                 logger.debug("Got connection error on attempt [%s]. Sleeping...", attempt)
                 time.sleep(3)
                 except Exception as e:
                     logger.warning("Encountered exception %s when attempting to probe endpoint health", e)
-                    raise e
+                    raise
             else:
                 logger.warning("Got unexpected status code [%s] on attempt [%s].", e.status_code, attempt)
-                raise e
+                raise

406-417: Consider adding a comment explaining the late import.

The import at line 410-417 is placed mid-file to avoid circular imports. While the pylint disable comment hints at this, a brief explanation would help future maintainers understand why this deviates from the standard import-at-top convention.

♻️ Suggested comment
 # ============================================================================
 # DatabaseClient Interface Implementation for OpenSearch
 # ============================================================================

-# pylint: disable=wrong-import-position
+# Import placed here to avoid circular dependency with interface.py
+# pylint: disable=wrong-import-position
 from osbenchmark.database.interface import (
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 232afd3 and 7e0ceea.

📒 Files selected for processing (20)
  • it/__init__.py
  • osbenchmark/builder/builder.py
  • osbenchmark/database/__init__.py
  • osbenchmark/database/clients/__init__.py
  • osbenchmark/database/clients/milvus/__init__.py
  • osbenchmark/database/clients/milvus/milvus.py
  • osbenchmark/database/clients/opensearch/__init__.py
  • osbenchmark/database/clients/opensearch/opensearch.py
  • osbenchmark/database/clients/vespa/__init__.py
  • osbenchmark/database/clients/vespa/vespa.py
  • osbenchmark/database/factory.py
  • osbenchmark/database/interface.py
  • osbenchmark/database/registry.py
  • osbenchmark/worker_coordinator/runner.py
  • osbenchmark/worker_coordinator/worker_coordinator.py
  • osbenchmark/workload_generator/workload_generator.py
  • tests/client_test.py
  • tests/kafka_client_test.py
  • tests/worker_coordinator/runner_test.py
  • tests/worker_coordinator/worker_coordinator_test.py
🧰 Additional context used
🧬 Code graph analysis (13)
osbenchmark/database/clients/opensearch/__init__.py (1)
osbenchmark/database/clients/opensearch/opensearch.py (2)
  • opensearch (376-378)
  • OpenSearchClientFactory (594-657)
osbenchmark/workload_generator/workload_generator.py (1)
osbenchmark/database/clients/opensearch/opensearch.py (2)
  • opensearch (376-378)
  • OsClientFactory (43-218)
osbenchmark/database/registry.py (1)
osbenchmark/worker_coordinator/runner.py (1)
  • get (2608-2613)
tests/client_test.py (3)
osbenchmark/database/clients/opensearch/opensearch.py (1)
  • opensearch (376-378)
osbenchmark/client.py (1)
  • opensearch (376-378)
osbenchmark/__init__.py (1)
  • doc_link (86-87)
osbenchmark/builder/builder.py (2)
osbenchmark/database/clients/opensearch/opensearch.py (1)
  • opensearch (376-378)
osbenchmark/client.py (1)
  • opensearch (376-378)
tests/worker_coordinator/runner_test.py (2)
osbenchmark/database/clients/opensearch/opensearch.py (1)
  • opensearch (376-378)
osbenchmark/client.py (1)
  • opensearch (376-378)
osbenchmark/database/interface.py (1)
osbenchmark/database/clients/opensearch/opensearch.py (22)
  • create (169-175)
  • create (271-282)
  • create (390-392)
  • create (426-427)
  • create (628-640)
  • index (553-561)
  • delete (429-430)
  • exists (432-433)
  • refresh (435-436)
  • stats (438-439)
  • stats (492-493)
  • perform_request (472-479)
  • info (495-496)
  • info (572-574)
  • indices (528-529)
  • cluster (532-533)
  • transport (536-537)
  • nodes (540-541)
  • bulk (543-551)
  • search (563-570)
  • return_raw_response (576-579)
  • close (581-584)
tests/kafka_client_test.py (1)
osbenchmark/database/clients/opensearch/opensearch.py (2)
  • opensearch (376-378)
  • MessageProducerFactory (269-282)
osbenchmark/database/__init__.py (3)
osbenchmark/database/registry.py (2)
  • register_database (48-59)
  • DatabaseType (37-41)
osbenchmark/database/factory.py (1)
  • DatabaseClientFactory (37-116)
osbenchmark/database/clients/opensearch/opensearch.py (2)
  • opensearch (376-378)
  • OpenSearchClientFactory (594-657)
osbenchmark/database/factory.py (2)
osbenchmark/database/registry.py (2)
  • DatabaseType (37-41)
  • get_client_factory (62-72)
osbenchmark/exceptions.py (1)
  • SystemSetupError (60-63)
osbenchmark/worker_coordinator/runner.py (1)
osbenchmark/context.py (1)
  • RequestContextHolder (77-140)
it/__init__.py (2)
osbenchmark/database/clients/opensearch/opensearch.py (1)
  • opensearch (376-378)
osbenchmark/client.py (1)
  • opensearch (376-378)
osbenchmark/worker_coordinator/worker_coordinator.py (2)
osbenchmark/database/factory.py (2)
  • DatabaseClientFactory (37-116)
  • create_client_factory (47-88)
osbenchmark/database/registry.py (2)
  • DatabaseType (37-41)
  • get_client_factory (62-72)
🪛 Ruff (0.14.13)
osbenchmark/database/interface.py

168-172: DatabaseClient.return_raw_response is an empty method in an abstract base class, but has no abstract decorator

(B027)


174-178: DatabaseClient.close is an empty method in an abstract base class, but has no abstract decorator

(B027)

osbenchmark/database/__init__.py

68-71: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

osbenchmark/database/factory.py

75-78: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


75-78: Avoid specifying long messages outside the exception class

(TRY003)


83-86: Avoid specifying long messages outside the exception class

(TRY003)

osbenchmark/database/clients/opensearch/opensearch.py

56-56: Possible hardcoded password assigned to: "basic_auth_password"

(S105)


194-194: Unused function argument: session

(ARG001)


194-194: Unused function argument: trace_config_ctx

(ARG001)


194-194: Unused function argument: params

(ARG001)


197-197: Unused function argument: session

(ARG001)


197-197: Unused function argument: trace_config_ctx

(ARG001)


197-197: Unused function argument: params

(ARG001)


242-242: Consider moving this statement to an else block

(TRY300)


245-245: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


245-245: Avoid specifying long messages outside the exception class

(TRY003)


259-259: Consider moving this statement to an else block

(TRY300)


262-262: Use raise without specifying exception name

Remove exception name

(TRY201)


265-265: Use raise without specifying exception name

Remove exception name

(TRY201)


282-282: Avoid specifying long messages outside the exception class

(TRY003)


354-355: Avoid specifying long messages outside the exception class

(TRY003)


362-363: Avoid specifying long messages outside the exception class

(TRY003)


371-371: Do not catch blind exception: Exception

(BLE001)


543-543: Unused method argument: doc_type

(ARG002)


553-553: Unused method argument: doc_type

(ARG002)


563-563: Unused method argument: doc_type

(ARG002)

osbenchmark/worker_coordinator/worker_coordinator.py

54-54: Unused noqa directive (non-enabled: F401)

Remove unused noqa directive

(RUF100)


1020-1020: Unused method argument: database

(ARG002)


1027-1027: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Integration-Tests (3.10)
  • GitHub Check: Integration-Tests (3.11)
  • GitHub Check: Integration-Tests (3.13)
  • GitHub Check: Integration-Tests (3.12)
  • GitHub Check: docker (linux/arm64)
  • GitHub Check: docker (linux/amd64)
  • GitHub Check: Unit-Tests (ubuntu-latest)
  • GitHub Check: Unit-Tests (macos-latest)
🔇 Additional comments (27)
osbenchmark/database/clients/milvus/milvus.py (1)

1-25: LGTM!

Placeholder module with proper license header and clear docstring indicating future development. This aligns well with the database abstraction layer architecture being introduced.

osbenchmark/database/clients/__init__.py (1)

1-7: LGTM!

Clean package initializer with appropriate license header and docstring. The subpackages handle their own exports appropriately.

osbenchmark/database/clients/opensearch/__init__.py (1)

1-11: LGTM!

Clean package initialization with proper __all__ declaration. The re-export of OpenSearchClientFactory provides a convenient import path while keeping the implementation details in the submodule.

osbenchmark/workload_generator/workload_generator.py (1)

13-13: LGTM!

Import path correctly updated to the new OpenSearch client module location. The usage of OsClientFactory at lines 39-40 remains unchanged.

it/__init__.py (1)

36-37: Import path update is correct.

Both OsClientFactory (line 43) and wait_for_rest_layer (line 221) are defined in osbenchmark/database/clients/opensearch/opensearch.py. The client alias maintains backward compatibility for usages at lines 194-195.

osbenchmark/database/clients/milvus/__init__.py (1)

1-7: LGTM: clear package placeholder.
No issues with the initializer and docstring.

osbenchmark/database/clients/vespa/vespa.py (1)

1-26: LGTM: explicit placeholder stub.
This is clear and non-invasive for now.

tests/kafka_client_test.py (1)

28-28: Import path aligned with new OpenSearch client module.
Looks consistent with the refactor.

osbenchmark/worker_coordinator/runner.py (1)

50-50: LGTM: RequestContextHolder import aligned with context module.
No issues spotted.

tests/client_test.py (1)

37-38: Tests updated to new OpenSearch client module path.
Imports and logger targets look consistent.

Also applies to: 72-72, 110-110, 150-150, 225-225, 272-272, 319-319, 404-404

tests/worker_coordinator/worker_coordinator_test.py (1)

1969-1974: LGTM: factory patch path updated for new OpenSearch client module.
Looks good.

tests/worker_coordinator/runner_test.py (1)

34-35: Import path update looks consistent with the new DB abstraction layout.

osbenchmark/builder/builder.py (1)

37-39: Import path reorganization looks correct.

The client import is properly rebased to the new database abstraction layer module path. The function references at lines 267 and 280 (client.OsClientFactory, client.wait_for_rest_layer) remain compatible with the new module structure.

osbenchmark/database/__init__.py (1)

56-62: Registration at import time is appropriate for this use case.

The side-effect registration pattern is suitable for a plugin/driver registry. The import ensures OpenSearch is always available when the osbenchmark.database package is used.

osbenchmark/database/registry.py (1)

37-82: Registry implementation is clean and follows standard plugin patterns.

The DatabaseType enum, global registry, and accessor functions provide a straightforward extensibility mechanism. The Optional[Type] return from get_client_factory is properly handled by callers in factory.py.

osbenchmark/database/factory.py (1)

90-116: Database type detection with backward compatibility looks good.

The detect_database_type method properly defaults to OpenSearch when no explicit type is configured, maintaining backward compatibility with existing configurations.

osbenchmark/database/interface.py (2)

39-65: Interface defines appropriate sync/async method signatures.

The mix of async methods (create, delete, exists, refresh) and sync methods (stats, forcemerge) for IndicesNamespace is correctly documented as being driven by telemetry requirements.


159-178: Optional methods with empty implementations are intentional.

The info(), return_raw_response(), and close() methods are correctly designed as optional with default no-op implementations. The static analysis warnings (B027) can be safely ignored as the docstrings explicitly document these as "Optional method - implementations can provide no-op."

osbenchmark/worker_coordinator/worker_coordinator.py (4)

650-657: Database type resolution with fallback is appropriate.

The try/except block correctly handles both invalid database type strings (ValueError) and missing registry entries (KeyError), falling back to OpenSearch for backward compatibility.
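A minimal sketch of that fallback behavior, under the assumption that invalid strings raise ValueError from the enum and unregistered types raise KeyError from the registry lookup (names and the inline registry are hypothetical):

```python
from enum import Enum


class DatabaseType(Enum):
    OPENSEARCH = "opensearch"
    MILVUS = "milvus"


# Stand-in registry: only OpenSearch has a registered factory
_REGISTRY = {DatabaseType.OPENSEARCH: object}


def resolve_database_type(config_value):
    try:
        db_type = DatabaseType(config_value.lower()) if config_value else DatabaseType.OPENSEARCH
        _REGISTRY[db_type]  # KeyError if no factory is registered
        return db_type
    except (ValueError, KeyError):
        # Unknown string or unregistered backend: fall back to OpenSearch
        # for backward compatibility with existing configurations
        return DatabaseType.OPENSEARCH
```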


979-984: Factory instance tracking for REST layer check is well implemented.

Storing _default_client_factory enables the wait_for_rest_layer call to work with different database types through their respective factory implementations.


990-1018: Telemetry conditional on database type is correctly implemented.

The logic properly skips OpenSearch-specific telemetry devices for non-OpenSearch databases, with appropriate logging to inform users.


2191-2206: Multi-database client creation using factory pattern is well integrated.

The os_clients inner function now uses DatabaseClientFactory.create_client_factory to support multiple database backends while maintaining the existing interface for the rest of the codebase.

osbenchmark/database/clients/opensearch/opensearch.py (5)

43-218: LGTM!

The OsClientFactory class handles SSL/TLS configuration, authentication, and cloud provider integration comprehensively. The password masking logic correctly sanitizes sensitive data before logging.


269-282: LGTM!

The MessageProducerFactory provides a clean factory method for creating message producers with appropriate error handling for unsupported types.


381-403: LGTM!

The UnifiedClientFactory properly orchestrates creation of both REST and optional gRPC clients. The design correctly defers gRPC stub creation only when grpc_hosts is provided.


420-500: LGTM!

The namespace wrapper classes correctly implement the adapter pattern, delegating to the underlying opensearchpy namespaces while conforming to the DatabaseClient interface. The __getattr__ fallback ensures full compatibility with operations not explicitly defined in the interface.


503-657: LGTM!

The OpenSearchDatabaseClient correctly implements the DatabaseClient interface with transparent delegation. The doc_type parameters are appropriately retained for backwards compatibility with deprecated opensearchpy 1.x APIs, and the comments clearly document this design decision.

The OpenSearchClientFactory properly separates async client creation (wrapped in the interface) from sync client creation (unwrapped for telemetry), which aligns with the PR's stated goals.


Comment on lines +234 to +235
for attempt in range(max_attempts):
logger.debug("REST API is available after %s attempts", attempt)

⚠️ Potential issue | 🟡 Minor

Misleading log message.

This debug log states "REST API is available after X attempts" but it's logged at the start of each loop iteration, before the availability check succeeds. The message should reflect that this is an attempt being made, not a success.

🐛 Proposed fix
     for attempt in range(max_attempts):
-        logger.debug("REST API is available after %s attempts", attempt)
+        logger.debug("Checking REST API availability, attempt [%s/%s]", attempt + 1, max_attempts)

Comment on lines +318 to +322
channel = grpc.aio.insecure_channel(
target=grpc_addr,
options=self.grpc_channel_options,
compression=None
)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the opensearch.py file around the gRPC channel creation
cat -n osbenchmark/database/clients/opensearch/opensearch.py | sed -n '285,331p'

Repository: opensearch-project/opensearch-benchmark

Length of output: 2306


🏁 Script executed:

# Also get more context - look at the full GrpcClientFactory class
wc -l osbenchmark/database/clients/opensearch/opensearch.py



🏁 Script executed:

# Search for the REST client implementation to compare SSL/TLS support
fd ".*opensearch.*" --type f | head -20



🏁 Script executed:

# Search for grpc_channel_options definition and usage
rg "grpc_channel_options" osbenchmark/database/clients/opensearch/



🏁 Script executed:

# Check for any TLS/SSL related configuration in the gRPC client
rg -i "tls|ssl|secure.*channel" osbenchmark/database/clients/opensearch/opensearch.py -B 2 -A 2



🏁 Script executed:

# Search for TODO/FIXME related to gRPC or TLS
rg -i "todo|fixme" osbenchmark/database/clients/opensearch/opensearch.py | head -20



🏁 Script executed:

# Check if there are any other gRPC implementations or configurations
rg -i "grpc|secure_channel" osbenchmark/ --type py | grep -v "\.pyc"



🏁 Script executed:

# Look for documentation or comments explaining the design choice
rg -B 5 -A 5 "insecure_channel" osbenchmark/



🏁 Script executed:

# Check if there's a discussion in recent commits about gRPC security
git log --oneline --grep="grpc\|gRPC" | head -20



gRPC channel lacks TLS support.

The gRPC client uses insecure_channel() which transmits data unencrypted, creating an asymmetry with the REST client's comprehensive SSL/TLS implementation (certificate verification, client authentication, hostname checking). This is a security concern for production environments.

Note: The same insecure implementation exists in both osbenchmark/database/clients/opensearch/opensearch.py and osbenchmark/client.py.

Consider adding secure channel support with TLS options and certificate configuration to match the REST client's security model.

🤖 Prompt for AI Agents
In `@osbenchmark/database/clients/opensearch/opensearch.py` around lines 318 -
322, The gRPC client currently uses grpc.aio.insecure_channel(…) which sends
unencrypted traffic; update the code around grpc.aio.insecure_channel, grpc_addr
and self.grpc_channel_options to support TLS by creating
grpc.ssl_channel_credentials (using server CA cert, optional client cert/key and
hostname checking) and using grpc.aio.secure_channel when TLS config is
provided, while preserving the existing insecure_channel fallback when no TLS
files/config are supplied; add configuration parsing to accept certificate paths
and apply them to channel creation so gRPC mirrors the REST client's certificate
verification and client-auth behavior.

Comment on lines +349 to +363
def document_service(self, cluster_name="default"):
"""Get the gRPC DocumentService stub for the specified cluster."""
if cluster_name in self._grpc_stubs:
return self._grpc_stubs[cluster_name].get('document_service')
else:
raise exceptions.SystemSetupError(
"gRPC DocumentService not available. Please configure --grpc-target-hosts.")

def search_service(self, cluster_name="default"):
"""Get the gRPC SearchService stub for the specified cluster."""
if cluster_name in self._grpc_stubs:
return self._grpc_stubs[cluster_name].get('search_service')
else:
raise exceptions.SystemSetupError(
"gRPC SearchService not available. Please configure --grpc-target-hosts.")

⚠️ Potential issue | 🔴 Critical

Add null check before accessing _grpc_stubs.

Both document_service and search_service methods check if cluster_name in self._grpc_stubs without first verifying that _grpc_stubs is not None. This will raise TypeError: argument of type 'NoneType' is not iterable when gRPC is not configured.

🐛 Proposed fix
     def document_service(self, cluster_name="default"):
         """Get the gRPC DocumentService stub for the specified cluster."""
-        if cluster_name in self._grpc_stubs:
+        if self._grpc_stubs and cluster_name in self._grpc_stubs:
             return self._grpc_stubs[cluster_name].get('document_service')
         else:
             raise exceptions.SystemSetupError(
                 "gRPC DocumentService not available. Please configure --grpc-target-hosts.")

     def search_service(self, cluster_name="default"):
         """Get the gRPC SearchService stub for the specified cluster."""
-        if cluster_name in self._grpc_stubs:
+        if self._grpc_stubs and cluster_name in self._grpc_stubs:
             return self._grpc_stubs[cluster_name].get('search_service')
         else:
             raise exceptions.SystemSetupError(
                 "gRPC SearchService not available. Please configure --grpc-target-hosts.")
🧰 Tools
🪛 Ruff (0.14.13)

354-355: Avoid specifying long messages outside the exception class

(TRY003)


362-363: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@osbenchmark/database/clients/opensearch/opensearch.py` around lines 349 -
363, The methods document_service and search_service assume self._grpc_stubs is
iterable and check "if cluster_name in self._grpc_stubs", which will raise
TypeError when _grpc_stubs is None; add a null/emptiness check first (e.g., if
not self._grpc_stubs: raise the same SystemSetupError) or ensure _grpc_stubs is
a dict before using membership, then proceed to check cluster_name in
self._grpc_stubs and return the stub or raise SystemSetupError; apply this
change in both document_service and search_service.

Comment on lines +365 to +373
def __del__(self):
"""Close all gRPC channels."""
for cluster_stubs in self._grpc_stubs.values():
if '_channel' in cluster_stubs:
try:
cluster_stubs['_channel'].close()
except Exception as e:
self._logger.warning("Error closing gRPC channel: %s", e)
self._opensearch.close()

⚠️ Potential issue | 🔴 Critical

__del__ will crash when _grpc_stubs is None.

The __del__ method iterates over self._grpc_stubs.values(), but _grpc_stubs can be None (set on line 342 when grpc_stubs parameter is not provided). This will raise AttributeError: 'NoneType' object has no attribute 'values' during garbage collection.

🐛 Proposed fix
     def __del__(self):
         """Close all gRPC channels."""
-        for cluster_stubs in self._grpc_stubs.values():
-            if '_channel' in cluster_stubs:
-                try:
-                    cluster_stubs['_channel'].close()
-                except Exception as e:
-                    self._logger.warning("Error closing gRPC channel: %s", e)
-        self._opensearch.close()
+        if self._grpc_stubs:
+            for cluster_stubs in self._grpc_stubs.values():
+                if '_channel' in cluster_stubs:
+                    try:
+                        cluster_stubs['_channel'].close()
+                    except Exception:  # noqa: BLE001
+                        self._logger.warning("Error closing gRPC channel")
+        if self._opensearch:
+            self._opensearch.close()
🧰 Tools
🪛 Ruff (0.14.13)

371-371: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
In `@osbenchmark/database/clients/opensearch/opensearch.py` around lines 365 -
373, The __del__ method can raise AttributeError when self._grpc_stubs is None;
update __del__ in opensearch.py to first check whether self._grpc_stubs is
truthy (or is a dict) before iterating self._grpc_stubs.values(), e.g., skip the
loop if _grpc_stubs is None, then proceed to safely close any '_channel' in
cluster_stubs and call self._opensearch.close() (keeping the existing try/except
around channel.close()), so __del__ never assumes _grpc_stubs is a dict and
avoids iterating None.

Comment on lines +71 to +78
try:
db_enum = DatabaseType(database_type.lower())
except ValueError:
valid_types = [db.value for db in DatabaseType]
raise ValueError(
f"Unsupported database type: '{database_type}'. "
f"Valid types are: {', '.join(valid_types)}"
)

⚠️ Potential issue | 🟡 Minor

Use raise ... from None to suppress exception chaining.

When re-raising with a more informative message, suppress the original exception chain to avoid confusing tracebacks.

Suggested fix
         try:
             db_enum = DatabaseType(database_type.lower())
         except ValueError:
             valid_types = [db.value for db in DatabaseType]
             raise ValueError(
                 f"Unsupported database type: '{database_type}'. "
                 f"Valid types are: {', '.join(valid_types)}"
-            )
+            ) from None
🧰 Tools
🪛 Ruff (0.14.13)

75-78: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


75-78: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@osbenchmark/database/factory.py` around lines 71 - 78, The except block
handling database_type should suppress exception chaining by re-raising the new
ValueError using "from None"; update the raise in the except that currently
constructs the message with DatabaseType and valid_types so it becomes raise
ValueError(...) from None to avoid including the original ValueError traceback
when creating db_enum from database_type.lower().
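The effect of `from None` can be observed directly on the raised exception's attributes; the enum here is a cut-down illustration, not OSB's actual DatabaseType:

```python
from enum import Enum

class DatabaseType(Enum):
    OPENSEARCH = "opensearch"
    MILVUS = "milvus"
    VESPA = "vespa"

def to_database_type(name):
    try:
        return DatabaseType(name.lower())
    except ValueError:
        valid_types = ", ".join(db.value for db in DatabaseType)
        # `from None` sets __suppress_context__, so the traceback shows
        # only this informative error, not the bare enum-lookup failure.
        raise ValueError(
            f"Unsupported database type: '{name}'. "
            f"Valid types are: {valid_types}"
        ) from None
```

Without `from None`, the traceback would carry a "During handling of the above exception, another exception occurred" section pointing at the internal enum lookup, which adds noise for the user.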

Comment on lines +49 to +54
from osbenchmark.database.clients.opensearch import opensearch as client
from osbenchmark import actor, config, exceptions, metrics, workload, paths, PROGRAM_NAME, telemetry
from osbenchmark.worker_coordinator import runner, scheduler
from osbenchmark.database.factory import DatabaseClientFactory
from osbenchmark.database.registry import DatabaseType, get_client_factory
import osbenchmark.database # noqa: F401 # pylint: disable=unused-import

⚠️ Potential issue | 🟡 Minor

Remove unused noqa directive.

The noqa: F401 directive on line 54 is flagged as unused by static analysis. The import of osbenchmark.database triggers side-effect registration which is intentional, but the noqa comment is unnecessary.

Suggested fix
 from osbenchmark.database.factory import DatabaseClientFactory
 from osbenchmark.database.registry import DatabaseType, get_client_factory
-import osbenchmark.database  # noqa: F401  # pylint: disable=unused-import
+import osbenchmark.database  # pylint: disable=unused-import
🧰 Tools
🪛 Ruff (0.14.13)

54-54: Unused noqa directive (non-enabled: F401)

Remove unused noqa directive

(RUF100)

🤖 Prompt for AI Agents
In `@osbenchmark/worker_coordinator/worker_coordinator.py` around lines 49 - 54,
The import line that forces side-effect registration uses an unnecessary `noqa:
F401` comment; edit the `import osbenchmark.database  # noqa: F401  # pylint:
disable=unused-import` statement in worker_coordinator.py to remove only the
`noqa: F401` portion while keeping the `# pylint: disable=unused-import` comment
so the side-effect import remains and pylint suppression stays intact.

Comment on lines +1020 to +1027
def wait_for_rest_api(self, database):
self.logger.info("Checking if REST API is available.")
if client.wait_for_rest_layer(os_default, max_attempts=40):
# Use the factory's wait_for_rest_layer method which handles different database types
if self._default_client_factory.wait_for_rest_layer():
self.logger.info("REST API is available.")
else:
self.logger.error("REST API layer is not yet available. Stopping benchmark.")
raise exceptions.SystemSetupError("OpenSearch REST API layer is not available.")
raise exceptions.SystemSetupError("Database REST API layer is not available.")

⚠️ Potential issue | 🟡 Minor

Unused database parameter in wait_for_rest_api.

The database parameter is passed but never used. The method relies on self._default_client_factory instead. Consider removing the parameter or using it.

Suggested fix (remove unused parameter)
-    def wait_for_rest_api(self, database):
+    def wait_for_rest_api(self):
         self.logger.info("Checking if REST API is available.")
         # Use the factory's wait_for_rest_layer method which handles different database types
         if self._default_client_factory.wait_for_rest_layer():

And update the caller at line 1062:

-            self.wait_for_rest_api(os_clients)
+            self.wait_for_rest_api()
🧰 Tools
🪛 Ruff (0.14.13)

1020-1020: Unused method argument: database

(ARG002)


1027-1027: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@osbenchmark/worker_coordinator/worker_coordinator.py` around lines 1020 -
1027, Remove the unused database parameter from the wait_for_rest_api method
signature and from all its call sites: change "def wait_for_rest_api(self,
database)" to "def wait_for_rest_api(self)" and ensure callers no longer pass a
database argument; keep the internal logic that uses
self._default_client_factory.wait_for_rest_layer() and preserve the existing
logging and exception behavior in wait_for_rest_api.

@OVI3D0
Member Author

OVI3D0 commented Jan 23, 2026

Looks like all integ tests are failing with the same error:

Error: FO     osbenchmark.utils.process:process.py:128 [ERROR] ❌ Cannot list. Could not load '/home/runner/work/opensearch-benchmark/opensearch-benchmark/.osb/benchmarks/workloads/default/http_logs/workload.json'. The complete workload has been written to '/tmp/tmpsb9dq35m.json' for diagnosis. 

I've uploaded a fix in the workloads repo that should address these failures: opensearch-project/opensearch-benchmark-workloads#723

@OVI3D0 OVI3D0 closed this Jan 27, 2026
@OVI3D0 OVI3D0 reopened this Feb 10, 2026
