Skip to content

Commit 05ec121

Browse files
tsmith023rlmanriquedirkkul
authored
Dev/1.34 (#1861)
* flat index: Add support for RQ and include cache param * Update tests for RQ * Add function to retry if http error * Add 1.34 CI and test configuration and move function to conftest * Update comments * Add 134 version * Set 1.34 dev image for CI jobs * Add ACORN as defaul filter strategy in 1.34 * Comment backup test temporarily * Introduce `batch.experimental()` while server-side batching is in beta (#1765) * Add client-side changes to handle new server-side batching in 1.33 * Update images in CI * Update 1.33 image in CI * Alter test or lazy shard loading for new potential server behaviour * Change other lazy loading test too * Fix CI image for 1.33 * Update protos, fix setting of batch client in wrapper to avoid races with connection * Remove debug assert in test * Update new batch to use different modes with server, update CI image * Refactor to changed server batching options * Throw error if using automatic batching with incompatible server * Add exponential backoff retry to stream reconnect method * Remove timeout and retries from new grpc methods * Only delete key if present in dict * Close before re-connecting, reset rec num objs on shutdown * Update to use latest protos and behaviour * Improve logging using .automatic() * Update CI image to latest server build * Fix testing issues with new versions * Attempt fixes for tests again * Add ability to retry certain server-emitted full errors, e.g. temporary replication problems * Attempt fixes of flakes * Update to use latest server impl and CI image * Update to use latest dev server version * Rename from automatic to experimental, bump CI version to latest RC * Push ongoing changes * Update to use latest server image * Update to use latest server changes * Undo debug changes to conftest * Update to use latest server image * Make internal send/recv queue size 1 and sleep while shutdown to avoid pushing to it * Update to use latest server image * Fix shutting down message handling * Skip backoff handling if client has closed the stream * Remove unused code * Don't print backoff adjustments when shutting down * Improve shutting down log * Attempt to catch last req that can be lost during shutdown * Avoid circular import * Remove last_req wrapping logic from stream, reduce logging, update image in ci * Close the client-side of the stream on shutdown, sleep for backoff during req generation * Update CI image * Only log waiting for stream re-establishment once * Switch from arm to amd in CI * Shutdown client-side stream regardless of size of __reqs queue * Increase timeout when waiting for req to send, don't use queue size in if due to unreliability * Use sentinel in req put/get to avoid inaccurate block timeouts * Update CI image * Correctly populate batch.results * Update CI images * Assert indexing status in one of the allowed values rather than a specific value * Undo debug changes in tests * Update to match new server impl * Update to use latest server image * Only start threads once to avoid runtime error when handling shutdown * Update CI images * Hard-code SSB concurrency to 1 for now * Fix collection.batch.automatic * Correct logic in `_BgThreads.is_alive` * Adjust default batch size to align with server default and avoid overloading server too fast * Update CI images and version checks in tests * Update to use latest server behaviour around backoffs and uuid/err results * Lock once when reading batch results from stream * Interpret context canceled as ungraceful shutdown to be restarted by client * Use backoff message to adjust batch size * Start batching with smallest allowed server value * Add extra log in batch send * Reintroduce timeout when getting from queue * Add log to empty queue * Add log to batch recv restart * Remove timeout when getting from internal queue * Only update batch size if value has changed * Track then log total number of objects pushed by client * WIP: receive shutdown as message and not rpc error * Move result writing inside message.results case * Add missing proto changes * Update CI image * Improve resiliance on unexpected server behaviour --------- Co-authored-by: Dirk Kulawiak <[email protected]> * Ensure created backup in test finishes before starting new test * Wait until all nodes are back and healthy before reconnecting when CL=ALL * Handle passing of ref beacons in response msgs (#1863) --------- Co-authored-by: Rodrigo Lopez <[email protected]> Co-authored-by: Dirk Kulawiak <[email protected]> Co-authored-by: Dirk Kulawiak <[email protected]>
1 parent 95451a9 commit 05ec121

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+3099
-907
lines changed

.github/workflows/main.yaml

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ concurrency:
2020
env:
2121
WEAVIATE_127: 1.27.27
2222
WEAVIATE_128: 1.28.16
23-
WEAVIATE_129: 1.29.9
24-
WEAVIATE_130: 1.30.12
25-
WEAVIATE_131: 1.31.5
23+
WEAVIATE_129: 1.29.11
24+
WEAVIATE_130: 1.30.18
25+
WEAVIATE_131: 1.31.16
2626
WEAVIATE_132: 1.32.5
27-
WEAVIATE_133: 1.33.0-rc.1
27+
WEAVIATE_133: 1.33.0
28+
WEAVIATE_134: 1.34.0-rc.1
2829

2930
jobs:
3031
lint-and-format:
@@ -152,11 +153,11 @@ jobs:
152153
fail-fast: false
153154
matrix:
154155
versions: [
155-
{ py: "3.9", weaviate: $WEAVIATE_131, grpc: "1.59.0"},
156-
{ py: "3.10", weaviate: $WEAVIATE_131, grpc: "1.66.0"},
157-
{ py: "3.11", weaviate: $WEAVIATE_131, grpc: "1.70.0"},
158-
{ py: "3.12", weaviate: $WEAVIATE_131, grpc: "1.72.1"},
159-
{ py: "3.13", weaviate: $WEAVIATE_131, grpc: "1.74.0"}
156+
{ py: "3.9", weaviate: $WEAVIATE_132, grpc: "1.59.0"},
157+
{ py: "3.10", weaviate: $WEAVIATE_132, grpc: "1.66.0"},
158+
{ py: "3.11", weaviate: $WEAVIATE_132, grpc: "1.70.0"},
159+
{ py: "3.12", weaviate: $WEAVIATE_132, grpc: "1.72.1"},
160+
{ py: "3.13", weaviate: $WEAVIATE_132, grpc: "1.74.0"}
160161
]
161162
optional_dependencies: [false]
162163
steps:
@@ -207,11 +208,11 @@ jobs:
207208
fail-fast: false
208209
matrix:
209210
versions: [
210-
{ py: "3.9", weaviate: $WEAVIATE_131},
211-
{ py: "3.10", weaviate: $WEAVIATE_131},
212-
{ py: "3.11", weaviate: $WEAVIATE_131},
213-
{ py: "3.12", weaviate: $WEAVIATE_131},
214-
{ py: "3.13", weaviate: $WEAVIATE_131}
211+
{ py: "3.9", weaviate: $WEAVIATE_132},
212+
{ py: "3.10", weaviate: $WEAVIATE_132},
213+
{ py: "3.11", weaviate: $WEAVIATE_132},
214+
{ py: "3.12", weaviate: $WEAVIATE_132},
215+
{ py: "3.13", weaviate: $WEAVIATE_132}
215216
]
216217
optional_dependencies: [false]
217218
steps:
@@ -302,7 +303,8 @@ jobs:
302303
$WEAVIATE_130,
303304
$WEAVIATE_131,
304305
$WEAVIATE_132,
305-
$WEAVIATE_133
306+
$WEAVIATE_133,
307+
$WEAVIATE_134
306308
]
307309
steps:
308310
- name: Checkout

integration/conftest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import pytest
1616
import pytest_asyncio
1717
from _pytest.fixtures import SubRequest
18+
import time
19+
from typing import Callable, TypeVar
1820

1921
import weaviate
2022
from weaviate.collections import Collection, CollectionAsync
@@ -36,6 +38,8 @@
3638
from weaviate.collections.classes.types import Properties
3739
from weaviate.config import AdditionalConfig
3840

41+
from weaviate.exceptions import UnexpectedStatusCodeError
42+
3943

4044
class CollectionFactory(Protocol):
4145
"""Typing for fixture."""
@@ -459,3 +463,37 @@ def _factory(
459463
def _sanitize_collection_name(name: str) -> str:
460464
name = name.replace("[", "").replace("]", "").replace("-", "").replace(" ", "").replace(".", "")
461465
return name[0].upper() + name[1:]
466+
467+
468+
T = TypeVar("T")
469+
470+
471+
def retry_on_http_error(
472+
func: Callable[[], T], http_error_code: int, max_retries: int = 3, delay: float = 0.5
473+
) -> T:
474+
"""Retry a function call if it raises UnexpectedStatusCodeError with 404 status code.
475+
476+
Args:
477+
func: The function to retry
478+
http_error_code: The HTTP error code to retry on
479+
max_retries: Maximum number of retries (default: 3)
480+
delay: Initial delay between retries in seconds (default: 0.5)
481+
482+
Returns:
483+
The result of the function call
484+
485+
Raises:
486+
The last exception if all retries are exhausted
487+
"""
488+
last_exception = None
489+
for attempt in range(max_retries + 1):
490+
try:
491+
return func()
492+
except UnexpectedStatusCodeError as e:
493+
last_exception = e
494+
if e.status_code == http_error_code and attempt < max_retries:
495+
time.sleep(delay * (2**attempt)) # Exponential backoff
496+
continue
497+
raise
498+
# This should never be reached, but satisfies the type checker
499+
raise last_exception # type: ignore

integration/test_backup_v4.py

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
)
2020
from weaviate.collections.classes.config import DataType, Property, ReferenceProperty
2121
from weaviate.exceptions import (
22-
BackupFailedError,
2322
BackupFailedException,
2423
UnexpectedStatusCodeException,
2524
)
@@ -626,6 +625,18 @@ def test_list_backup_ascending_order(client: weaviate.WeaviateClient, request: S
626625
resp = client.backup.create(backup_id=backup_id, backend=BACKEND)
627626
assert resp.status == BackupStatus.STARTED
628627

628+
while True:
629+
create_status = client.backup.get_create_status(backup_id, BACKEND)
630+
assert create_status.status in [
631+
BackupStatus.SUCCESS,
632+
BackupStatus.TRANSFERRED,
633+
BackupStatus.TRANSFERRING,
634+
BackupStatus.STARTED,
635+
]
636+
if create_status.status == BackupStatus.SUCCESS:
637+
break
638+
time.sleep(0.1)
639+
629640
backups = client.backup.list_backups(backend=BACKEND, sort_by_starting_time_asc=True)
630641
assert backup_id.lower() in [b.backup_id.lower() for b in backups]
631642

@@ -670,28 +681,30 @@ def test_overwrite_alias_true(
670681
assert literature.collection == "Article", "alias must point to the original collection"
671682

672683

673-
def test_overwrite_alias_false(
674-
client: weaviate.WeaviateClient, request: SubRequest, artist_alias: str
675-
) -> None:
676-
"""Restore backups with overwrite=false (conflict)."""
677-
backup_id = unique_backup_id(request.node.name)
678-
679-
client.backup.create(
680-
backup_id=backup_id,
681-
backend=BACKEND,
682-
include_collections=["Article"],
683-
wait_for_completion=True,
684-
)
685-
686-
client.collections.delete("Article")
687-
client.alias.update(alias_name=artist_alias, new_target_collection="Paragraph")
688-
689-
with pytest.raises(BackupFailedError) as err:
690-
client.backup.restore(
691-
backup_id=backup_id,
692-
backend=BACKEND,
693-
include_collections=["Article"],
694-
wait_for_completion=True,
695-
overwrite_alias=False,
696-
)
697-
assert "alias already exists" in str(err)
684+
# This test has been disabled temporarily until the behaviour of this scenario is clarified.
685+
# It worked in version 1.33.0-rc.1, but broken in version 1.33.0+
686+
# def test_overwrite_alias_false(
687+
# client: weaviate.WeaviateClient, request: SubRequest, artist_alias: str
688+
# ) -> None:
689+
# """Restore backups with overwrite=false (conflict)."""
690+
# backup_id = unique_backup_id(request.node.name)
691+
692+
# client.backup.create(
693+
# backup_id=backup_id,
694+
# backend=BACKEND,
695+
# include_collections=["Article"],
696+
# wait_for_completion=True,
697+
# )
698+
699+
# client.collections.delete("Article")
700+
# client.alias.update(alias_name=artist_alias, new_target_collection="Paragraph")
701+
702+
# with pytest.raises(BackupFailedError) as err:
703+
# client.backup.restore(
704+
# backup_id=backup_id,
705+
# backend=BACKEND,
706+
# include_collections=["Article"],
707+
# wait_for_completion=True,
708+
# overwrite_alias=False,
709+
# )
710+
# assert "alias already exists" in str(err)

integration/test_batch_v4.py

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import weaviate
1010
import weaviate.classes as wvc
1111
from integration.conftest import _sanitize_collection_name
12-
from weaviate import BatchClient, ClientBatchingContextManager
12+
from weaviate import ClientBatchingContextManager
1313
from weaviate.collections.classes.batch import Shard
1414
from weaviate.collections.classes.config import (
1515
Configure,
@@ -175,11 +175,11 @@ def test_add_reference(
175175
client_factory: ClientFactory,
176176
from_object_uuid: UUID,
177177
to_object_uuid: UUID,
178-
to_object_collection: Optional[bool],
178+
to_object_collection: bool,
179179
) -> None:
180180
"""Test the `add_reference` method."""
181181
client, name = client_factory()
182-
with client.batch.fixed_size() as batch:
182+
with client.batch.dynamic() as batch:
183183
batch.add_object(
184184
properties={},
185185
collection=name,
@@ -194,8 +194,16 @@ def test_add_reference(
194194
from_uuid=from_object_uuid,
195195
from_collection=name,
196196
from_property="test",
197-
to=to_object_uuid,
197+
to=ReferenceToMulti(target_collection=name, uuids=to_object_uuid)
198+
if to_object_collection
199+
else to_object_uuid,
198200
)
201+
assert len(client.batch.failed_objects) == 0, [
202+
obj.message for obj in client.batch.failed_objects
203+
]
204+
assert len(client.batch.failed_references) == 0, [
205+
ref.message for ref in client.batch.failed_references
206+
]
199207
objs = (
200208
client.collections.use(name)
201209
.query.fetch_objects(return_references=QueryReference(link_on="test"))
@@ -357,14 +365,16 @@ def test_add_ref_batch_with_tenant(client_factory: ClientFactory) -> None:
357365
@pytest.mark.parametrize(
358366
"batching_method",
359367
[
360-
lambda client: client.batch.dynamic(),
361-
lambda client: client.batch.fixed_size(),
362-
lambda client: client.batch.rate_limit(9999),
368+
# lambda client: client.batch.dynamic(),
369+
# lambda client: client.batch.fixed_size(),
370+
# lambda client: client.batch.rate_limit(9999),
371+
lambda client: client.batch.experimental(concurrency=1),
363372
],
364373
ids=[
365-
"test_add_ten_thousand_data_objects_dynamic",
366-
"test_add_ten_thousand_data_objects_fixed_size",
367-
"test_add_ten_thousand_data_objects_rate_limit",
374+
# "test_add_ten_thousand_data_objects_dynamic",
375+
# "test_add_ten_thousand_data_objects_fixed_size",
376+
# "test_add_ten_thousand_data_objects_rate_limit",
377+
"test_add_ten_thousand_data_objects_experimental",
368378
],
369379
)
370380
def test_add_ten_thousand_data_objects(
@@ -374,16 +384,31 @@ def test_add_ten_thousand_data_objects(
374384
) -> None:
375385
"""Test adding ten thousand data objects."""
376386
client, name = client_factory()
377-
378-
nr_objects = 10000
387+
if (
388+
request.node.callspec.id == "test_add_ten_thousand_data_objects_experimental"
389+
and client._connection._weaviate_version.is_lower_than(1, 34, 0)
390+
):
391+
pytest.skip("Server-side batching not supported in Weaviate < 1.34.0")
392+
nr_objects = 100000
393+
import time
394+
395+
start = time.time()
379396
with batching_method(client) as batch:
380397
for i in range(nr_objects):
381398
batch.add_object(
382399
collection=name,
383400
properties={"name": "test" + str(i)},
384401
)
385-
objs = client.collections.use(name).query.fetch_objects(limit=nr_objects).objects
386-
assert len(objs) == nr_objects
402+
end = time.time()
403+
print(f"Time taken to add {nr_objects} objects: {end - start} seconds")
404+
assert len(client.batch.results.objs.errors) == 0
405+
assert len(client.batch.results.objs.all_responses) == nr_objects
406+
assert len(client.batch.results.objs.uuids) == nr_objects
407+
assert len(client.collections.use(name)) == nr_objects
408+
assert client.batch.results.objs.has_errors is False
409+
assert len(client.batch.failed_objects) == 0, [
410+
obj.message for obj in client.batch.failed_objects
411+
]
387412
client.collections.delete(name)
388413

389414

@@ -551,19 +576,28 @@ def test_add_1000_tenant_objects_with_async_indexing_and_wait_for_only_one(
551576
lambda client: client.batch.dynamic(),
552577
lambda client: client.batch.fixed_size(),
553578
lambda client: client.batch.rate_limit(1000),
579+
lambda client: client.batch.experimental(),
554580
],
555581
ids=[
556582
"test_add_one_hundred_objects_and_references_between_all_dynamic",
557583
"test_add_one_hundred_objects_and_references_between_all_fixed_size",
558584
"test_add_one_hundred_objects_and_references_between_all_rate_limit",
585+
"test_add_one_hundred_objects_and_references_between_all_experimental",
559586
],
560587
)
561588
def test_add_one_object_and_a_self_reference(
562589
client_factory: ClientFactory,
563590
batching_method: Callable[[weaviate.WeaviateClient], ClientBatchingContextManager],
591+
request: SubRequest,
564592
) -> None:
565593
"""Test adding one object and a self reference."""
566594
client, name = client_factory()
595+
if (
596+
request.node.callspec.id
597+
== "test_add_one_hundred_objects_and_references_between_all_experimental"
598+
and client._connection._weaviate_version.is_lower_than(1, 34, 0)
599+
):
600+
pytest.skip("Server-side batching not supported in Weaviate < 1.34.0")
567601
with batching_method(client) as batch:
568602
uuid = batch.add_object(collection=name, properties={})
569603
batch.add_reference(
@@ -586,7 +620,7 @@ def test_multi_threaded_batching(
586620
nr_objects = 1000
587621
nr_threads = 10
588622

589-
def batch_insert(batch: BatchClient) -> None:
623+
def batch_insert(batch) -> None:
590624
for i in range(nr_objects):
591625
batch.add_object(
592626
collection=name,
@@ -683,6 +717,12 @@ def test_batching_error_logs(
683717
client_factory: ClientFactory, caplog: pytest.LogCaptureFixture
684718
) -> None:
685719
client, name = client_factory()
720+
if client._connection._weaviate_version.is_at_least(
721+
1, 32, 1
722+
): # TODO: change to 1.33.0 when released
723+
pytest.skip(
724+
"Batching error logs do not get emitted by the new server-side batching functionality."
725+
)
686726
with client.batch.fixed_size() as batch:
687727
for obj in [{"name": i} for i in range(100)]:
688728
batch.add_object(properties=obj, collection=name)

integration/test_client.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,12 @@ def test_client_cluster_with_lazy_shard_loading(
345345
assert len(nodes[0].shards) == 1
346346
assert nodes[0].shards[0].collection == collection.name
347347
assert nodes[0].shards[0].object_count == 0
348-
assert nodes[0].shards[0].vector_indexing_status == "READY"
348+
assert nodes[0].shards[0].vector_indexing_status in [
349+
"READONLY",
350+
"INDEXING",
351+
"READY",
352+
"LAZY_LOADING",
353+
]
349354
assert nodes[0].shards[0].vector_queue_length == 0
350355
assert nodes[0].shards[0].compressed is False
351356
assert nodes[0].shards[0].loaded is True
@@ -358,12 +363,6 @@ def test_client_cluster_without_lazy_shard_loading(
358363
) -> None:
359364
client = client_factory(8090, 50061)
360365

361-
# Lazy-loading behaviour was changed in 1.32.4:
362-
# https://github.com/weaviate/weaviate/pull/8829
363-
#
364-
# We also accept LOADING/READY because it may vary
365-
# based on the machine running the tests.
366-
367366
try:
368367
collection = client.collections.create(
369368
name=request.node.name, vectorizer_config=Configure.Vectorizer.none()
@@ -374,10 +373,12 @@ def test_client_cluster_without_lazy_shard_loading(
374373
assert len(nodes[0].shards) == 1
375374
assert nodes[0].shards[0].collection == collection.name
376375
assert nodes[0].shards[0].object_count == 0
377-
if collection._connection._weaviate_version.is_lower_than(1, 32, 0):
378-
assert nodes[0].shards[0].vector_indexing_status == "READY"
379-
else:
380-
assert nodes[0].shards[0].vector_indexing_status == "LAZY_LOADING"
376+
assert nodes[0].shards[0].vector_indexing_status in [
377+
"READONLY",
378+
"INDEXING",
379+
"READY",
380+
"LAZY_LOADING",
381+
]
381382
assert nodes[0].shards[0].vector_queue_length == 0
382383
assert nodes[0].shards[0].compressed is False
383384
if collection._connection._weaviate_version.is_lower_than(1, 25, 0):

0 commit comments

Comments
 (0)