Skip to content

Commit e2e64ed

Browse files
ryan-williams and claude committed
Implement monthly parquet sharding to reduce Lambda write amplification
**Problem**: Each Lambda invocation downloaded the entire growing parquet file (~4MB+), appended 1 row, and re-uploaded. This becomes increasingly wasteful as files grow.

**Solution**: Shard data by month into `awair-{id}/{YYYY-MM}.parquet` files. Lambda now only touches the current month's file (~640KB max). Historical months are immutable.

Backend changes:
- `awair data shard`: New CLI command to split existing files into monthly shards
- Lambda `updater.py`: Write to monthly files via `get_monthly_s3_config()`
- Lambda `app.py`: IAM permissions use wildcard (`{key_base}/*`)
- `config.py`: Add `list_monthly_files()`, `load_monthly_data()` helpers
- CLI `info`/`gaps`/`hist`: Auto-detect and aggregate across monthly files

Frontend changes:
- `hyparquetSource.ts`: Parallel multi-file fetching with 404 handling
- `awairService.ts`: Add `getMonthlyDataUrl()`, `getMonthsInRange()`
- Refresh only polls current month's file (historical months immutable)

Row group size: 5000 rows (~3.5 days, ~80KB) for good cache granularity.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent a3a232d commit e2e64ed

File tree

14 files changed

+584
-180
lines changed

14 files changed

+584
-180
lines changed

CLAUDE.md

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,23 @@ pnpm run test # Run tests
8080
```
8181
Awair API
8282
83-
Lambda (every 1 min) → S3 (device-specific parquet files)
83+
Lambda (every 1 min) → S3 (monthly parquet shards)
8484
↑ ↓
8585
Python CLI Web Dashboard (reads directly from S3)
8686
8787
Multi-Device Example:
88-
Gym (17617): EventBridge (1min) → Lambda → s3://380nwk/awair-17617.parquet
89-
BR (137496): EventBridge (1min) → Lambda → s3://380nwk/awair-137496.parquet
88+
Gym (17617): EventBridge (1min) → Lambda → s3://380nwk/awair-17617/{YYYY-MM}.parquet
89+
BR (137496): EventBridge (1min) → Lambda → s3://380nwk/awair-137496/{YYYY-MM}.parquet
9090
```
9191

92+
### Monthly Sharding
93+
94+
Data is stored in monthly shards to reduce Lambda write amplification:
95+
- Each Lambda invocation only downloads/uploads the current month's file (~14-44k rows)
96+
- Historical months are immutable (never modified after month ends)
97+
- Frontend fetches from multiple monthly files in parallel for longer time ranges
98+
- CLI commands auto-detect and aggregate across monthly files
99+
92100
### Key Components
93101

94102
#### 1. Lambda Data Updater (`src/awair/lmbda/`)
@@ -149,8 +157,12 @@ All data files follow a fixed structure under the S3 root:
149157
```
150158
{S3_ROOT}/
151159
├── devices.parquet # Device registry (cached from API)
152-
├── awair-17617.parquet # Device data files
153-
├── awair-137496.parquet
160+
├── awair-17617/ # Device data (monthly shards)
161+
│ ├── 2025-06.parquet
162+
│ ├── 2025-07.parquet
163+
│ └── ...
164+
├── awair-137496/
165+
│ └── ...
154166
└── ...
155167
```
156168

src/awair/cli/config.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,18 @@ def get_devices_path() -> str:
6363

6464

6565
def get_data_path(device_id: int) -> str:
    """Return the path of a device's legacy single-file parquet.

    Kept for backward compatibility with pre-sharding deployments; new
    data is written to monthly shard files instead.
    """
    root = get_s3_root()
    return f'{root}/awair-{device_id}.parquet'
6868

6969

70+
def get_data_base_path(device_id: int) -> str:
    """Return the directory-style base path holding a device's monthly shards.

    Individual shard files live at: {base_path}/{YYYY-MM}.parquet
    """
    root = get_s3_root()
    return f'{root}/awair-{device_id}'
