Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.env
.venv
*.pyc
*.egg-info
.vscode/
*_conf*
.ruff_code/
Expand All @@ -20,3 +21,4 @@ node_modules/
/app/public/js/*
.jinja_cache/
demo_tokens.py

15 changes: 15 additions & 0 deletions clients/python/Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[packages]
aiohttp = "*"
queryweaver-client = {file = ".", editable = true}

[dev-packages]
pytest = "*"
pytest-asyncio = "*"

[requires]
python_version = "3.12"
766 changes: 766 additions & 0 deletions clients/python/Pipfile.lock

Large diffs are not rendered by default.

108 changes: 108 additions & 0 deletions clients/python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# QueryWeaver Python client

Lightweight Python client for interacting with a QueryWeaver server.

This folder contains a small package, `queryweaver_client`, that wraps the
QueryWeaver HTTP API. The package uses aiohttp for async HTTP requests and is
configured to be used with pipenv (Pipfile) or a plain virtualenv.

## Getting started

### Install from PyPI (end users)

If you just want to use the client (not develop it), install the published package from PyPI:

```bash
pip install queryweaver_client
```

This installs the latest released version. For development and tests use the pipenv/venv instructions below.

### Usage example

Basic async usage (API token auth):

```python
import asyncio
from queryweaver_client import QueryWeaverClient

async def main():
async with QueryWeaverClient("http://localhost:5000", api_token="YOUR_API_TOKEN") as client:
# List available databases/schemas
schemas = await client.list_schemas()
print(schemas)

# Connect to a database and get the final result
result = await client.connect_database("postgresql://user:pass@host/db")
print(result)

# Run a natural language query and get the final result
chat_data = {
"messages": [
{"role": "user", "content": "List the top 5 customers by total_spend"}
]
}
result = await client.query("my_database_schema", chat_data)
print(result)

# Run the async function
asyncio.run(main())
```


## Build from code (developers)


### Quick install (pipenv - recommended)

1. Change to the client directory:

```bash
cd ./clients/python
```

2. Install dependencies and the package into a pipenv virtualenv (editable install supported):

```bash
pipenv install --dev
# Install the local package into the pipenv virtualenv in editable mode
pipenv install -e .
```

### Running tests

Unit tests live in `tests/` and are lightweight (they mock network calls and do NOT
require a running QueryWeaver server). Use pytest to run them.

With pipenv:

```bash
pipenv run pytest -q
```

With venv/pip:

```bash
pytest -q
```

Run a single test file or test:

```bash
pytest -q tests/test_client_basic.py
pytest -q tests/test_client_basic.py::test_connect_database_sync
```

## Notes & troubleshooting
-----------------------
- The `Pipfile` specifies Python 3.12; use a compatible interpreter if you need exact parity.
- If `pipenv install` fails because `pipenv` isn't available, install it (`pip install pipenv`) or use the plain venv flow above.
- The client supports two auth modes:
- API token: pass `api_token` to `QueryWeaverClient` (sent as Bearer Authorization header)
- Session cookie: construct an `aiohttp.ClientSession()` with cookies (for web OAuth session flows) and pass it to `QueryWeaverClient(session=your_session)`
- All API calls are async and must be awaited
- Use the client as an async context manager (`async with`) for proper session lifecycle management

## Want more?
-----------
If you'd like typed request/response models (pydantic), better streaming helpers, or integration tests that run against a local QueryWeaver instance (requires FalkorDB and env setup), I can add those next.
8 changes: 8 additions & 0 deletions clients/python/queryweaver_client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""QueryWeaver Python client package.

Provides a small, dependency-light wrapper for the QueryWeaver HTTP API.
"""

from .client import QueryWeaverClient

__all__ = ["QueryWeaverClient"]
159 changes: 159 additions & 0 deletions clients/python/queryweaver_client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""QueryWeaver HTTP API client.

Simple wrapper around the REST API exposed by a running QueryWeaver server.
Supports API token authentication (Bearer) and session cookie usage.
"""
from __future__ import annotations

import json
from typing import Any, Dict, Optional

import aiohttp


class APIError(Exception):
"""Raised for non-2xx responses from the server."""


class QueryWeaverClient:
"""Minimal QueryWeaver client.

Usage:
async with QueryWeaverClient("http://localhost:5000", api_token="...") as client:
await client.list_schemas()
result = await client.connect_database("postgresql://user:pass@host/db")

Authentication:
- api_token: sent as Bearer token in Authorization header
- session cookie: pass a `aiohttp.ClientSession()` with cookies set
"""

def __init__(
self,
base_url: str,
api_token: Optional[str] = None,
session: Optional[aiohttp.ClientSession] = None,
timeout: int = 30,
) -> None:
self.base_url = base_url.rstrip("/")
self.api_token = api_token
self._session = session
self.timeout = aiohttp.ClientTimeout(total=timeout)

# default headers
self._default_headers = {"Accept": "application/json"}
if api_token:
self._default_headers["Authorization"] = f"Bearer {api_token}"

async def __aenter__(self):
if self._session is None:
self._session = aiohttp.ClientSession(headers=self._default_headers)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._session and not self._session.closed:
Copy link

Copilot AI Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will close a user-provided session on exit. Since you track ownership with _owns_session, only close the session if it was created by the client (self._owns_session is True). Suggested fix: check self._owns_session before closing.

Suggested change
if self._session and not self._session.closed:
if self._owns_session and self._session and not self._session.closed:

Copilot uses AI. Check for mistakes.
await self._session.close()

def _url(self, path: str) -> str:
return f"{self.base_url}{path}"

def _raise_for_status(self, resp: aiohttp.ClientResponse) -> None:
if not resp.ok:
# try to parse json error
try:
payload = resp.json()
except Exception:
payload = resp.text
raise APIError(f"HTTP {resp.status}: {payload}")

async def list_schemas(self) -> Dict[str, Any]:
"""GET /graphs - list available user schemas/databases."""
async with self._session.get(self._url("/graphs"), timeout=self.timeout) as resp:
self._raise_for_status(resp)
return await resp.json()

async def get_schema(self, graph_id: str) -> Dict[str, Any]:
"""GET /graphs/{graph_id}/data - return schema nodes/edges."""
async with self._session.get(self._url(f"/graphs/{graph_id}/data"), timeout=self.timeout) as resp:
self._raise_for_status(resp)
return await resp.json()

async def delete_schema(self, graph_id: str) -> Dict[str, Any]:
"""DELETE /graphs/{graph_id} - delete a schema."""
async with self._session.delete(self._url(f"/graphs/{graph_id}"), timeout=self.timeout) as resp:
self._raise_for_status(resp)
return await resp.json()

async def refresh_schema(self, graph_id: str) -> Dict[str, Any]:
"""POST /graphs/{graph_id}/refresh - refresh schema."""
async with self._session.post(self._url(f"/graphs/{graph_id}/refresh"), timeout=self.timeout) as resp:
self._raise_for_status(resp)
return await resp.json()

async def connect_database(self, db_url: str) -> Dict[str, Any]:
"""POST /database - connect to a database and return final result."""
payload = {"url": db_url}
async with self._session.post(self._url("/database"), json=payload, timeout=self.timeout) as resp:
self._raise_for_status(resp)

# Consume the streaming response and return the final result
events = []
async for chunk in resp.content.iter_any():
chunk_str = chunk.decode('utf-8')
if not chunk_str.strip():
continue
try:
events.append(json.loads(chunk_str))
except Exception:
# fallback: try to split by '|||' or newline
parts = chunk_str.split("|||")
for part in parts:
part = part.strip()
if not part:
continue
try:
events.append(json.loads(part))
except Exception:
events.append({"raw": part})

# Return the last event which typically contains the final status/result
return events[-1] if events else {}

async def query(self, graph_id: str, chat_data: Dict[str, Any]) -> Dict[str, Any]:
"""POST /graphs/{graph_id} - run a natural language query and return final result."""
async with self._session.post(self._url(f"/graphs/{graph_id}"), json=chat_data, timeout=self.timeout) as resp:
self._raise_for_status(resp)

# Consume the streaming response and return the final result
events = []
async for chunk in resp.content.iter_any():
chunk_str = chunk.decode('utf-8')
if not chunk_str.strip():
continue
try:
events.append(json.loads(chunk_str))
except Exception:
# server may send partial JSON, try to yield raw
events.append({"raw": chunk_str})

# Return the last event which typically contains the final SQL result
return events[-1] if events else {}

async def confirm(self, graph_id: str, confirm_data: Dict[str, Any]) -> Dict[str, Any]:
"""POST /graphs/{graph_id}/confirm - confirm destructive operation and return result."""
async with self._session.post(self._url(f"/graphs/{graph_id}/confirm"), json=confirm_data, timeout=self.timeout) as resp:
self._raise_for_status(resp)

# Consume the streaming response and return the final result
events = []
async for chunk in resp.content.iter_any():
chunk_str = chunk.decode('utf-8')
if not chunk_str.strip():
continue
try:
events.append(json.loads(chunk_str))
except Exception:
events.append({"raw": chunk_str})

# Return the last event
return events[-1] if events else {}
15 changes: 15 additions & 0 deletions clients/python/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from setuptools import setup, find_packages

setup(
name="queryweaver-client",
version="0.1.0",
description="Lightweight Python client for QueryWeaver server",
packages=find_packages(exclude=("tests",)),
include_package_data=True,
install_requires=[
"requests",
],
extras_require={
"dev": ["pytest"],
},
)
Loading
Loading