Skip to content

Commit 6471558

Browse files
committed
Remove _ClusterBatch
1 parent 8c3f915 commit 6471558

File tree

3 files changed

+113
-33
lines changed

3 files changed

+113
-33
lines changed

integration/test_cluster.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import pytest
2+
3+
import weaviate
4+
from weaviate.collections.classes.config import (
5+
Configure,
6+
DataType,
7+
Property,
8+
)
9+
from weaviate.util import parse_version_string
10+
11+
12+
NODE_NAME = "node1"
13+
NUM_OBJECT = 10
14+
15+
16+
@pytest.fixture(scope="module")
17+
def client():
18+
client = weaviate.connect_to_local()
19+
client.collections.delete_all()
20+
yield client
21+
client.collections.delete_all()
22+
23+
24+
def test_rest_nodes_without_data(client: weaviate.WeaviateClient):
25+
"""get nodes status without data"""
26+
resp = client.cluster.rest_nodes(output="verbose")
27+
assert len(resp) == 1
28+
assert "gitHash" in resp[0]
29+
assert resp[0]["name"] == NODE_NAME
30+
assert resp[0]["shards"] is None
31+
assert resp[0]["stats"]["objectCount"] == 0
32+
assert resp[0]["stats"]["shardCount"] == 0
33+
assert resp[0]["status"] == "HEALTHY"
34+
assert "version" in resp[0]
35+
36+
37+
def test_rest_nodes_with_data(client: weaviate.WeaviateClient):
38+
"""get nodes status with data"""
39+
collection_name_1 = "Collection_1"
40+
uncap_collection_name_1 = "collection_1"
41+
collection = client.collections.create(
42+
name=collection_name_1,
43+
properties=[Property(name="Name", data_type=DataType.TEXT)],
44+
vectorizer_config=Configure.Vectorizer.none(),
45+
)
46+
collection.data.insert_many([{"Name": f"name {i}"} for i in range(NUM_OBJECT)])
47+
48+
collection_name_2 = "Collection_2"
49+
collection = client.collections.create(
50+
name=collection_name_2,
51+
properties=[Property(name="Name", data_type=DataType.TEXT)],
52+
vectorizer_config=Configure.Vectorizer.none(),
53+
)
54+
collection.data.insert_many([{"Name": f"name {i}"} for i in range(NUM_OBJECT * 2)])
55+
56+
# server behaviour changed by https://github.com/weaviate/weaviate/pull/4203
57+
server_is_at_least_124 = parse_version_string(
58+
client.get_meta()["version"]
59+
) > parse_version_string("1.24")
60+
61+
resp = client.cluster.rest_nodes(output="verbose")
62+
assert len(resp) == 1
63+
assert "gitHash" in resp[0]
64+
assert resp[0]["name"] == NODE_NAME
65+
assert len(resp[0]["shards"]) == 2
66+
assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT * 3
67+
assert resp[0]["stats"]["shardCount"] == 2
68+
assert resp[0]["status"] == "HEALTHY"
69+
assert "version" in resp[0]
70+
71+
shards = sorted(resp[0]["shards"], key=lambda x: x["class"])
72+
assert shards[0]["class"] == collection_name_1
73+
assert shards[0]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT
74+
assert shards[1]["class"] == collection_name_2
75+
assert shards[1]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT * 2
76+
77+
resp = client.cluster.rest_nodes(collection=collection_name_1, output="verbose")
78+
assert len(resp) == 1
79+
assert "gitHash" in resp[0]
80+
assert resp[0]["name"] == NODE_NAME
81+
assert len(resp[0]["shards"]) == 1
82+
assert resp[0]["stats"]["shardCount"] == 1
83+
assert resp[0]["status"] == "HEALTHY"
84+
assert "version" in resp[0]
85+
assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT
86+
87+
resp = client.cluster.rest_nodes(uncap_collection_name_1, output="verbose")
88+
assert len(resp) == 1
89+
assert "gitHash" in resp[0]
90+
assert resp[0]["name"] == NODE_NAME
91+
assert len(resp[0]["shards"]) == 1
92+
assert resp[0]["stats"]["shardCount"] == 1
93+
assert resp[0]["status"] == "HEALTHY"
94+
assert "version" in resp[0]
95+
assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT

weaviate/collections/batch/base.py

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,11 @@
66
from collections import deque
77
from copy import copy
88
from dataclasses import dataclass, field
9-
from typing import Any, Dict, Generic, List, Optional, Set, TypeVar, Union, cast
9+
from typing import Any, Dict, Generic, List, Optional, Set, TypeVar, Union
1010

