Binary file added 2026-01-pytorch/.DS_Store
Binary file not shown.
85 changes: 85 additions & 0 deletions 2026-01-pytorch/README.md
@@ -0,0 +1,85 @@
# PyTorch + QuestDB Data Pipeline Tutorial

This tutorial demonstrates three ways to load time-series data from QuestDB into PyTorch:

1. **Direct SQL queries** (REST API or ConnectorX)
2. **Parquet export** via REST API
3. **Direct Parquet partition access** (for local QuestDB)

The example task is volume forecasting, but the data loading patterns apply to any model.

## Quick Start

```bash
# Install dependencies
pip install pandas pyarrow requests torch scikit-learn

# Or with uv:
uv sync

# Run the demos in order:
python steps/method1_direct_query.py # See direct query methods
python steps/method2_parquet_export.py # Export data to Parquet
python steps/train.py # Train the model
```

## What Each Script Does

| Script | Description |
|--------|-------------|
| `steps/method1_direct_query.py` | Queries the QuestDB demo instance; demonstrates the REST API and ConnectorX |
| `steps/method2_parquet_export.py` | Exports 7 days of data to `dataset/training_data.parquet` |
| `steps/method3_parquet_partitions.py` | Shows direct partition access (requires local QuestDB) |
| `steps/train.py` | Loads Parquet, engineers features, trains LSTM, saves model |

## Project Structure

```
├── config/
│   └── settings.py            # Configuration (URLs, paths, hyperparameters)
├── src/
│   ├── data_loader.py         # Three data loading methods
│   ├── features.py            # Feature engineering
│   └── model.py               # PyTorch Dataset and LSTM model
├── steps/
│   ├── method1_direct_query.py
│   ├── method2_parquet_export.py
│   ├── method3_parquet_partitions.py
│   └── train.py
├── dataset/                   # Exported Parquet files
├── models/                    # Saved model checkpoints
└── pyproject.toml
```

## Data Source

The tutorial uses the `trades` table from QuestDB's public demo instance at `https://demo.questdb.io`.

The query aggregates raw trades into 1-minute bars:

```sql
SELECT
timestamp,
symbol,
sum(amount) as volume,
count() as trade_count,
avg(price) as avg_price,
max(price) - min(price) as price_range
FROM trades
WHERE symbol = 'BTC-USD'
SAMPLE BY 1m
ALIGN TO CALENDAR
```
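
Running this query programmatically takes only a few lines of `requests` against the demo's `/exp` endpoint; this mirrors `query_rest_api` in `src/data_loader.py` (column list shortened here for brevity):

```python
from io import StringIO

import pandas as pd
import requests

# Query the public demo's export endpoint, as src/data_loader.py does.
query = """
SELECT timestamp, sum(amount) as volume, count() as trade_count
FROM trades
WHERE symbol = 'BTC-USD'
SAMPLE BY 1m
ALIGN TO CALENDAR
"""
resp = requests.get("https://demo.questdb.io/exp", params={"query": query, "fmt": "csv"})
resp.raise_for_status()
df = pd.read_csv(StringIO(resp.text))
print(df.head())
```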

## Method Comparison

| Method | Best For | Requires |
|--------|----------|----------|
| REST API | Exploration, simple queries | Network access |
| ConnectorX | Larger result sets | `pip install connectorx` |
| Parquet Export | Reproducible training data | Network access |
| Direct Partitions | Terabyte-scale data | Local QuestDB with Parquet format |
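
As a rough sketch of how these map onto the helpers in `src/data_loader.py` (the output filename is assumed to match what `method2_parquet_export.py` writes):

```python
from config.settings import DATASET_DIR, QUESTDB_DATA_DIR
from src.data_loader import (
    export_to_parquet,
    get_training_query,
    load_parquet_partitions,
    query_connectorx,
    query_rest_api,
)

query = get_training_query(symbol="BTC-USDT", days=7)

df = query_rest_api(query)                  # Method 1a: REST API
# df = query_connectorx(query)              # Method 1b: ConnectorX (optional extra)

path = export_to_parquet(query, DATASET_DIR / "training_data.parquet")  # Method 2

# Method 3 only works against a local QuestDB with Parquet partitions:
# dataset = load_parquet_partitions("trades", QUESTDB_DATA_DIR)
```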

## Note on the Model

The LSTM model is intentionally simple—this tutorial focuses on data pipeline patterns, not forecasting accuracy. In production, you'd benchmark against simpler methods (moving averages, ARIMA) and tune hyperparameters properly.
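
As a concrete example of such a baseline, the 15-minute moving average is already computed by `src/features.py`, so a naive benchmark is only a few lines (a sketch, assuming the DataFrame returned by `prepare_features`, which contains `volume_ma_15` and `target` columns):

```python
import numpy as np
import pandas as pd

def moving_average_baseline(df: pd.DataFrame) -> float:
    """MAE of predicting next-minute volume with the 15-minute moving average."""
    preds = df["volume_ma_15"].to_numpy()
    actual = df["target"].to_numpy()
    return float(np.mean(np.abs(actual - preds)))
```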
Empty file.
23 changes: 23 additions & 0 deletions 2026-01-pytorch/config/settings.py
@@ -0,0 +1,23 @@
from pathlib import Path

# QuestDB connection
QUESTDB_HTTP = "https://demo.questdb.io"
QUESTDB_PG = "postgresql://admin:quest@demo.questdb.io:8812/qdb"

# For local QuestDB with Parquet partitions (Method 3)
QUESTDB_DATA_DIR = Path("/var/lib/questdb/db")

# Paths
PROJECT_ROOT = Path(__file__).parent.parent
DATASET_DIR = PROJECT_ROOT / "dataset"
MODEL_DIR = PROJECT_ROOT / "models"

# Data settings
TABLE_NAME = "trades"
SYMBOL = "BTC-USDT"

# Model settings
SEQUENCE_LENGTH = 30
BATCH_SIZE = 32
EPOCHS = 20
LEARNING_RATE = 0.001
24 changes: 24 additions & 0 deletions 2026-01-pytorch/pyproject.toml
@@ -0,0 +1,24 @@
[project]
name = "pytorch-questdb-pipeline"
version = "0.1.0"
description = "Tutorial: Loading QuestDB time-series data into PyTorch"
requires-python = ">=3.11"
dependencies = [
"pandas>=2.0.0",
"pyarrow>=15.0.0",
"requests>=2.31.0",
"torch>=2.0.0",
"scikit-learn>=1.4.0",
]

[project.optional-dependencies]
fast = [
"connectorx>=0.3.0", # Faster queries via PostgreSQL protocol
]

[tool.poe.tasks]
method1 = { cmd = "python steps/method1_direct_query.py", help = "Demo: Direct SQL queries" }
method2 = { cmd = "python steps/method2_parquet_export.py", help = "Demo: Parquet export via API" }
method3 = { cmd = "python steps/method3_parquet_partitions.py", help = "Demo: Direct partition access" }
train = { cmd = "python steps/train.py", help = "Train the LSTM model" }
all = ["method1", "method2", "train"]
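
# Usage sketch (assumes poethepoet is installed separately, e.g. `pip install poethepoet`;
# it is not listed in the dependencies above):
#   poe method1    # run the direct-query demo
#   poe all        # method1 -> method2 -> train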
Empty file added 2026-01-pytorch/src/__init__.py
Empty file.
141 changes: 141 additions & 0 deletions 2026-01-pytorch/src/data_loader.py
@@ -0,0 +1,141 @@
"""
Three methods to load data from QuestDB into Python/PyTorch.
"""
import pandas as pd
import requests
from io import StringIO
from pathlib import Path

from config.settings import QUESTDB_HTTP, QUESTDB_PG


# =============================================================================
# Method 1: Direct Queries
# =============================================================================

def query_rest_api(query: str) -> pd.DataFrame:
"""
Query QuestDB via REST API, return DataFrame.

Simple and works everywhere. Best for exploration and smaller datasets.
"""
response = requests.get(
f"{QUESTDB_HTTP}/exp",
params={"query": query, "fmt": "csv"}
)
response.raise_for_status()
return pd.read_csv(StringIO(response.text))
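
# Example (illustrative): peek at a few raw rows from the demo's trades table.
#   df = query_rest_api("SELECT timestamp, symbol, price, amount FROM trades LIMIT 10")
#   print(df.head())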


def query_connectorx(query: str) -> pd.DataFrame:
"""
Query QuestDB via PostgreSQL wire protocol using ConnectorX.

Faster than REST for larger results. Requires: pip install connectorx
"""
import connectorx as cx
return cx.read_sql(QUESTDB_PG, query)


# =============================================================================
# Method 2: Parquet Export via REST API
# =============================================================================

def export_to_parquet(query: str, output_path: Path) -> Path:
"""
Export query results to Parquet file via QuestDB REST API.

Falls back to CSV if Parquet export is disabled on the server.
Best for reproducible training datasets.
"""
output_path.parent.mkdir(parents=True, exist_ok=True)

# Try native Parquet export
response = requests.get(
f"{QUESTDB_HTTP}/exp",
params={"query": query, "fmt": "parquet"},
stream=True
)

content_type = response.headers.get("content-type", "")

if response.ok and "parquet" in content_type:
with open(output_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"✓ Exported native Parquet: {output_path}")
else:
# Fallback: CSV export + pandas conversion
print(" Parquet export not available, using CSV fallback...")
csv_response = requests.get(
f"{QUESTDB_HTTP}/exp",
params={"query": query, "fmt": "csv"}
)
csv_response.raise_for_status()
df = pd.read_csv(StringIO(csv_response.text))
df.to_parquet(output_path, index=False)
print(f"✓ Exported via CSV→Parquet: {output_path}")

return output_path
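
# Example (illustrative): export a week of 1-minute bars for training.
#   export_to_parquet(get_training_query(days=7), Path("dataset/training_data.parquet"))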


# =============================================================================
# Method 3: Direct Parquet Partition Access
# =============================================================================

def load_parquet_partitions(table_name: str, data_dir: Path):
"""
Load QuestDB Parquet partitions directly via PyArrow.

Requires:
- Local/mounted access to QuestDB data directory
- Parquet partition format enabled (o3PartitionFormat = 'PARQUET')

Best for terabyte-scale data and streaming processing.
"""
import pyarrow.dataset as ds

table_dir = data_dir / table_name
parquet_files = list(table_dir.glob("**/*.parquet"))

if not parquet_files:
raise FileNotFoundError(
f"No Parquet files found in {table_dir}. "
"Ensure Parquet partition format is enabled in QuestDB."
)

print(f"✓ Found {len(parquet_files)} partition files")
return ds.dataset(table_dir, format="parquet")


def stream_from_partitions(dataset, columns: list[str], batch_size: int = 100_000):
"""
Stream data from Parquet partitions in batches.

Memory-efficient for large datasets.
"""
scanner = dataset.scanner(columns=columns, batch_size=batch_size)
for batch in scanner.to_batches():
yield batch.to_pandas()
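
# Example (illustrative; needs a local QuestDB data dir with Parquet partitions):
#   dataset = load_parquet_partitions("trades", Path("/var/lib/questdb/db"))
#   for chunk in stream_from_partitions(dataset, columns=["timestamp", "symbol", "price", "amount"]):
#       ...  # each chunk is a pandas DataFrame of up to batch_size rows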


# =============================================================================
# Query builders
# =============================================================================

def get_training_query(symbol: str = "BTC-USDT", days: int = 7) -> str:
"""Build query for training data: 1-minute aggregated bars."""
return f"""
SELECT
timestamp,
symbol,
sum(amount) as volume,
count() as trade_count,
avg(price) as avg_price,
max(price) - min(price) as price_range
FROM trades
WHERE symbol = '{symbol}'
AND timestamp > dateadd('d', -{days}, now())
SAMPLE BY 1m
ALIGN TO CALENDAR
"""
69 changes: 69 additions & 0 deletions 2026-01-pytorch/src/features.py
@@ -0,0 +1,69 @@
"""
Feature engineering for time-series forecasting.
"""
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler


FEATURE_COLS = [
"volume_lag_1",
"volume_lag_5",
"volume_lag_15",
"volume_ma_15",
"volume_std_15",
"trade_count",
"price_range"
]


def prepare_features(df: pd.DataFrame) -> pd.DataFrame:
"""
Add features for volume forecasting.

Returns DataFrame with features and target column.
"""
df = df.sort_values("timestamp").copy()

# Ensure numeric types
df["volume"] = pd.to_numeric(df["volume"], errors="coerce")
df["trade_count"] = pd.to_numeric(df["trade_count"], errors="coerce")
df["price_range"] = pd.to_numeric(df["price_range"], errors="coerce")

# Lag features
for lag in [1, 5, 15]:
df[f"volume_lag_{lag}"] = df["volume"].shift(lag)

# Rolling statistics
df["volume_ma_15"] = df["volume"].rolling(15).mean()
df["volume_std_15"] = df["volume"].rolling(15).std()

# Target: next period's volume
df["target"] = df["volume"].shift(-1)

# Drop rows with NaN (from lags/rolling)
df = df.dropna()

return df


def normalize_features(
train_df: pd.DataFrame,
test_df: pd.DataFrame,
feature_cols: list[str] = FEATURE_COLS
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, StandardScaler]:
"""
Normalize features using StandardScaler.

Fits on train, transforms both train and test.
Returns: X_train, X_test, y_train, y_test, scaler
"""
scaler = StandardScaler()

X_train = scaler.fit_transform(train_df[feature_cols])
X_test = scaler.transform(test_df[feature_cols])

y_train = train_df["target"].values
y_test = test_df["target"].values

return X_train, X_test, y_train, y_test, scaler
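
# Example (illustrative): split chronologically *before* scaling so the scaler
# never sees test-period statistics (no leakage).
#   df = prepare_features(raw_df)  # raw_df: 1-minute bars loaded from Parquet
#   split = int(len(df) * 0.8)
#   train_df, test_df = df.iloc[:split], df.iloc[split:]
#   X_train, X_test, y_train, y_test, scaler = normalize_features(train_df, test_df)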