
Commit 07d696e

add hackernews update script
1 parent c209011 commit 07d696e

File tree

load_scripts/hackernews/Dockerfile
load_scripts/hackernews/README.md
load_scripts/hackernews/cloudbuild.yaml
load_scripts/hackernews/ingest.py
load_scripts/hackernews/requirements.txt

5 files changed: +338 -0 lines changed

load_scripts/hackernews/Dockerfile

Lines changed: 24 additions & 0 deletions
```dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Copy the ingestion script
COPY ingest.py /app/ingest.py
RUN chmod +x /app/ingest.py

# Set default environment variables
ENV CLICKHOUSE_HOST=localhost \
    CLICKHOUSE_PORT=9000 \
    CLICKHOUSE_USER=default \
    CLICKHOUSE_PASSWORD="" \
    CLICKHOUSE_DATABASE=default \
    TABLE_NAME=hackernews \
    WORKERS=500 \
    BLOCK_SIZE=10000 \
    BATCH_LINES=500

ENTRYPOINT ["python", "/app/ingest.py"]
```
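For a quick local smoke test of this image, one option (a sketch, assuming a ClickHouse server reachable on the host's native port 9000; `--network host` is Linux-only) is:

```bash
docker build -t hn-ingest .
docker run --rm --network host \
  -e CLICKHOUSE_HOST=127.0.0.1 \
  hn-ingest --dry-run
```

`--dry-run` is forwarded as an argument to the `ENTRYPOINT`, so the container only estimates the backlog without inserting anything.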

load_scripts/hackernews/README.md

Lines changed: 101 additions & 0 deletions
# Hacker News Ingestion Scripts

Fast incremental ingestion of Hacker News items into ClickHouse.

## Python Script (Recommended)

High-performance async HTTP implementation that can process 1000+ requests per second.

### Requirements

```bash
pip install -r requirements.txt
```

### Usage

```bash
# Incremental: from max(id) in ClickHouse -> HN maxitem
python ingest.py

# Start from a specific ID
python ingest.py 46352456

# Dry run: estimate items to download without downloading
python ingest.py --dry-run

# Dry run from a specific ID
python ingest.py --dry-run 46352456

# With custom configuration
WORKERS=1000 BLOCK_SIZE=20000 python ingest.py
```

### Environment Variables

- `CLICKHOUSE_HOST` - ClickHouse host (default: localhost)
- `CLICKHOUSE_PORT` - ClickHouse native port (default: 9000)
- `CLICKHOUSE_USER` - ClickHouse user (default: default)
- `CLICKHOUSE_PASSWORD` - ClickHouse password (default: empty)
- `CLICKHOUSE_DATABASE` - Database name (default: default)
- `CLICKHOUSE_SECURE` - Use TLS for the native connection (default: false)
- `TABLE_NAME` - Table name (default: hackernews)
- `WORKERS` - Concurrent HTTP connections (default: 500)
- `BLOCK_SIZE` - Items per block (default: 10000)
- `BATCH_LINES` - Insert batch size (default: 500)

## Docker

### Build

```bash
docker build -t hn-ingest .
```

### Run

```bash
# Incremental from max(id)
docker run --rm \
  -e CLICKHOUSE_HOST=your-clickhouse-host \
  -e CLICKHOUSE_PORT=9000 \
  hn-ingest

# Start from a specific ID
docker run --rm \
  -e CLICKHOUSE_HOST=your-clickhouse-host \
  -e CLICKHOUSE_PORT=9000 \
  hn-ingest 46352456
```

## Performance

The Python async implementation is significantly faster than sequential approaches:

- **~1000+ requests/second** with default settings
- **~10,000 items in ~10-15 seconds**
- Configurable concurrency via the `WORKERS` environment variable

## Table Schema

The script expects a ClickHouse table with the following schema:

```sql
CREATE TABLE hackernews (
    id UInt32,
    deleted UInt8,
    type String,
    by String,
    time UInt32,
    text String,
    dead UInt8,
    parent UInt32,
    poll UInt32,
    kids Array(UInt32),
    url String,
    score UInt32,
    title String,
    parts Array(UInt32),
    descendants UInt32
) ENGINE = MergeTree()
ORDER BY id;
```
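The script only inserts rows, so this table must exist before the first run. A minimal way to create it from the shell, assuming a local `clickhouse-client`:

```bash
clickhouse-client --query "
CREATE TABLE IF NOT EXISTS hackernews (
    id UInt32, deleted UInt8, type String, by String, time UInt32,
    text String, dead UInt8, parent UInt32, poll UInt32, kids Array(UInt32),
    url String, score UInt32, title String, parts Array(UInt32), descendants UInt32
) ENGINE = MergeTree() ORDER BY id"
```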
load_scripts/hackernews/cloudbuild.yaml

Lines changed: 19 additions & 0 deletions

```yaml
steps:
  # Warm the build cache by pulling the previous image (tolerate failure on first build)
  - name: 'gcr.io/cloud-builders/docker'
    entrypoint: 'bash'
    args: ['-c', 'docker pull gcr.io/$PROJECT_ID/ch-hackernews:latest || exit 0']
  - name: 'gcr.io/cloud-builders/docker'
    args: [
      'build',
      '-t', 'gcr.io/$PROJECT_ID/ch-hackernews:latest',
      '--cache-from', 'gcr.io/$PROJECT_ID/ch-hackernews:latest',
      '.'
    ]
  # Push the container image to Container Registry
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'gcr.io/$PROJECT_ID/ch-hackernews:latest']
  # Deploy container image to Cloud Run
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: gcloud
    args: ['run', 'jobs', 'deploy', 'ch-hackernews', '--image', 'gcr.io/$PROJECT_ID/ch-hackernews:latest', '--region', 'us-central1']
images: ['gcr.io/$PROJECT_ID/ch-hackernews:latest']
```
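For reference, this config can be submitted manually and the resulting Cloud Run job executed on demand (a sketch assuming `gcloud` is authenticated against the target project and that the file is saved as `cloudbuild.yaml`):

```bash
gcloud builds submit --config cloudbuild.yaml .
gcloud run jobs execute ch-hackernews --region us-central1
```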

load_scripts/hackernews/ingest.py

Lines changed: 192 additions & 0 deletions
```python
#!/usr/bin/env python3
"""
Usage:
    ./ingest.py              # incremental: from max(id) in CH -> HN maxitem
    ./ingest.py 38718864     # start from specific ID -> HN maxitem
    ./ingest.py --dry-run    # estimate items to download without downloading
"""

import asyncio
import logging
import os
import sys
from typing import List, Optional

import aiohttp
from clickhouse_driver import Client

# Configuration
HN_BASE_URL = "https://hacker-news.firebaseio.com/v0"
MAX_CONNECTIONS = int(os.getenv("WORKERS", "500"))
BATCH_SIZE = int(os.getenv("BATCH_LINES", "500"))  # Note: currently unused; rows are inserted once per block
BLOCK_SIZE = int(os.getenv("BLOCK_SIZE", "10000"))

# ClickHouse connection (native protocol, not HTTP)
CH_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CH_PORT = int(os.getenv("CLICKHOUSE_PORT", "9000"))  # Native protocol port, not 8123
CH_USER = os.getenv("CLICKHOUSE_USER", "default")
CH_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "")
CH_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default")
CH_SECURE = os.getenv("CLICKHOUSE_SECURE", "false").lower() in ("1", "true", "yes")
TABLE_NAME = os.getenv("TABLE_NAME", "hackernews")

# Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def fetch_item(session: aiohttp.ClientSession, item_id: int) -> Optional[dict]:
    """Fetch a single HN item by ID."""
    url = f"{HN_BASE_URL}/item/{item_id}.json"
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            if response.status == 200:
                return await response.json()
            return None
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        logger.debug(f"Failed to fetch item {item_id}: {e}")
        return None


async def fetch_maxitem(session: aiohttp.ClientSession) -> int:
    """Fetch the current max item ID from HN."""
    url = f"{HN_BASE_URL}/maxitem.json"
    async with session.get(url) as response:
        return await response.json()


async def download_batch(session: aiohttp.ClientSession, start_id: int, end_id: int) -> List[dict]:
    """Download a batch of items in parallel."""
    tasks = [fetch_item(session, item_id) for item_id in range(start_id, end_id + 1)]
    results = await asyncio.gather(*tasks)
    # Filter out None values and non-valid items
    return [item for item in results if item and isinstance(item, dict)]


def transform_item(item: dict) -> tuple:
    """Transform an HN item into the ClickHouse row format."""
    return (
        item.get("id", 0),
        1 if item.get("deleted", False) else 0,
        item.get("type", "story"),
        item.get("by", ""),
        item.get("time", 0),
        item.get("text", ""),
        1 if item.get("dead", False) else 0,
        item.get("parent", 0),
        item.get("poll", 0),
        item.get("kids", []),
        item.get("url", ""),
        item.get("score", 0),
        item.get("title", ""),
        item.get("parts", []),
        item.get("descendants", 0)
    )


def insert_to_clickhouse(client: Client, items: List[dict]) -> int:
    """Insert items into ClickHouse."""
    if not items:
        return 0

    # Filter valid item types
    valid_types = {"story", "comment", "poll", "pollopt", "job"}
    filtered_items = [item for item in items if item.get("type") in valid_types]

    if not filtered_items:
        return 0

    rows = [transform_item(item) for item in filtered_items]

    query = f"""
        INSERT INTO {TABLE_NAME}
        (id, deleted, type, by, time, text, dead, parent, poll, kids, url, score, title, parts, descendants)
        VALUES
    """

    client.execute(query, rows)
    return len(rows)


async def main():
    # Check for dry-run mode
    dry_run = "--dry-run" in sys.argv

    # Determine starting ID
    client = Client(
        host=CH_HOST,
        port=CH_PORT,
        user=CH_USER,
        password=CH_PASSWORD,
        database=CH_DATABASE,
        secure=CH_SECURE
    )

    # Parse arguments (skip --dry-run flag)
    args = [arg for arg in sys.argv[1:] if arg != "--dry-run"]

    if len(args) > 0:
        start_id = int(args[0])
        logger.info(f"Starting from ID (parameter): {start_id}")
    else:
        result = client.execute(f"SELECT max(id) FROM {TABLE_NAME}")
        last_id = result[0][0] if result and result[0][0] else 0
        start_id = last_id + 1
        logger.info(f"Last downloaded ID: {last_id}")
        logger.info(f"Starting from ID: {start_id}")

    # Fetch maxitem
    connector = aiohttp.TCPConnector(limit=MAX_CONNECTIONS)
    async with aiohttp.ClientSession(connector=connector) as session:
        maxitem = await fetch_maxitem(session)
        logger.info(f"HN maxitem: {maxitem}")

        if start_id > maxitem:
            logger.info(f"Nothing to do: start ID {start_id} > maxitem {maxitem}")
            return

        # Calculate estimate
        items_to_download = maxitem - start_id + 1
        logger.info(f"Items to download: {items_to_download:,} (IDs {start_id} -> {maxitem})")

        if dry_run:
            logger.info("=" * 60)
            logger.info("DRY RUN MODE - No actual download will be performed")
            logger.info("=" * 60)
            logger.info(f"Estimated items to process: {items_to_download:,}")
            logger.info(f"Estimated blocks: {(items_to_download + BLOCK_SIZE - 1) // BLOCK_SIZE}")
            logger.info(f"Block size: {BLOCK_SIZE:,} items")
            logger.info(f"Concurrent workers: {MAX_CONNECTIONS}")
            logger.info(f"Estimated time: ~{items_to_download / 1000:.1f}-{items_to_download / 500:.1f} seconds")
            logger.info("=" * 60)
            return

        logger.info("Starting download...")

        total_inserted = 0
        current_id = start_id

        while current_id <= maxitem:
            batch_end = min(current_id + BLOCK_SIZE - 1, maxitem)
            logger.info(f"Processing block: {current_id}..{batch_end}")

            # Download batch
            items = await download_batch(session, current_id, batch_end)
            logger.info(f"Downloaded {len(items)} items")

            # Insert into ClickHouse
            inserted = insert_to_clickhouse(client, items)
            total_inserted += inserted
            logger.info(f"Inserted {inserted} items (total: {total_inserted})")

            current_id = batch_end + 1

        logger.info(f"Done! Total inserted: {total_inserted}")


if __name__ == "__main__":
    asyncio.run(main())
```
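One way to sanity-check a completed run is to compare the table's row count and `max(id)` against HN's live `maxitem` (a sketch using `clickhouse-client` and `curl`; adjust the table name if `TABLE_NAME` was overridden):

```bash
clickhouse-client --query "SELECT count(), max(id) FROM hackernews"
curl -s https://hacker-news.firebaseio.com/v0/maxitem.json
```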
load_scripts/hackernews/requirements.txt

Lines changed: 2 additions & 0 deletions

```
aiohttp>=3.9.0
clickhouse-driver>=0.2.6
```
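A typical isolated setup for running the script locally (standard venv workflow; nothing here is project-specific):

```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python ingest.py --dry-run
```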