76+
77+
7078
def parse_s3_path(s3_path: str) -> tuple[str, str]:
7179
"""Parse S3 path into bucket and key components."""
7280
if not s3_path.startswith('s3://'):
@@ -85,6 +93,65 @@ def parse_s3_path(s3_path: str) -> tuple[str, str]:
8593
return bucket, key
8694

8795

96+
def list_monthly_files(base_path: str) -> list[str]:
    """List all monthly parquet files in a device data directory.

    Handles both S3 prefixes and local directories. Only files whose name
    matches the {YYYY-MM}.parquet shard pattern are kept; any other objects
    under the prefix are ignored.

    Args:
        base_path: Base path for device data
            (e.g., s3://bucket/awair-17617 or ./awair-17617)

    Returns:
        Sorted list of paths to monthly parquet files
        (e.g., ['s3://.../2024-11.parquet', ...])
    """
    import re

    # Shard filenames look like "2025-06.parquet"
    month_re = re.compile(r'\d{4}-\d{2}\.parquet')

    # Strip trailing slash so path components below are well-formed
    base_path = base_path.rstrip('/')

    if base_path.startswith('s3://'):
        import boto3

        # Derive the directory prefix directly from the base path
        bucket, key_base = parse_s3_path(base_path)
        prefix = f'{key_base}/'

        # Paginate: list_objects_v2 returns at most 1000 keys per call,
        # so a single un-paginated call would silently drop shards once
        # enough objects accumulate under the prefix.
        s3 = boto3.client('s3')
        paginator = s3.get_paginator('list_objects_v2')

        files = []
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                # Match on the final path component only
                name = key.rsplit('/', 1)[-1]
                if month_re.fullmatch(name):
                    files.append(f's3://{bucket}/{key}')
        return sorted(files)
    else:
        from pathlib import Path

        base = Path(base_path)
        if not base.is_dir():
            return []
        files = [str(f) for f in base.glob('*.parquet') if month_re.fullmatch(f.name)]
        return sorted(files)
132+
133+
134+
def load_monthly_data(base_path: str):
    """Read every monthly shard under *base_path* and merge them.

    Args:
        base_path: Base path for device data (e.g., s3://bucket/awair-17617)

    Returns:
        A single DataFrame ordered by timestamp; empty if no shards exist.
    """
    import pandas as pd

    paths = list_monthly_files(base_path)
    if not paths:
        return pd.DataFrame()

    frames = [pd.read_parquet(p) for p in paths]
    merged = pd.concat(frames, ignore_index=True)
    return merged.sort_values('timestamp').reset_index(drop=True)