1111
from pydantic import ValidationError
1212
from typing_extensions import TypeAlias
1313

14-
from httpx import ConnectError
15-
16-
from weaviate.cluster.types import Node
1714
from weaviate.collections.batch.grpc_batch_objects import _BatchGRPC
1815
from weaviate.collections.batch.rest import _BatchREST
1916
from weaviate.collections.classes.batch import (
@@ -35,12 +32,12 @@
3532
ReferenceInputs,
3633
)
3734
from weaviate.collections.classes.types import WeaviateProperties
35+
from weaviate.collections.cluster import _ClusterAsync
3836
from weaviate.connect import ConnectionV4
3937
from weaviate.event_loop import _EventLoop
40-
from weaviate.exceptions import WeaviateBatchValidationError, EmptyResponseException
38+
from weaviate.exceptions import WeaviateBatchValidationError
4139
from weaviate.logger import logger
4240
from weaviate.types import UUID, VECTORS
43-
from weaviate.util import _decode_json_response_dict
4441
from weaviate.warnings import _Warnings
4542

4643
BatchResponse = List[Dict[str, Any]]
@@ -183,7 +180,7 @@ def __init__(
183180

184181
self.__results_lock = threading.Lock()
185182

186-
self.__cluster = _ClusterBatch(self.__connection)
183+
self.__cluster = _ClusterAsync(self.__connection)
187184

188185
self.__batching_mode: _BatchMode = batch_mode
189186
self.__max_batch_size: int = 1000
@@ -360,7 +357,7 @@ def batch_send_wrapper() -> None:
360357
return demonBatchSend
361358

362359
def __dynamic_batching(self) -> None:
363-
status = self.__loop.run_until_complete(self.__cluster.get_nodes_status)
360+
status = self.__loop.run_until_complete(self.__cluster.rest_nodes)
364361
if "batchStats" not in status[0] or "queueLength" not in status[0]["batchStats"]:
365362
# async indexing - just send a lot
366363
self.__batching_mode = _FixedSizeBatching(1000, 10)
@@ -700,23 +697,3 @@ def __check_bg_thread_alive(self) -> None:
700697
return
701698

702699
raise self.__bg_thread_exception or Exception("Batch thread died unexpectedly")
703-
704-
705-
class _ClusterBatch:
706-
def __init__(self, connection: ConnectionV4):
707-
self._connection = connection
708-
709-
async def get_nodes_status(
710-
self,
711-
) -> List[Node]:
712-
try:
713-
response = await self._connection.get(path="/nodes")
714-
except ConnectError as conn_err:
715-
raise ConnectError("Get nodes status failed due to connection error") from conn_err
716-
717-
response_typed = _decode_json_response_dict(response, "Nodes status")
718-
assert response_typed is not None
719-
nodes = response_typed.get("nodes")
720-
if nodes is None or nodes == []:
721-
raise EmptyResponseException("Nodes status response returned empty")
722-
return cast(List[Node], nodes)

weaviate/collections/cluster/cluster.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from typing import List, Literal, Optional, Union, overload
55

6+
from weaviate.cluster.types import Node as NodeREST
67
from weaviate.collections.classes.cluster import Node, Shards, _ConvertFromREST, Stats
78
from weaviate.exceptions import (
89
EmptyResponseError,
@@ -73,6 +74,17 @@ async def nodes(
7374
`weaviate.EmptyResponseError`
7475
If the response is empty.
7576
"""
77+
nodes = await self.rest_nodes(collection, output)
78+
if output == "verbose":
79+
return _ConvertFromREST.nodes_verbose(nodes)
80+
else:
81+
return _ConvertFromREST.nodes_minimal(nodes)
82+
83+
async def rest_nodes(
84+
self,
85+
collection: Optional[str] = None,
86+
output: Optional[Literal["minimal", "verbose"]] = None,
87+
) -> List[NodeREST]:
7688
path = "/nodes"
7789
if collection is not None:
7890
path += "/" + _capitalize_first_letter(collection)
@@ -86,8 +98,4 @@ async def nodes(
8698
nodes = response_typed.get("nodes")
8799
if nodes is None or nodes == []:
88100
raise EmptyResponseError("Nodes status response returned empty")
89-
90-
if output == "verbose":
91-
return _ConvertFromREST.nodes_verbose(nodes)
92-
else:
93-
return _ConvertFromREST.nodes_minimal(nodes)
101+
return nodes

0 commit comments

Comments
 (0)