diff --git a/README.md b/README.md index 75ac7e9..ec3c359 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ A powerful command-line interface for managing and interacting with Weaviate vector databases directly from your terminal. ## Key Features -- **Collections**: Create, update, delete and get collection configurations +- **Collections**: Create, batch, update, delete and get collection configurations - **Data Management**: Import, query, update and delete data with various search types (vector, keyword, hybrid) - **Multi-tenancy**: Manage tenants and their states across collections - **Backup & Restore**: Create and restore backups with support for S3, GCS and filesystem @@ -40,6 +40,9 @@ weaviate-cli create collection --collection movies --vectorizer transformers # Import test data weaviate-cli create data --collection movies --limit 1000 +# Batch data import from local json file +weaviate-cli batch insert --collection --path --vectorizer --replication-factor 3 + # Query data weaviate-cli query data --collection movies --search-type hybrid --query "action movies" ``` @@ -47,6 +50,7 @@ weaviate-cli query data --collection movies --search-type hybrid --query "action ## Core Commands - **create**: Create collections, tenants, backups or import data +- **batch**: batch import a collection from a local file - **delete**: Remove collections, tenants or data - **update**: Modify collection settings, tenant states or data - **get**: Retrieve collection info, tenant details or shard status diff --git a/cli.py b/cli.py index a61fdf9..da36bd0 100644 --- a/cli.py +++ b/cli.py @@ -11,6 +11,7 @@ from weaviate_cli.commands.cancel import cancel from weaviate_cli.commands.assign import assign from weaviate_cli.commands.revoke import revoke +from weaviate_cli.commands.batch import batch from weaviate_cli import __version__ @@ -62,6 +63,7 @@ def main(ctx: click.Context, config_file: Optional[str], user: Optional[str]): main.add_command(cancel) main.add_command(assign) main.add_command(revoke) +main.add_command(batch) if __name__ == "__main__": main() diff --git a/weaviate_cli/commands/batch.py b/weaviate_cli/commands/batch.py new file mode 100644 index 0000000..e5c9050 --- /dev/null +++ b/weaviate_cli/commands/batch.py @@ -0,0 +1,103 @@ +import click +import sys +import os +import json +from weaviate_cli.utils import get_client_from_context +from weaviate.exceptions import WeaviateConnectionError +from weaviate_cli.defaults import CreateCollectionDefaults +from weaviate_cli.managers.batch_manager import BatchManager + + +@click.group() +def batch() -> None: + """Batch operations in Weaviate.""" + pass + + +@batch.command("insert") +@click.option( + "--collection", + required=True, + help="The name of the collection (class) to insert data into.", +) +@click.option( + "--path", + required=True, + type=str, + help="Path to the JSON file containing the data.", +) +@click.option( + "--vectorizer", + default=CreateCollectionDefaults.vectorizer, + type=click.Choice( + ["contextionary", "transformers", "openai", "ollama", "cohere", "jinaai"] + ), + help="Vectorizer to use.", +) +@click.option( + "--shards", + default=1, + help="Number of shards for the collection (default: 1).", +) +@click.option( + "--replication-factor", + default=1, + help="Replication factor for the collection (default: 1).", +) +@click.pass_context +def batch_insert_cli(ctx, collection, path, vectorizer, shards, replication_factor): + """ + Insert data into a Weaviate collection (class) in batch mode. + """ + # Validate the file path and extension + if not os.path.isfile(path): + click.echo(f"Error: The file {path} does not exist.") + sys.exit(1) + if not path.endswith(".json"): + click.echo("Error: The file must have a .json extension.") + sys.exit(1) + + # Load the JSON data + try: + with open(path, "r") as file: + data = json.load(file) + except json.JSONDecodeError: + click.echo(f"Error: The file {path} is not a valid JSON file.") + sys.exit(1) + + # Validate JSON structure + if not isinstance(data, list) or not all(isinstance(obj, dict) for obj in data): + click.echo( + "Error: The JSON file must contain a list of objects (e.g., [{...}, {...}])." + ) + sys.exit(1) + + # Initialize the Weaviate client + client = None + try: + client = get_client_from_context(ctx) + batch_manager = BatchManager(client) + + # Create the collection (if it doesn't exist) + click.echo(f"Ensuring collection '{collection}' exists...") + batch_manager.create_collection( + collection=collection, + vectorizer=vectorizer, + shards=shards, + replication_factor=replication_factor, + force_auto_schema=True, + ) + + # Insert the data in batch mode + click.echo(f"Inserting data into collection '{collection}'...") + batch_manager.batch_insert(collection, data) + + except WeaviateConnectionError as wce: + click.echo(f"Connection error: {wce}") + sys.exit(1) + except Exception as e: + click.echo(f"Error: {e}") + sys.exit(1) + finally: + if client: + client.close() diff --git a/weaviate_cli/datasets/movies.json b/weaviate_cli/datasets/movies.json index 7970d22..d8340cb 100644 --- a/weaviate_cli/datasets/movies.json +++ b/weaviate_cli/datasets/movies.json @@ -1,6 +1,6 @@ [ { - "budget": 237000000, + "budget": "237000000", "cast": "Sam Worthington Zoe Saldana Sigourney Weaver Stephen Lang Michelle Rodriguez", "director": "James Cameron", "genres": "Action Adventure Fantasy Science Fiction", diff --git a/weaviate_cli/managers/batch_manager.py b/weaviate_cli/managers/batch_manager.py new file mode 100644 index 0000000..36f6b88 --- /dev/null +++ b/weaviate_cli/managers/batch_manager.py @@ -0,0 +1,136 @@ +import click +from typing import Dict, List, Optional +import weaviate.classes.config as wvc +from weaviate.client import WeaviateClient + + +class BatchManager: + def __init__(self, client: WeaviateClient) -> None: + self.client = client + + def create_collection( + self, + collection: str, + vectorizer: str = "contextionary", + shards: int = 1, + replication_factor: int = 1, + force_auto_schema: bool = True, + ) -> None: + """ + Create a collection dynamically for batch insertion. + + Args: + collection (str): Name of the collection to create. + vectorizer (str): Vectorizer type (e.g., openai, transformers). + shards (int): Number of shards for the collection. + replication_factor (int): Replication factor for the collection. + force_auto_schema (bool): Whether to let Weaviate infer schema from inserted data. + """ + if self.client.collections.exists(collection): + click.echo(f"Collection '{collection}' already exists. Skipping creation.") + return + + # Map vectorizers to Weaviate configurations + vectorizer_map: Dict[str, wvc.VectorizerConfig] = { + "contextionary": wvc.Configure.Vectorizer.text2vec_contextionary(), + "transformers": wvc.Configure.Vectorizer.text2vec_transformers(), + "openai": wvc.Configure.Vectorizer.text2vec_openai(), + "ollama": wvc.Configure.Vectorizer.text2vec_ollama( + model="snowflake-arctic-embed:33m" + ), + "cohere": wvc.Configure.Vectorizer.text2vec_cohere(), + "jinaai": wvc.Configure.Vectorizer.text2vec_jinaai(), + } + + # Validate vectorizer + if vectorizer not in vectorizer_map: + raise ValueError( + f"Invalid vectorizer '{vectorizer}'. Choose from: {list(vectorizer_map.keys())}" + ) + + try: + # Create collection with configuration + self.client.collections.create( + name=collection, + vector_index_config=wvc.Configure.VectorIndex.hnsw(), + replication_config=wvc.Configure.replication( + factor=replication_factor, + async_enabled=False, + deletion_strategy=wvc.ReplicationDeletionStrategy.DELETE_ON_CONFLICT, + ), + sharding_config=wvc.Configure.sharding(desired_count=shards), + vectorizer_config=vectorizer_map[vectorizer], + properties=None if force_auto_schema else [], + ) + click.echo( + f"Collection '{collection}' created successfully with vectorizer '{vectorizer}'." + ) + except Exception as e: + raise Exception(f"Error creating collection '{collection}': {e}") + + def batch_insert( + self, + collection: str, + data: List[Dict], + ) -> None: + """ + Insert data into a collection in batch. + + Args: + collection (str): Name of the collection. + data (List[Dict]): Data to be inserted. + """ + if not self.client.collections.exists(collection): + raise Exception( + f"Collection '{collection}' does not exist. Cannot insert data." + ) + + try: + # Perform batch insertion using Weaviate's dynamic batch + with self.client.batch.dynamic() as batch: + for record in data: + + def remove_id_keys(data: Dict) -> Dict: + if not isinstance(data, dict): + return data + cleaned_data = {} + for key, value in data.items(): + if key != "id": + if isinstance(value, dict): + cleaned_data[key] = remove_id_keys(value) + elif isinstance(value, list): + cleaned_data[key] = [ + ( + remove_id_keys(item) + if isinstance(item, dict) + else item + ) + for item in value + ] + else: + cleaned_data[key] = value + + return cleaned_data + + # Remove all keys and subkeys that start with 'id' + record = remove_id_keys(record) + + # Add the object to the batch + batch.add_object( + collection=collection, + properties=record, + ) + click.echo( + f"Processed record" + ) # add '{record}' <- if you would like to see the record being processed + except Exception as e: + raise Exception(f"Batch insertion failed: {e}") + + # Check for failed objects + failed_objects = self.client.batch.failed_objects + if failed_objects: + click.echo(f"Number of failed objects: {len(failed_objects)}") + for i, failed_obj in enumerate(failed_objects, 1): + click.echo(f"Failed object {i}: {failed_obj}") + else: + click.echo(f"All objects successfully inserted into '{collection}'.")