153+
154+
88155
def get_default_data_path(device_id: int | None = None) -> str:
89156
"""Get data file path for a device.
90157

src/awair/cli/data.py

Lines changed: 168 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,14 @@
99
from ..storage import ParquetStorage
1010
from .base import awair
1111
from .common_opts import device_id_opt
12-
from .config import data_path_opt, err
12+
from .config import (
13+
data_path_opt,
14+
err,
15+
get_data_base_path,
16+
list_monthly_files,
17+
load_monthly_data,
18+
resolve_device_by_name_or_id,
19+
)
1320

1421

1522
@awair.group
@@ -18,21 +25,82 @@ def data():
1825
pass
1926

2027

28+
def load_device_data(device_id: str | None, data_path: str) -> tuple[pd.DataFrame, str, bool]:
    """Load device data, trying monthly files first then falling back to single file.

    Args:
        device_id: Device name, numeric ID string, int, or None
        data_path: Data path (may be single file or base directory)

    Returns:
        Tuple of (DataFrame, source_description, is_monthly)
    """
    import re

    # If device_id not provided, try to extract it from data_path.
    # Pattern: awair-{deviceId}.parquet or awair-{deviceId}/
    if device_id is None:
        match = re.search(r'awair-(\d+)(?:\.parquet|/|$)', data_path)
        if match:
            device_id = match.group(1)

    # Try monthly files first
    if device_id is not None:
        device_id_int: int | None
        if isinstance(device_id, str):
            try:
                _, device_id_int = resolve_device_by_name_or_id(device_id)
            except ValueError:
                # Unknown device name: accept a plain numeric ID; for
                # anything non-numeric, fall back to the single-file
                # path instead of letting int() raise.
                try:
                    device_id_int = int(device_id)
                except ValueError:
                    device_id_int = None
        else:
            device_id_int = device_id

        if device_id_int is not None:
            base_path = get_data_base_path(device_id_int)
            monthly_files = list_monthly_files(base_path)

            if monthly_files:
                df = load_monthly_data(base_path)
                source = f'{base_path}/ ({len(monthly_files)} monthly files)'
                return df, source, True

    # Fall back to single file
    storage = ParquetStorage(data_path)
    df = storage.read_data()
    return df, data_path, False
69+
70+
2171
@data.command
@device_id_opt
@data_path_opt
def info(device_id: str | None, data_path: str):
    """Show data file information.

    Automatically detects and reads from monthly sharded files if available,
    falling back to single-file format.
    """
    df, source, is_monthly = load_device_data(device_id, data_path)

    echo(f'Data source: {source}')

    if df.empty:
        echo('No data found')
        return

    echo(f'Total records: {len(df):,}')

    df['timestamp'] = pd.to_datetime(df['timestamp'])
    earliest = df['timestamp'].min()
    latest = df['timestamp'].max()
    echo(f'Date range: {earliest} to {latest}')

    if is_monthly:
        # Per-month breakdown computed from the already-loaded data —
        # avoids re-downloading every shard a second time and avoids
        # parsing the base path back out of the display string.
        echo('\nMonthly files:')
        monthly_counts = df['timestamp'].dt.strftime('%Y-%m').value_counts().sort_index()
        for month_name, n in monthly_counts.items():
            echo(f'  {month_name}: {n:,} records')
36104

37105

38106
@data.command
@@ -49,14 +117,14 @@ def gaps(
49117
count: int,
50118
min_gap: int | None,
51119
):
52-
"""Find and report the largest timing gaps in the data."""
120+
"""Find and report the largest timing gaps in the data.
53121
54-
# Read data
55-
storage = ParquetStorage(data_path)
56-
df = storage.read_data()
122+
Automatically detects and reads from monthly sharded files if available.
123+
"""
124+
df, source, _ = load_device_data(device_id, data_path)
57125

58126
if df.empty:
59-
err('No data in file')
127+
err('No data found')
60128
return
61129

62130
# Filter by date range if specified (parsing already handled by option callbacks)
@@ -100,7 +168,7 @@ def gaps(
100168
# Show summary
101169
date_range = f'{df["timestamp"].min().date()} to {df["timestamp"].max().date()}'
102170

103-
echo(f'Gap analysis for {data_path}')
171+
echo(f'Gap analysis for {source}')
104172
echo(f'Date range: {date_range}')
105173
echo(f'Total records: {len(df)}')
106174

@@ -132,13 +200,14 @@ def hist(
132200
from_dt: str | None,
133201
to_dt: str | None,
134202
):
135-
"""Generate histogram of record counts per day."""
203+
"""Generate histogram of record counts per day.
136204
137-
storage = ParquetStorage(data_path)
138-
df = storage.read_data()
205+
Automatically detects and reads from monthly sharded files if available.
206+
"""
207+
df, _, _ = load_device_data(device_id, data_path)
139208

140209
if df.empty:
141-
err('No data in file')
210+
err('No data found')
142211
return
143212

144213
# Ensure timestamp is datetime
@@ -166,3 +235,81 @@ def hist(
166235

167236
for _, row in daily_counts.iterrows():
168237
echo(f'{row["count"]:7d} {row["date"]}')
238+
239+
240+
# Default row group size for monthly shards.
# 5000 rows = ~3.5 days at 1-minute intervals = ~80KB per RG.
# Monthly files have ~40-44k rows = ~8-9 RGs, good granularity for caching.
DEFAULT_MONTHLY_ROW_GROUP_SIZE = 5000


@data.command
@device_id_opt
@data_path_opt
@option('-n', '--dry-run', is_flag=True, help='Show what would be done without writing files')
@option('-r', '--row-group-size', type=int, default=DEFAULT_MONTHLY_ROW_GROUP_SIZE,
        help=f'Row group size for output files (default: {DEFAULT_MONTHLY_ROW_GROUP_SIZE})')
def shard(device_id: str | None, data_path: str, dry_run: bool, row_group_size: int):
    """Split single parquet file into monthly shards.

    Reads the existing awair-{deviceId}.parquet file and splits it into
    monthly files: awair-{deviceId}/{YYYY-MM}.parquet

    This reduces Lambda write amplification by allowing updates to only
    touch the current month's file.

    Default row group size is 5000 rows (~3.5 days, ~80KB) for good cache
    granularity. Use --row-group-size to customize.
    """
    # Read existing data
    echo(f'Reading: {data_path}')
    storage = ParquetStorage(data_path)
    df = storage.read_data()

    if df.empty:
        err('No data in file')
        return

    echo(f'Using row_group_size: {row_group_size}')

    # Group rows by calendar month. Grouping on a derived Series (rather
    # than adding a helper column to df) keeps each per-month frame at the
    # original schema, so nothing needs dropping before the write.
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    month_key = df['timestamp'].dt.strftime('%Y-%m')
    groups = df.groupby(month_key)
    echo(f'Found {len(groups)} months of data:')

    # Determine output base path (directory),
    # e.g. s3://380nwk/awair-17617.parquet -> s3://380nwk/awair-17617/
    output_base = data_path.removesuffix('.parquet')

    # groupby sorts its (string) keys, so months iterate chronologically
    for year_month, group_df in groups:
        count = len(group_df)
        output_path = f'{output_base}/{year_month}.parquet'

        date_range = f'{group_df["timestamp"].min().date()} to {group_df["timestamp"].max().date()}'
        echo(f'  {year_month}: {count:,} records ({date_range})')

        if dry_run:
            echo(f'    Would write: {output_path}')
        else:
            write_df = group_df.sort_values('timestamp').reset_index(drop=True)
            write_df.to_parquet(output_path, index=False, engine='pyarrow', row_group_size=row_group_size)
            echo(f'    Wrote: {output_path}')

    total_records = len(df)
    if dry_run:
        echo(f'\nDry run complete. Would shard {total_records:,} records into {len(groups)} monthly files.')
        echo('Run without --dry-run to execute.')
    else:
        echo(f'\nSharded {total_records:,} records into {len(groups)} monthly files.')
        echo(f'Original file preserved: {data_path}')
        echo('After verifying shards, you can delete the original file.')

src/awair/lmbda/app.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ def __init__(
5050
if not s3_bucket or not s3_key:
5151
raise ValueError(f'Invalid S3 path: {data_path}. Expected format: s3://bucket/key')
5252

53+
# Normalize key for IAM (remove .parquet suffix if present)
54+
# Monthly sharding uses directory structure: awair-{deviceId}/{YYYY-MM}.parquet
55+
s3_key_base = s3_key[:-8] if s3_key.endswith('.parquet') else s3_key
56+
5357
# IAM role for Lambda
5458
lambda_role = iam.Role(
5559
self, "LambdaExecutionRole",
@@ -67,12 +71,18 @@ def __init__(
6771
"s3:PutObject",
6872
"s3:DeleteObject"
6973
],
70-
resources=[f"arn:aws:s3:::{s3_bucket}/{s3_key}"]
74+
# Wildcard for all monthly files: awair-{id}/*.parquet
75+
resources=[f"arn:aws:s3:::{s3_bucket}/{s3_key_base}/*"]
7176
),
7277
iam.PolicyStatement(
7378
effect=iam.Effect.ALLOW,
7479
actions=["s3:ListBucket"],
75-
resources=[f"arn:aws:s3:::{s3_bucket}"]
80+
resources=[f"arn:aws:s3:::{s3_bucket}"],
81+
conditions={
82+
"StringLike": {
83+
"s3:prefix": [f"{s3_key_base}/*"]
84+
}
85+
}
7686
)
7787
]
7888
)

0 commit comments

Comments
 (0)