diff --git a/weaviate_cli/commands/create.py b/weaviate_cli/commands/create.py index 27ba916..d92c7a4 100644 --- a/weaviate_cli/commands/create.py +++ b/weaviate_cli/commands/create.py @@ -26,6 +26,7 @@ CreateDataDefaults, CreateRoleDefaults, PERMISSION_HELP_STRING, + MAX_WORKERS, ) @@ -89,7 +90,7 @@ def create() -> None: @click.option( "--training_limit", default=CreateCollectionDefaults.training_limit, - help="Training limit for PQ and SQ (default: 10000).", + help=f"Training limit for PQ and SQ (default: {CreateCollectionDefaults.training_limit}).", ) @click.option( "--multitenant", is_flag=True, help="Enable multitenancy (default: False)." @@ -228,7 +229,7 @@ def create_collection_cli( @click.option( "--number_tenants", default=CreateTenantsDefaults.number_tenants, - help="Number of tenants to create (default: 100).", + help=f"Number of tenants to create (default: {CreateTenantsDefaults.number_tenants}).", ) @click.option( "--tenant_batch_size", @@ -336,7 +337,8 @@ def create_backup_cli(ctx, backend, backup_id, include, exclude, wait, cpu_for_b @click.option( "--limit", default=CreateDataDefaults.limit, - help="Number of objects to import (default: 1000).", + help=f"Number of objects to import (default: {CreateDataDefaults.limit}).", + type=int, ) @click.option( "--consistency_level", @@ -390,6 +392,23 @@ def create_backup_cli(ctx, backend, backup_id, include, exclude, wait, cpu_for_b is_flag=True, help="Enable multi-vector (default: False).", ) +@click.option( + "--dynamic_batch", + is_flag=True, + help="Enable dynamic batching (default: False).", +) +@click.option( + "--batch_size", + default=CreateDataDefaults.batch_size, + help=f"Number of objects to ingest in each batch (default: {CreateDataDefaults.batch_size}).", + type=int, +) +@click.option( + "--concurrent_requests", + default=MAX_WORKERS, + type=int, + help=f"Number of concurrent requests to send to the server (default: {MAX_WORKERS}).", +) @click.pass_context def create_data_cli( ctx, @@ -406,6 +425,9 @@ def create_data_cli( wait_for_indexing, verbose, multi_vector, + dynamic_batch, + batch_size, + concurrent_requests, ): """Ingest data into a collection in Weaviate.""" @@ -423,6 +445,12 @@ def create_data_cli( click.echo("Error: --uuid has no effect unless --limit=1 is enabled.") sys.exit(1) + if dynamic_batch and not randomize: + click.echo( + "Error: --dynamic_batch has no effect unless --randomize is enabled." 
+ ) + sys.exit(1) + client: Optional[WeaviateClient] = None try: client = get_client_from_context(ctx) @@ -442,6 +470,9 @@ def create_data_cli( wait_for_indexing=wait_for_indexing, verbose=verbose, multi_vector=multi_vector, + dynamic_batch=dynamic_batch, + batch_size=batch_size, + concurrent_requests=concurrent_requests, ) except Exception as e: click.echo(f"Error: {e}") diff --git a/weaviate_cli/defaults.py b/weaviate_cli/defaults.py index 444f4ac..40098a5 100644 --- a/weaviate_cli/defaults.py +++ b/weaviate_cli/defaults.py @@ -1,4 +1,5 @@ from dataclasses import dataclass, field +import multiprocessing from typing import Optional, List, Dict @@ -50,6 +51,14 @@ QUERY_MAXIMUM_RESULTS = 10000 MAX_OBJECTS_PER_BATCH = 5000 +try: + _CPU_COUNT = multiprocessing.cpu_count() +except (NotImplementedError, OSError): + _CPU_COUNT = None +# Fallback to 1 worker if CPU count is unavailable or invalid +_SAFE_CPU_COUNT = _CPU_COUNT if isinstance(_CPU_COUNT, int) and _CPU_COUNT > 0 else 1 +MAX_WORKERS = min(32, _SAFE_CPU_COUNT + 4) + @dataclass class CreateCollectionDefaults: @@ -102,6 +111,8 @@ class CreateDataDefaults: wait_for_indexing: bool = False verbose: bool = False multi_vector: bool = False + batch_size: int = 1000 + dynamic_batch: bool = False @dataclass diff --git a/weaviate_cli/managers/config_manager.py b/weaviate_cli/managers/config_manager.py index 3ad41dc..4bd375a 100644 --- a/weaviate_cli/managers/config_manager.py +++ b/weaviate_cli/managers/config_manager.py @@ -9,6 +9,7 @@ from weaviate.exceptions import WeaviateGRPCUnavailableError from pathlib import Path from typing import Dict, Optional, Union +from weaviate.config import AdditionalConfig, Timeout as WeaviateTimeout class ConfigManager: @@ -107,6 +108,9 @@ def _create_client( ) -> Union[weaviate.WeaviateClient, weaviate.WeaviateAsyncClient]: """Internal method to create client based on async flag""" auth_config: Optional[weaviate.auth.AuthCredentials] = None + additional_config: Optional[AdditionalConfig] = ( + self.__additional_config_if_slow() + ) if "auth" in self.config: if self.config["auth"].get("type") == "user": @@ -133,6 +137,7 @@ def _create_client( grpc_port=self.config["grpc_port"], auth_credentials=auth_config, headers=headers, + additional_config=additional_config, ) if async_client: return weaviate.use_async_with_local(**common_kwargs) @@ -144,6 +149,7 @@ def _create_client( cluster_url=self.config["host"], auth_credentials=auth_config, headers=headers, + additional_config=additional_config, ) if async_client: try: @@ -189,6 +195,7 @@ def _create_client( grpc_port=grpc_port, auth_credentials=auth_config, headers=headers, + additional_config=additional_config, ) else: common_kwargs = dict( @@ -200,8 +207,25 @@ def _create_client( grpc_port=self.config["grpc_port"], auth_credentials=auth_config, headers=headers, + additional_config=additional_config, ) if async_client: return weaviate.use_async_with_custom(**common_kwargs) return weaviate.connect_to_custom(**common_kwargs) + + def __additional_config_if_slow(self) -> Optional[AdditionalConfig]: + """Return AdditionalConfig with doubled timeouts when SLOW_CONNECTION is set.""" + env_flag = os.getenv("SLOW_CONNECTION") + if not env_flag: + return None + if str(env_flag).strip().lower() not in {"1", "true", "yes", "on"}: + return None + defaults = AdditionalConfig().timeout + return AdditionalConfig( + timeout=WeaviateTimeout( + query=defaults.query * 2, + insert=defaults.insert * 2, + init=defaults.init * 2, + ) + ) diff --git a/weaviate_cli/managers/data_manager.py 
b/weaviate_cli/managers/data_manager.py index f7bd69e..3a1d7a0 100644 --- a/weaviate_cli/managers/data_manager.py +++ b/weaviate_cli/managers/data_manager.py @@ -1,37 +1,33 @@ -import click +import importlib.resources as resources import json -import numpy as np +import math import random -import os -from importlib import resources -from weaviate_cli.utils import get_random_string, pp_objects -from weaviate import WeaviateClient -from weaviate.classes.query import MetadataQuery -from weaviate.collections.classes.tenants import TenantActivityStatus -from typing import Dict, List, Optional, Union, Any, Tuple, Callable +import time +from collections import deque +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Union, Any, Tuple + +import click +import numpy as np import weaviate.classes.config as wvc +from faker import Faker +from weaviate import WeaviateClient from weaviate.classes.query import Filter +from weaviate.classes.query import MetadataQuery from weaviate.collections import Collection -from datetime import datetime, timedelta +from weaviate.collections.classes.tenants import TenantActivityStatus + from weaviate_cli.defaults import ( MAX_OBJECTS_PER_BATCH, QUERY_MAXIMUM_RESULTS, - CreateCollectionDefaults, + MAX_WORKERS, CreateDataDefaults, CreateTenantsDefaults, QueryDataDefaults, UpdateDataDefaults, DeleteDataDefaults, ) -import importlib.resources as resources -from pathlib import Path -import math -from faker import Faker -import multiprocessing -from functools import partial -import concurrent.futures -import time -from concurrent.futures import ThreadPoolExecutor +from weaviate_cli.utils import pp_objects PROPERTY_NAME_MAPPING = { "releaseDate": "release_date", @@ -40,13 +36,32 @@ "spokenLanguages": "spoken_languages", } + +class _ErrorTracker: + """ + Tracks total failures and a FIFO of up to N unique error messages. + Uniqueness is by message text; adjust `key = msg` if you want a stricter key. 
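+ + Example (sketch; ``failed`` stands in for a batcher's ``failed_objects`` list):: + + tracker = _ErrorTracker(max_examples=10) + tracker.add_failed_objects(failed) + print(tracker.total, list(tracker.examples))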
+ """ + + def __init__(self, max_examples: int = 10) -> None: + self.total: int = 0 + self._seen: set = set() + self.examples: deque = deque(maxlen=max_examples) + + def add_failed_objects(self, failed_objects) -> None: + if not failed_objects: + return + for fo in failed_objects: + self.total += 1 + msg = getattr(fo, "message", "") or "" + key = msg # could be (msg, getattr(fo, "status_code", None)) + if key not in self._seen: + self._seen.add(key) + self.examples.append((getattr(fo, "original_uuid", None), msg)) + + # Constants for data generation optimization -CHUNK_SIZE = 10000 # Number of objects to generate per chunk/worker -MAX_WORKERS = min( - 32, multiprocessing.cpu_count() + 4 -) # Use more than CPU count to account for I/O-bound tasks -# Define movie-related data outside the class to be accessible by worker processes MOVIE_GENRES = [ "Action", "Adventure", @@ -152,7 +167,6 @@ STATUSES = ["Released", "In Production", "Post Production", "Planned", "Cancelled"] -# Standalone function for parallel processing def generate_movie_object(is_update: bool = False, seed: Optional[int] = None) -> Dict: """Generate a single movie object - standalone function for parallel processing""" # Set seed if provided for deterministic generation @@ -164,9 +178,7 @@ def generate_movie_object(is_update: bool = False, seed: Optional[int] = None) - fake = Faker() # Generate a realistic release date - date = datetime.now() - timedelta( - days=random.randint(0, 20 * 365) - ) # Random date in last 20 years + date = datetime.now() - timedelta(days=random.randint(0, 20 * 365)) release_date = date.strftime("%Y-%m-%dT%H:%M:%SZ") # Select random language codes for spoken languages @@ -213,15 +225,13 @@ def generate_movie_object(is_update: bool = False, seed: Optional[int] = None) - } -def generate_chunk(args) -> List[Dict]: - """Generate a chunk of movie objects - standalone function for multiprocessing""" - chunk_size, chunk_id, is_update, skip_seed = args - # Use different seeds for each worker to ensure unique random data - base_seed = 42 + chunk_id * 1000 if not skip_seed else None - return [ - generate_movie_object(is_update, base_seed + i if base_seed else None) - for i in range(chunk_size) - ] +def _streaming_generate_chunk(args) -> List[Dict]: + chunk_size, base_seed, start_index, is_update = args + results: List[Dict] = [] + for i in range(chunk_size): + seed = (base_seed + start_index + i) if base_seed is not None else None + results.append(generate_movie_object(is_update, seed)) + return results class DataManager: @@ -231,6 +241,328 @@ def __init__(self, client: WeaviateClient): # Seed the Faker instance for reproducibility Faker.seed(42) + def __producer_consumer_ingest( + self, + collection: Collection, + num_objects: int, + vectorizer: str, + vector_dimensions: int, + named_vectors: Optional[List[str]], + uuid: Optional[str], + dynamic_batch: bool, + batch_size: int, + concurrent_requests: int, + multi_vector: bool, + skip_seed: bool, + verbose: bool, + ) -> Tuple[int, List, _ErrorTracker]: + """Memory-safe producer→queue ingestion with two clear modes: + - dynamic_batch=True: Fast streaming generation via multiprocessing feeding a single dynamic batcher. + - dynamic_batch=False: Modest thread producers + single fixed_size batcher (which handles HTTP concurrency). + + Parameters + ---------- + collection : Collection + Target Weaviate collection into which the generated objects will be ingested. + num_objects : int + Total number of objects to create and send to the collection. 
Must be a + non-negative integer. + vectorizer : str + Vectorizer configuration of the collection. If set to ``"none"``, vectors + are generated client-side; otherwise vectors are expected to be computed + server-side and ``vector`` fields are omitted from the payload. + vector_dimensions : int + Dimensionality of the generated vector(s) when ``vectorizer == "none"``. + Should match the collection configuration for the relevant vectorizer(s). + named_vectors : Optional[List[str]] + Optional list of named vector keys to populate when generating client-side + vectors. If ``None``, a single unnamed vector is generated. If provided, + one vector per name is generated (or multiple per name when + ``multi_vector`` is ``True``). + uuid : Optional[str] + Optional fixed UUID to assign to all generated objects. If ``None``, UUIDs + are generated automatically per object. + dynamic_batch : bool + When ``True``, use a single dynamically-sized batcher fed by a fast + multiprocessing producer. When ``False``, use modest thread-based + producers with a fixed-size batcher that manages HTTP concurrency. + batch_size : int + Target number of objects per batch when ``dynamic_batch`` is ``False``. + Must be a positive integer. In dynamic mode this may still serve as a + hint or upper bound for batch sizing, depending on the batcher + implementation. + concurrent_requests : int + Maximum number of concurrent HTTP requests the batcher is allowed to + issue. Must be a positive integer. Higher values increase throughput at + the cost of additional client and server resource usage. + multi_vector : bool + If ``True`` and ``named_vectors`` is provided, generates multiple vectors + per named vector (e.g. a list of vectors for each name). If ``False``, + generates a single vector per named vector or a single unnamed vector. + skip_seed : bool + If ``True``, do not fix the random seed, resulting in non-deterministic + object and vector generation across runs. If ``False``, a base seed is + used for reproducible generation. + verbose : bool + If ``True``, enable more verbose progress and error reporting during the + ingestion process. + Returns + ------- + Tuple[int, List, _ErrorTracker] + A tuple containing: + - the number of successfully ingested objects, + - a list of objects that failed ingestion (if any), + - an internal error tracker with details and examples of failures. 
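+ + Examples + -------- + Internal sketch; the keyword values below are illustrative, not defaults:: + + count, failed, errors = self.__producer_consumer_ingest( + collection=cl_collection, + num_objects=10_000, + vectorizer="none", + vector_dimensions=1536, + named_vectors=None, + uuid=None, + dynamic_batch=False, + batch_size=1000, + concurrent_requests=8, + multi_vector=False, + skip_seed=False, + verbose=True, + )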
+ """ + from queue import Queue, Empty + import threading + + failed_objects: List = [] + error_tracker = _ErrorTracker(max_examples=10) + + # Early return for zero objects to prevent hang in fixed-size mode + if num_objects <= 0: + return 0, failed_objects, error_tracker + + # Shared helper: build vector payload according to vectorizer configuration + def build_vector() -> ( + Optional[ + Union[List[float], Dict[str, List[float]], Dict[str, List[List[float]]]] + ] + ): + if vectorizer != "none": + return None + if multi_vector and named_vectors and len(named_vectors) > 0: + return { + named_vectors[0]: [ + (2 * np.random.rand(vector_dimensions) - 1).tolist(), + (2 * np.random.rand(vector_dimensions) - 1).tolist(), + ] + } + if named_vectors is None: + return (2 * np.random.rand(vector_dimensions) - 1).tolist() + return { + name: (2 * np.random.rand(vector_dimensions) - 1).tolist() + for name in named_vectors + } + + base_seed: Optional[int] = 42 if not skip_seed else None + + # --- Dynamic mode: multiprocessing producer → single dynamic batch consumer --- + if dynamic_batch: + import multiprocessing as mp + + gen_chunk_size = max(2000, batch_size * 10) + max_prefetch_chunks = 4 + producer_processes = min( + max(1, (mp.cpu_count() or 4)), concurrent_requests, 16 + ) + if verbose: + print( + f"Dynamic mode: streaming with {producer_processes} generator processes, chunk_size={gen_chunk_size}, prefetch={max_prefetch_chunks}" + ) + + q: Queue[Optional[List[Dict]]] = Queue(maxsize=max_prefetch_chunks) + consumed = 0 + consumed_lock = threading.Lock() + feeder_error: Optional[Exception] = None + feeder_error_lock = threading.Lock() + + num_chunks = math.ceil(num_objects / gen_chunk_size) + task_args: List[Tuple[int, Optional[int], int, bool]] = [] + for chunk_idx in range(num_chunks): + start_index = chunk_idx * gen_chunk_size + size = min(gen_chunk_size, num_objects - start_index) + if size > 0: + task_args.append((size, base_seed, start_index, False)) + + def feeder() -> None: + nonlocal feeder_error + try: + with mp.Pool(processes=producer_processes) as pool: + for chunk in pool.imap(_streaming_generate_chunk, task_args): + q.put(chunk) + except Exception as e: + with feeder_error_lock: + feeder_error = e + if verbose: + click.echo(f"Error in feeder thread: {e}", err=True) + finally: + # Always signal completion to the consumer, even on error. + try: + q.put(None) + except Exception: + # Best-effort; if the queue is unavailable, just ignore. 
+ pass + + def consumer() -> None: + nonlocal consumed + start_time = time.time() + last_log = start_time + with collection.batch.dynamic() as batch: + while True: + try: + chunk = q.get(timeout=0.5) + except Empty: + continue + if chunk is None: + break + for item in chunk: + vec = build_vector() + if vec is None: + batch.add_object(properties=item, uuid=uuid) + else: + batch.add_object(properties=item, uuid=uuid, vector=vec) + with consumed_lock: + consumed += 1 + if verbose and time.time() - last_log >= 2.0: + elapsed = time.time() - start_time + with consumed_lock: + current_consumed = consumed + qps = current_consumed / elapsed if elapsed > 0 else 0 + print( + f"Submitted {current_consumed}/{num_objects} (~{qps:.0f} obj/s), chunks_in_queue={q.qsize()}" + ) + last_log = time.time() + + # After context manager, best-effort failed_objects + if getattr(collection.batch, "failed_objects", None): + failed_objects.extend(collection.batch.failed_objects) + error_tracker.add_failed_objects(collection.batch.failed_objects) + + feeder_t = threading.Thread(target=feeder, daemon=True) + consumer_t = threading.Thread(target=consumer, daemon=True) + feeder_t.start() + consumer_t.start() + feeder_t.join() + consumer_t.join() + + # Report any feeder errors that occurred + if feeder_error: + error_msg = ( + "Feeder thread encountered an exception during data generation" + ) + if verbose: + click.echo(f"{error_msg}: {feeder_error}", err=True) + else: + click.echo(f"{error_msg}. Use --verbose for details.", err=True) + # Add to error tracker for consistency + error_tracker.total += 1 + error_tracker.examples.append((None, str(feeder_error))) + + return consumed, failed_objects, error_tracker + + # --- Fixed-size mode --- + q: Queue[Optional[Dict]] = Queue(maxsize=max(1, batch_size * 20)) + consumed = 0 + consumed_lock = threading.Lock() + producer_errors: List[Exception] = [] + producer_errors_lock = threading.Lock() + + producer_threads = min(4, max(1, num_objects // max(1, batch_size * 10))) + if verbose: + print( + f"Fixed-size mode: {producer_threads} producers, 1 consumer; batch_size={batch_size}, concurrent_requests={concurrent_requests}" + ) + + ranges: List[Tuple[int, int]] = [] + base = num_objects // producer_threads + remainder = num_objects % producer_threads + start_idx = 0 + for i in range(producer_threads): + end_idx = start_idx + base + (1 if i < remainder else 0) + if end_idx > start_idx: + ranges.append((start_idx, end_idx)) + start_idx = end_idx + + def producer(lo: int, hi: int) -> None: + try: + for i in range(lo, hi): + seed = (base_seed + i) if base_seed is not None else None + item = self.__generate_single_object(is_update=False, seed=seed) + q.put(item) + except Exception as e: + with producer_errors_lock: + producer_errors.append(e) + if verbose: + click.echo( + f"Error in producer thread (range {lo}-{hi}): {e}", + err=True, + ) + finally: + # Always signal completion to the consumer, even on error. + try: + q.put(None) + except Exception: + # Best-effort; if the queue is unavailable, just ignore. 
+ pass + + def consumer_fixed() -> None: + nonlocal consumed + start_time = time.time() + last_log = start_time + sentinels_received = 0 + with collection.batch.fixed_size( + batch_size=batch_size, concurrent_requests=max(1, concurrent_requests) + ) as batch: + while True: + try: + item = q.get(timeout=0.25) + except Empty: + continue + if item is None: + sentinels_received += 1 + if sentinels_received >= len(ranges): + break + continue + vec = build_vector() + if vec is None: + batch.add_object(properties=item, uuid=uuid) + else: + batch.add_object(properties=item, uuid=uuid, vector=vec) + with consumed_lock: + consumed += 1 + if verbose and time.time() - last_log >= 2.0: + elapsed = time.time() - start_time + with consumed_lock: + current_consumed = consumed + qps = current_consumed / elapsed if elapsed > 0 else 0 + print( + f"Submitted {current_consumed}/{num_objects} (~{qps:.0f} obj/s), queue={q.qsize()}" + ) + last_log = time.time() + + if getattr(collection.batch, "failed_objects", None): + failed_objects.extend(collection.batch.failed_objects) + error_tracker.add_failed_objects(collection.batch.failed_objects) + + prod_threads = [ + threading.Thread(target=producer, args=(lo, hi), daemon=True) + for lo, hi in ranges + ] + for t in prod_threads: + t.start() + cons_thread = threading.Thread(target=consumer_fixed, daemon=True) + cons_thread.start() + for t in prod_threads: + t.join() + cons_thread.join() + + # Report any producer errors that occurred + if producer_errors: + error_msg = f"{len(producer_errors)} producer thread(s) encountered exceptions" + if verbose: + click.echo(f"{error_msg}:", err=True) + for i, err in enumerate(producer_errors, 1): + click.echo(f" Producer error {i}: {err}", err=True) + else: + click.echo(f"{error_msg}. 
Use --verbose for details.", err=True) + # Add to error tracker for consistency + for err in producer_errors: + error_tracker.total += 1 + error_tracker.examples.append((None, str(err))) + + return consumed, failed_objects, error_tracker + def __import_json( self, collection: Collection, @@ -296,140 +628,11 @@ def __convert_property_value(self, value: Any, data_type: wvc.DataType) -> Any: return date.strftime("%Y-%m-%dT%H:%M:%SZ") return value - def __generate_single_object(self, is_update: bool = False) -> Dict: + def __generate_single_object( + self, is_update: bool = False, seed: Optional[int] = None + ) -> Dict: """Method to generate a single object for non-parallel use cases""" - return generate_movie_object(is_update) - - def __generate_data_object( - self, - limit: int, - skip_seed: bool, - is_update: bool = False, - verbose: bool = False, - ) -> Union[List[Dict], Dict]: - """Generate data objects, using parallel processing for large batches""" - - # For small numbers, don't bother with parallel processing - if limit <= 1000: - return [ - self.__generate_single_object(is_update=is_update) for _ in range(limit) - ] - - # For large numbers, use parallel processing - start_time = time.time() - - # Calculate chunks - num_chunks = math.ceil(limit / CHUNK_SIZE) - chunk_sizes = [CHUNK_SIZE] * (num_chunks - 1) - # The last chunk might be smaller - chunk_sizes.append(limit - sum(chunk_sizes)) - - # Prepare arguments for worker processes - task_args = [ - (size, i, is_update, skip_seed) - for i, size in enumerate(chunk_sizes) - if size > 0 - ] - - # Use multiprocessing Pool to avoid pickling issues with ProcessPoolExecutor - results = [] - with multiprocessing.Pool(processes=MAX_WORKERS) as pool: - # Map the generation function to the chunk arguments - for i, chunk_result in enumerate( - pool.imap_unordered(generate_chunk, task_args) - ): - results.extend(chunk_result) - # Report progress periodically - current_count = len(results) - progress = current_count / limit * 100 - if verbose and ( - i % max(1, num_chunks // 10) == 0 or i == len(task_args) - 1 - ): - print( - f"Data generation progress: {progress:.1f}% ({current_count}/{limit})" - ) - - elapsed = time.time() - start_time - if verbose: - print( - f"Generated {len(results)} objects in {elapsed:.2f} seconds ({len(results)/elapsed:.0f} objects/second)" - ) - - return results - - def __ingest_batch( - self, - batch_objects: List[Dict], - collection: Collection, - vectorizer: str, - vector_dimensions: int, - named_vectors: Optional[List[str]], - uuid: Optional[str], - batch_index: int = 0, - verbose: bool = False, - multi_vector: bool = False, - ) -> Tuple[int, List]: - """Process a single batch of objects for ingestion""" - counter = 0 - failed_objects = [] - batch_size = len(batch_objects) - - if verbose: - print(f"Starting batch {batch_index+1} with {batch_size} objects") - - log_interval = max(1, batch_size // 5) # Log 5 times per batch - start_time = time.time() - - with collection.batch.dynamic() as batch: - for i, obj in enumerate(batch_objects): - if vectorizer == "none": - if multi_vector: - vector = { - named_vectors[0]: [ - (2 * np.random.rand(vector_dimensions) - 1).tolist(), - (2 * np.random.rand(vector_dimensions) - 1).tolist(), - ] - } - else: - # Generate vector(s) for the object - if named_vectors is None: - vector = ( - 2 * np.random.rand(vector_dimensions) - 1 - ).tolist() - else: - vector = { - name: ( - 2 * np.random.rand(vector_dimensions) - 1 - ).tolist() - for name in named_vectors - } - 
batch.add_object(properties=obj, uuid=uuid, vector=vector) - else: - # Let the vectorizer generate the vector - batch.add_object(properties=obj, uuid=uuid) - counter += 1 - - # Log progress periodically - if verbose and (i + 1) % log_interval == 0: - progress = (i + 1) / batch_size * 100 - elapsed = time.time() - start_time - rate = (i + 1) / elapsed if elapsed > 0 else 0 - print( - f"Batch {batch_index+1}: {progress:.1f}% ({i+1}/{batch_size}) at {rate:.1f} objects/second" - ) - - # Collect failed objects - if collection.batch.failed_objects: - failed_objects.extend(batch.failed_objects) - - if verbose: - total_elapsed = time.time() - start_time - rate = batch_size / total_elapsed if total_elapsed > 0 else 0 - print( - f"Completed batch {batch_index+1}: {counter} objects in {total_elapsed:.2f}s ({rate:.1f} objects/second)" - ) - - return counter, failed_objects + return generate_movie_object(is_update, seed) def __ingest_data( self, @@ -442,140 +645,59 @@ def __ingest_data( uuid: Optional[str] = None, verbose: bool = False, multi_vector: bool = False, + dynamic_batch: bool = False, + batch_size: int = 1000, + concurrent_requests: int = MAX_WORKERS, ) -> Collection: if randomize: - click.echo(f"Generating {num_objects} objects") + click.echo(f"Generating and ingesting {num_objects} objects") start_time = time.time() - # Determine vector dimensions based on vectorizer + # Determine vectorizer setup config = collection.config.get() if not config.vectorizer and config.vector_config: - # Named vectors named_vectors = list(config.vector_config.keys()) vectorizer = config.vector_config[ named_vectors[0] ].vectorizer.vectorizer elif config.vectorizer: - # Standard vectorizer vectorizer = config.vectorizer named_vectors = None + else: + vectorizer = "none" + named_vectors = None - # Generate all the data objects - data_objects = self.__generate_data_object( - num_objects, skip_seed, verbose=verbose - ) - if verbose: - print(f"Data generation complete. 
Beginning ingestion...") - - # Get collection with consistency level cl_collection = collection.with_consistency_level(cl) - # Determine if parallel processing makes sense based on dataset size - MIN_OBJECTS_FOR_PARALLEL = ( - 1000 # Only use parallel for datasets >= this size + # Single consumer that feeds the batcher; batcher does its own HTTP parallelism + counter, failed_objects, error_tracker = self.__producer_consumer_ingest( + collection=cl_collection, + num_objects=num_objects, + vectorizer=vectorizer, + vector_dimensions=vector_dimensions or 1536, + named_vectors=named_vectors, + uuid=uuid, + dynamic_batch=dynamic_batch, + batch_size=batch_size, + concurrent_requests=concurrent_requests, + multi_vector=multi_vector, + skip_seed=skip_seed, + verbose=verbose, ) - MIN_BATCH_SIZE = 100 # Minimum batch size for each worker - # For small datasets, use a single batch - if num_objects < MIN_OBJECTS_FOR_PARALLEL: - if verbose: - print( - f"Processing {num_objects} objects in a single batch (small dataset)" - ) - - counter, failed_objects = self.__ingest_batch( - batch_objects=data_objects, - collection=cl_collection, - vectorizer=vectorizer, - vector_dimensions=vector_dimensions, - named_vectors=named_vectors, - uuid=uuid, - batch_index=0, - verbose=verbose, - multi_vector=multi_vector, - ) - - # Handle any failed objects - if failed_objects: - for failed_object in failed_objects: - print( - f"Failed to add object with UUID {failed_object.original_uuid}: {failed_object.message}" - ) - else: - # Use parallel batch insertion with progress tracking for larger datasets - counter = 0 - - # Calculate optimal number of workers based on dataset size - # Ensure each batch is at least MIN_BATCH_SIZE - max_possible_workers = max(1, num_objects // MIN_BATCH_SIZE) - num_workers = min( - MAX_WORKERS, max_possible_workers, 16 - ) # Cap at 16 for network connections - - # Calculate batch size to ensure each worker gets a reasonable amount of work - batch_size = max(MIN_BATCH_SIZE, num_objects // num_workers) - - # Create batches - batches = [ - data_objects[i : i + batch_size] - for i in range(0, len(data_objects), batch_size) - ] - - if verbose: - print( - f"Processing {len(data_objects)} objects in {len(batches)} batches using {num_workers} workers" - f" (batch size: {batch_size})" - ) - - all_failed_objects = [] - completed_jobs = 0 - - # Process batches in parallel - with ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = [] - for i, batch in enumerate(batches): - futures.append( - executor.submit( - self.__ingest_batch, - batch_objects=batch, - collection=cl_collection, - vectorizer=vectorizer, - vector_dimensions=vector_dimensions, - multi_vector=multi_vector, - named_vectors=named_vectors, - uuid=uuid, - batch_index=i, - verbose=verbose, - ) - ) - - # Process results as they complete - for future in concurrent.futures.as_completed(futures): - batch_count, batch_failed = future.result() - counter += batch_count - all_failed_objects.extend(batch_failed) - - completed_jobs += 1 - if verbose: - progress = completed_jobs / len(batches) * 100 - elapsed = time.time() - start_time - rate = counter / elapsed if elapsed > 0 else 0 - print( - f"Overall progress: {progress:.1f}% ({completed_jobs}/{len(batches)} batches, {counter}/{num_objects} objects) at {rate:.1f} objects/second" - ) - - # Handle any failed objects - if all_failed_objects: - for failed_object in all_failed_objects: - print( - f"Failed to add object with UUID {failed_object.original_uuid}: {failed_object.message}" - ) + # 
Compact failure summary: total error count plus up to 10 unique examples + if error_tracker.total > 0: + print(f"Encountered {error_tracker.total} total errors.") + print("Showing up to 10 unique error examples:") + for idx, (orig_uuid, msg) in enumerate(error_tracker.examples, start=1): + uuid_str = str(orig_uuid) if orig_uuid else "N/A" + print(f"{idx:2d}. UUID {uuid_str}: {msg}") total_elapsed = time.time() - start_time print( f"Inserted {counter} objects into class '{collection.name}'" + ( - f" in {total_elapsed:.2f} seconds ({counter/total_elapsed:.1f} objects/second)" + f" in {total_elapsed:.2f} seconds ({counter / total_elapsed:.1f} objects/second)" if verbose else "" ) @@ -606,10 +728,12 @@ def create_data( wait_for_indexing: bool = CreateDataDefaults.wait_for_indexing, verbose: bool = CreateDataDefaults.verbose, multi_vector: bool = CreateDataDefaults.multi_vector, + dynamic_batch: bool = CreateDataDefaults.dynamic_batch, + batch_size: int = CreateDataDefaults.batch_size, + concurrent_requests: int = MAX_WORKERS, ) -> Collection: if not self.client.collections.exists(collection): - raise Exception( f"Class '{collection}' does not exist in Weaviate. Create first using command" ) @@ -648,7 +772,6 @@ def create_data( col.config.get().multi_tenancy_config.auto_tenant_creation ) if auto_tenants > 0 and auto_tenants_enabled is False: - raise Exception( f"Auto tenant creation is not enabled for class '{col.name}'. Please enable it using command" ) @@ -664,7 +787,7 @@ def create_data( raise Exception( f"Either --tenants or --auto_tenants must be provided, not both." ) - if existing_tenants == "None": + if existing_tenants == ["None"]: tenants = [f"{tenant_suffix}-{i}" for i in range(1, auto_tenants + 1)] else: if len(existing_tenants) < auto_tenants: @@ -690,6 +813,9 @@ uuid=uuid, verbose=verbose, multi_vector=multi_vector, + dynamic_batch=dynamic_batch, + batch_size=batch_size, + concurrent_requests=concurrent_requests, ) after_length = len(col) else: @@ -721,6 +847,9 @@ uuid=uuid, verbose=verbose, multi_vector=multi_vector, + dynamic_batch=dynamic_batch, + batch_size=batch_size, + concurrent_requests=concurrent_requests, ) after_length = len(col.with_tenant(tenant)) if wait_for_indexing: @@ -743,19 +872,16 @@ def __update_data( """Update objects in the collection, either with random data or incremental changes.""" if not skip_seed: - # Set a seed for reproducible random sampling random.seed(42) start_time = time.time() cl_collection = collection.with_consistency_level(cl) total_updated = 0 - # Get total collection size collection_size = len(collection) if verbose: print(f"Collection '{collection.name}' contains {collection_size} objects") - # Determine if we need random offsets (partial update) or sequential (full update) use_random_offsets = num_objects < collection_size if use_random_offsets and verbose: print( f"Updating a random subset of {num_objects} objects from a total of {collection_size}" ) - # Generate a single random starting offset for the entire update operation if needed random_offset = None if use_random_offsets: - # Set a consistent seed for reproducibility - # Calculate the maximum safe starting offset - # the client has problem with large offsets, so we limit it to 100000 max_start_offset = ( collection_size - num_objects if (collection_size - num_objects) < QUERY_MAXIMUM_RESULTS else QUERY_MAXIMUM_RESULTS ) if max_start_offset < 0: - # If we're requesting more objects than exist, adjust num_objects = collection_size 
random_offset = 0 else: - # Generate a random starting point random_offset = random.randint(0, max_start_offset) if verbose: print(f"Using random offset {random_offset} for update operation") - # Process in batches to avoid GRPC message size limits iterations = math.ceil(num_objects / MAX_OBJECTS_PER_BATCH) - # Determine vector dimensions for random vector generation vector_dimensions = 1536 - # Retrieve the vector size from the first object in the collection object_with_vector = collection.query.fetch_objects( limit=1, include_vector=True ) if len(object_with_vector.objects) > 0: - vector_dimensions = len( - object_with_vector.objects[0].vector["default"] - if "default" in object_with_vector.objects[0].vector - else object_with_vector.objects[0].vector - ) + vec = object_with_vector.objects[0].vector + if isinstance(vec, dict): + first_vec = next(iter(vec.values())) + vector_dimensions = ( + len(first_vec) if isinstance(first_vec, list) else len(first_vec[0]) + ) + else: + vector_dimensions = len(vec) for i in range(iterations): - # Determine how many objects to fetch in this batch batch_size = min(MAX_OBJECTS_PER_BATCH, num_objects - total_updated) if batch_size <= 0: break - # Calculate the offset for this batch if use_random_offsets: - # For random updates, use the random offset plus any additional offset for subsequent batches offset = random_offset + (i * MAX_OBJECTS_PER_BATCH) if verbose: print( - f"Fetching batch {i+1}/{iterations} ({batch_size} objects, offset: {offset})" + f"Fetching batch {i + 1}/{iterations} ({batch_size} objects, offset: {offset})" ) else: - # For sequential updates (full collection), just use sequential offsets offset = i * batch_size if verbose: print( - f"Fetching batch {i+1}/{iterations} ({batch_size} objects, offset: {offset})" + f"Fetching batch {i + 1}/{iterations} ({batch_size} objects, offset: {offset})" ) - # Fetch current batch of objects - one simple API call res = collection.query.fetch_objects(limit=batch_size, offset=offset) data_objects = res.objects - # Check if we got any objects if not data_objects: if total_updated == 0: print( @@ -841,34 +955,20 @@ def __update_data( batch_count = len(data_objects) - # Process this batch based on strategy (random or incremental) if randomize: if i == 0 and verbose: print(f"Updating objects with random data...") - # Generate new properties for this batch - # For small batches, simple generation is fine, for larger we can use parallel - if batch_count > 100: - random_objects = self.__generate_data_object( - limit=batch_count, - skip_seed=skip_seed, - is_update=True, - verbose=False, - ) - else: - random_objects = [ - self.__generate_single_object(is_update=True) - for _ in range(batch_count) - ] + random_objects = [ + self.__generate_single_object(is_update=True) + for _ in range(batch_count) + ] - # Process each update in current batch for j, (obj, updated_props) in enumerate( zip(data_objects, random_objects) ): - # Generate a random vector (not named) random_vector = np.random.rand(vector_dimensions).tolist() - # Replace the object cl_collection.data.replace( uuid=obj.uuid, properties=updated_props, @@ -876,7 +976,6 @@ def __update_data( ) total_updated += 1 - # Report progress at intervals if verbose and (j + 1) % max(1, batch_count // 5) == 0: batch_progress = (j + 1) / batch_count * 100 total_progress = total_updated / num_objects * 100 @@ -886,12 +985,10 @@ def __update_data( f"Batch progress: {batch_progress:.1f}%, Overall: {total_progress:.1f}% ({total_updated}/{num_objects}), speed: {rate:.1f} 
objects/second" ) else: - # For non-random updates if i == 0 and verbose: print(f"Updating objects with incremental changes...") for j, obj in enumerate(data_objects): - # Update existing object properties for property, value in obj.properties.items(): if isinstance(value, str): obj.properties[property] = "updated-" + value @@ -902,14 +999,12 @@ def __update_data( elif isinstance(value, datetime): obj.properties[property] = value + timedelta(days=1) - # Update the object cl_collection.data.update( uuid=obj.uuid, properties=obj.properties, ) total_updated += 1 - # Report progress at intervals if verbose and (j + 1) % max(1, batch_count // 5) == 0: batch_progress = (j + 1) / batch_count * 100 total_progress = total_updated / num_objects * 100 @@ -919,11 +1014,9 @@ def __update_data( f"Batch progress: {batch_progress:.1f}%, Overall: {total_progress:.1f}% ({total_updated}/{num_objects}), speed: {rate:.1f} objects/second" ) - # If we've processed fewer objects than the batch size, there might not be any more objects if not use_random_offsets and batch_count < batch_size: break - # Report completion if total_updated < num_objects: print( f"Warning: Only found {total_updated} objects to update, less than the requested {num_objects}" @@ -933,7 +1026,7 @@ def __update_data( print( f"Updated {total_updated} objects in class '{collection.name}'" + ( - f" in {total_elapsed:.2f} seconds ({total_updated/total_elapsed:.1f} objects/second)" + f" in {total_elapsed:.2f} seconds ({total_updated / total_elapsed:.1f} objects/second)" if verbose else "" ) @@ -952,7 +1045,6 @@ def update_data( ) -> None: if not self.client.collections.exists(collection): - raise Exception( f"Class '{collection}' does not exist in Weaviate. Create first using ./create_class.py" ) @@ -961,7 +1053,6 @@ def update_data( try: tenants = [key for key in col.tenants.get().keys()] except Exception as e: - # Check if the error is due to multi-tenancy being disabled if "multi-tenancy is not enabled" in str(e): click.echo( f"Collection '{col.name}' does not have multi-tenancy enabled. Skipping tenant information collection." 
@@ -978,12 +1069,7 @@ def update_data( for tenant in tenants: if tenant == "None": ret = self.__update_data( - col, - limit, - cl_map[consistency_level], - randomize, - skip_seed, - verbose, + col, limit, cl_map[consistency_level], randomize, skip_seed, verbose ) else: click.echo(f"Processing tenant '{tenant}'") @@ -996,7 +1082,6 @@ def update_data( verbose, ) if ret == -1: - raise Exception( f"Failed to update objects in class '{col.name}' for tenant '{tenant}'" ) @@ -1020,8 +1105,6 @@ def __delete_data( ) return 1 - # Calculate the number of full batches and handle any remaining objects - # Use math.ceil to ensure we process all objects, even if num_objects < MAX_OBJECTS_PER_BATCH start_time = time.time() iterations = math.ceil(num_objects / MAX_OBJECTS_PER_BATCH) deleted_objects = 0 @@ -1032,13 +1115,12 @@ def __delete_data( ) for i in range(iterations): - # Determine how many objects to fetch in this batch batch_size = min(MAX_OBJECTS_PER_BATCH, num_objects - deleted_objects) if batch_size <= 0: break if verbose: - print(f"Fetching batch {i+1}/{iterations} ({batch_size} objects)") + print(f"Fetching batch {i + 1}/{iterations} ({batch_size} objects)") res = collection.query.fetch_objects(limit=batch_size) if len(res.objects) == 0: @@ -1056,7 +1138,6 @@ def __delete_data( deleted_objects += len(ids) - # Progress reporting if verbose if verbose: progress = min(100, (deleted_objects / num_objects) * 100) elapsed = time.time() - start_time @@ -1066,7 +1147,6 @@ def __delete_data( + f"batch of {len(ids)} deleted in {batch_elapsed:.2f}s (rate: {rate:.1f} objects/second)" ) - # If we've deleted fewer objects than expected, there might not be any more objects if len(ids) < batch_size: break @@ -1074,7 +1154,7 @@ def __delete_data( print( f"Deleted {deleted_objects} objects from class '{collection.name}'" + ( - f" in {total_elapsed:.2f} seconds ({deleted_objects/total_elapsed:.1f} objects/second)" + f" in {total_elapsed:.2f} seconds ({deleted_objects / total_elapsed:.1f} objects/second)" if verbose else "" ) @@ -1092,12 +1172,10 @@ def delete_data( ) -> None: if not self.client.collections.exists(collection): - print( + raise Exception( f"Class '{collection}' does not exist in Weaviate. Create first using command." 
) - return 1 - col: Collection = self.client.collections.get(collection) mt_enabled = col.config.get().multi_tenancy_config.enabled if mt_enabled: @@ -1111,14 +1189,11 @@ "one": wvc.ConsistencyLevel.ONE, } - if tenants_list is not None: - tenants = tenants_list - else: - tenants = existing_tenants + tenants = tenants_list if tenants_list is not None else existing_tenants for tenant in tenants: if tenant == "None": ret = self.__delete_data( col, limit, cl_map[consistency_level], uuid, verbose ) else: @@ -1131,7 +1206,6 @@ verbose, ) if ret == -1: - raise Exception( f"Failed to delete objects in class '{col.name}' for tenant '{tenant}'" ) @@ -1150,12 +1224,10 @@ def __query_data( start_time = datetime.now() response = None if search_type == "fetch": - # Fetch logic response = collection.with_consistency_level(cl).query.fetch_objects( limit=num_objects ) elif search_type == "vector": - # Vector logic response = collection.with_consistency_level(cl).query.near_text( query=query, return_metadata=MetadataQuery(distance=True, certainty=True), limit=num_objects, target_vector=target_vector, ) elif search_type == "keyword": - # Keyword logic response = collection.with_consistency_level(cl).query.bm25( query=query, return_metadata=MetadataQuery(score=True, explain_score=True), limit=num_objects, ) elif search_type == "hybrid": - # Hybrid logic response = collection.with_consistency_level(cl).query.hybrid( query=query, return_metadata=MetadataQuery(score=True), limit=num_objects, target_vector=target_vector, ) elif search_type == "uuid": - # UUID logic num_objects = 1 response = collection.with_consistency_level(cl).query.fetch_object_by_id( uuid=query ) - else: click.echo( f"Invalid search type: {search_type}. Please choose from 'fetch', 'vector', 'keyword', or 'hybrid'." ) @@ -1217,7 +1286,6 @@ def query_data( ) -> None: if not self.client.collections.exists(collection): - raise Exception( f"Class '{collection}' does not exist in Weaviate. Create first using command." ) @@ -1227,17 +1295,29 @@ if mt_enabled: if tenants is not None: tenants_with_status = col.tenants.get_by_names(tenants.split(",")) - existing_tenants = [ - tenant - for tenant, status in tenants_with_status.items() - if status.activity_status == TenantActivityStatus.ACTIVE - ] + if col.config.get().multi_tenancy_config.auto_tenant_activation: + # When auto_tenant_activation is enabled, Weaviate is expected to automatically + # activate tenants on query. We therefore include all tenants here regardless + # of their current activity status. If the server is configured not to auto- + # activate tenants, querying an inactive tenant may fail. + existing_tenants = [tenant for tenant in tenants_with_status.keys()] + else: + existing_tenants = [ + tenant + for tenant, status in tenants_with_status.items() + if status.activity_status == TenantActivityStatus.ACTIVE + ] else: - existing_tenants = [ - key - for key, tenant in col.tenants.get().items() - if tenant.activity_status == TenantActivityStatus.ACTIVE - ] + tenants_with_status = col.tenants.get() + if col.config.get().multi_tenancy_config.auto_tenant_activation: + # See comment above: when auto_tenant_activation is enabled, include all tenants. 
+ existing_tenants = [tenant for tenant in tenants_with_status.keys()] + else: + existing_tenants = [ + tenant + for tenant, status in tenants_with_status.items() + if status.activity_status == TenantActivityStatus.ACTIVE + ] else: if tenants is not None: raise Exception( @@ -1274,7 +1354,6 @@ def query_data( target_vector, ) if ret == -1: - raise Exception( f"Failed to query objects in class '{col.name}' for tenant '{tenant}'" )
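
Note on the pattern introduced above: both ingestion modes share one shape — producers feed a bounded queue, a single consumer drains it into the batcher, and every producer enqueues one None sentinel so the consumer knows when all input has arrived. The standalone sketch below illustrates only that shape; all names and sizes are illustrative and none of this code ships with the CLI:

    import threading
    from queue import Empty, Queue

    def produce(q: Queue, lo: int, hi: int) -> None:
        try:
            for i in range(lo, hi):
                q.put({"index": i})  # stand-in for generate_movie_object()
        finally:
            q.put(None)  # sentinel: always signal completion, even on error

    def consume(q: Queue, n_producers: int) -> None:
        done, seen = 0, 0
        while True:
            try:
                item = q.get(timeout=0.25)
            except Empty:
                continue  # keep polling until every sentinel has arrived
            if item is None:
                done += 1
                if done >= n_producers:
                    break
                continue
            seen += 1  # stand-in for batch.add_object(...)
        print(f"consumed {seen} items")

    q: Queue = Queue(maxsize=100)  # bounded: producers block instead of buffering
    producers = [
        threading.Thread(target=produce, args=(q, lo, lo + 50))
        for lo in range(0, 200, 50)
    ]
    consumer = threading.Thread(target=consume, args=(q, len(producers)))
    for t in producers:
        t.start()
    consumer.start()
    for t in producers:
        t.join()
    consumer.join()

The bounded maxsize is what keeps memory flat regardless of --limit: generation can never run more than one queue's worth ahead of ingestion, which is the point of replacing the old generate-everything-then-ingest flow.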