Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
A powerful command-line interface for managing and interacting with Weaviate vector databases directly from your terminal.

## Key Features
- **Collections**: Create, update, delete and get collection configurations
- **Collections**: Create, batch, update, delete and get collection configurations
- **Data Management**: Import, query, update and delete data with various search types (vector, keyword, hybrid)
- **Multi-tenancy**: Manage tenants and their states across collections
- **Backup & Restore**: Create and restore backups with support for S3, GCS and filesystem
Expand Down Expand Up @@ -40,13 +40,17 @@ weaviate-cli create collection --collection movies --vectorizer transformers
# Import test data
weaviate-cli create data --collection movies --limit 1000

# Batch data import from local json file
weaviate-cli batch insert --collection <COL_NAME> --path <LOCAL_FILE_PATH.json> --vectorizer <VEC_NAME e.g. openai> --replication-factor 3

# Query data
weaviate-cli query data --collection movies --search-type hybrid --query "action movies"
```

## Core Commands

- **create**: Create collections, tenants, backups or import data
- **batch**: batch import a collection from a local file
- **delete**: Remove collections, tenants or data
- **update**: Modify collection settings, tenant states or data
- **get**: Retrieve collection info, tenant details or shard status
Expand Down
2 changes: 2 additions & 0 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from weaviate_cli.commands.cancel import cancel
from weaviate_cli.commands.assign import assign
from weaviate_cli.commands.revoke import revoke
from weaviate_cli.commands.batch import batch
from weaviate_cli import __version__


Expand Down Expand Up @@ -62,6 +63,7 @@ def main(ctx: click.Context, config_file: Optional[str], user: Optional[str]):
main.add_command(cancel)
main.add_command(assign)
main.add_command(revoke)
main.add_command(batch)

if __name__ == "__main__":
main()
103 changes: 103 additions & 0 deletions weaviate_cli/commands/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import click
import sys
import os
import json
from weaviate_cli.utils import get_client_from_context
from weaviate.exceptions import WeaviateConnectionError
from weaviate_cli.defaults import CreateCollectionDefaults
from weaviate_cli.managers.batch_manager import BatchManager


@click.group()
def batch() -> None:
"""Batch operations in Weaviate."""
pass


@batch.command("insert")
@click.option(
"--collection",
required=True,
help="The name of the collection (class) to insert data into.",
)
@click.option(
"--path",
required=True,
type=str,
help="Path to the JSON file containing the data.",
)
@click.option(
"--vectorizer",
default=CreateCollectionDefaults.vectorizer,
type=click.Choice(
["contextionary", "transformers", "openai", "ollama", "cohere", "jinaai"]
),
help="Vectorizer to use.",
)
@click.option(
"--shards",
default=1,
help="Number of shards for the collection (default: 1).",
)
@click.option(
"--replication-factor",
default=1,
help="Replication factor for the collection (default: 1).",
)
@click.pass_context
def batch_insert_cli(ctx, collection, path, vectorizer, shards, replication_factor):
"""
Insert data into a Weaviate collection (class) in batch mode.
"""
# Validate the file path and extension
if not os.path.isfile(path):
click.echo(f"Error: The file {path} does not exist.")
sys.exit(1)
if not path.endswith(".json"):
click.echo("Error: The file must have a .json extension.")
sys.exit(1)

# Load the JSON data
try:
with open(path, "r") as file:
data = json.load(file)
except json.JSONDecodeError:
click.echo(f"Error: The file {path} is not a valid JSON file.")
sys.exit(1)

# Validate JSON structure
if not isinstance(data, list) or not all(isinstance(obj, dict) for obj in data):
click.echo(
"Error: The JSON file must contain a list of objects (e.g., [{...}, {...}])."
)
sys.exit(1)

# Initialize the Weaviate client
client = None
try:
client = get_client_from_context(ctx)
batch_manager = BatchManager(client)

# Create the collection (if it doesn't exist)
click.echo(f"Ensuring collection '{collection}' exists...")
batch_manager.create_collection(
collection=collection,
vectorizer=vectorizer,
shards=shards,
replication_factor=replication_factor,
force_auto_schema=True,
)

# Insert the data in batch mode
click.echo(f"Inserting data into collection '{collection}'...")
batch_manager.batch_insert(collection, data)

except WeaviateConnectionError as wce:
click.echo(f"Connection error: {wce}")
sys.exit(1)
except Exception as e:
click.echo(f"Error: {e}")
sys.exit(1)
finally:
if client:
client.close()
2 changes: 1 addition & 1 deletion weaviate_cli/datasets/movies.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{
"budget": 237000000,
"budget": "237000000",
"cast": "Sam Worthington Zoe Saldana Sigourney Weaver Stephen Lang Michelle Rodriguez",
"director": "James Cameron",
"genres": "Action Adventure Fantasy Science Fiction",
Expand Down
136 changes: 136 additions & 0 deletions weaviate_cli/managers/batch_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import click
from typing import Dict, List, Optional
import weaviate.classes.config as wvc
from weaviate.client import WeaviateClient


class BatchManager:
def __init__(self, client: WeaviateClient) -> None:
self.client = client

def create_collection(
self,
collection: str,
vectorizer: str = "contextionary",
shards: int = 1,
replication_factor: int = 1,
force_auto_schema: bool = True,
) -> None:
"""
Create a collection dynamically for batch insertion.

Args:
collection (str): Name of the collection to create.
vectorizer (str): Vectorizer type (e.g., openai, transformers).
shards (int): Number of shards for the collection.
replication_factor (int): Replication factor for the collection.
force_auto_schema (bool): Whether to let Weaviate infer schema from inserted data.
"""
if self.client.collections.exists(collection):
click.echo(f"Collection '{collection}' already exists. Skipping creation.")
return

# Map vectorizers to Weaviate configurations
vectorizer_map: Dict[str, wvc.VectorizerConfig] = {
"contextionary": wvc.Configure.Vectorizer.text2vec_contextionary(),
"transformers": wvc.Configure.Vectorizer.text2vec_transformers(),
"openai": wvc.Configure.Vectorizer.text2vec_openai(),
"ollama": wvc.Configure.Vectorizer.text2vec_ollama(
model="snowflake-arctic-embed:33m"
),
"cohere": wvc.Configure.Vectorizer.text2vec_cohere(),
"jinaai": wvc.Configure.Vectorizer.text2vec_jinaai(),
}

# Validate vectorizer
if vectorizer not in vectorizer_map:
raise ValueError(
f"Invalid vectorizer '{vectorizer}'. Choose from: {list(vectorizer_map.keys())}"
)

try:
# Create collection with configuration
self.client.collections.create(
name=collection,
vector_index_config=wvc.Configure.VectorIndex.hnsw(),
replication_config=wvc.Configure.replication(
factor=replication_factor,
async_enabled=False,
deletion_strategy=wvc.ReplicationDeletionStrategy.DELETE_ON_CONFLICT,
),
sharding_config=wvc.Configure.sharding(desired_count=shards),
vectorizer_config=vectorizer_map[vectorizer],
properties=None if force_auto_schema else [],
)
click.echo(
f"Collection '{collection}' created successfully with vectorizer '{vectorizer}'."
)
except Exception as e:
raise Exception(f"Error creating collection '{collection}': {e}")

def batch_insert(
self,
collection: str,
data: List[Dict],
) -> None:
"""
Insert data into a collection in batch.

Args:
collection (str): Name of the collection.
data (List[Dict]): Data to be inserted.
"""
if not self.client.collections.exists(collection):
raise Exception(
f"Collection '{collection}' does not exist. Cannot insert data."
)

try:
# Perform batch insertion using Weaviate's dynamic batch
with self.client.batch.dynamic() as batch:
for record in data:

def remove_id_keys(data: Dict) -> Dict:
if not isinstance(data, dict):
return data
cleaned_data = {}
for key, value in data.items():
if key != "id":
if isinstance(value, dict):
cleaned_data[key] = remove_id_keys(value)
elif isinstance(value, list):
cleaned_data[key] = [
(
remove_id_keys(item)
if isinstance(item, dict)
else item
)
for item in value
]
else:
cleaned_data[key] = value

return cleaned_data

# Remove all keys and subkeys that start with 'id'
record = remove_id_keys(record)

# Add the object to the batch
batch.add_object(
collection=collection,
properties=record,
)
click.echo(
f"Processed record"
) # add '{record}' <- if you would like to see the record being processed
except Exception as e:
raise Exception(f"Batch insertion failed: {e}")

# Check for failed objects
failed_objects = self.client.batch.failed_objects
if failed_objects:
click.echo(f"Number of failed objects: {len(failed_objects)}")
for i, failed_obj in enumerate(failed_objects, 1):
click.echo(f"Failed object {i}: {failed_obj}")
else:
click.echo(f"All objects successfully inserted into '{collection}'.")
Loading