
Commit 87e5a32

Improved all docs, added index.mdx in platform
also tested examples e2e
1 parent dd0e8d5 commit 87e5a32

5 files changed: +206 -44 lines changed

src/content/docs/r2/sql/end-to-end-pipeline.mdx

Lines changed: 104 additions & 38 deletions
@@ -1,10 +1,10 @@
 ---
-title: Build a fraud detection pipeline with Cloudflare Pipelines and R2 SQL
+title: Build an end to end data pipeline
 summary: Learn how to create an end-to-end data pipeline using Cloudflare Pipelines, R2 Data Catalog, and R2 SQL for real-time transaction analysis.
 pcx_content_type: tutorial
 products:
   - R2
-  - R2 Data Catalog
+  - R2 Data Catalog
   - R2 SQL
 ---

@@ -83,11 +83,9 @@ npx wrangler r2 bucket catalog compaction enable fraud-detection-data --token $W

 ### Create the Pipeline stream

-Create a stream to receive incoming fraud detection events:
-
-```bash
-npx wrangler pipelines streams create fraud-transactions \
-  --schema '{
+First, create a schema file called `raw_transactions_schema.json` with the following JSON schema:
+```json
+{
   "fields": [
     {"name": "transaction_id", "type": "string", "required": true},
     {"name": "user_id", "type": "int64", "required": true},
@@ -98,20 +96,70 @@ npx wrangler pipelines streams create fraud-transactions \
     {"name": "is_fraud", "type": "string", "required": false},
     {"name": "ingestion_timestamp", "type": "string", "required": false}
   ]
-}' \
+}
+```
+
+Create a stream to receive incoming fraud detection events:
+
+```bash
+npx wrangler pipelines streams create rawtransactionstream \
+  --schema-file raw_transactions_schema.json \
   --http-enabled true \
   --http-auth true
 ```
 :::note
-After running the `stream create` command, note the **Stream Endpoint URL** from the output. This is the endpoint you'll use to send data to your pipeline.
+Note the **HTTP Ingest Endpoint URL** from the output. This is the endpoint you'll use to send data to your pipeline.
 :::
+```bash
+# Export the HTTP ingest endpoint from the output (see example below)
+export STREAM_ENDPOINT=
+```
+
+The output should look like this:
+```sh
+🌀 Creating stream 'rawtransactionstream'...
+✨ Successfully created stream 'rawtransactionstream' with id 'stream_id'.
+
+Creation Summary:
+General:
+  Name: rawtransactionstream
+
+HTTP Ingest:
+  Enabled: Yes
+  Authentication: Yes
+  Endpoint: https://stream_id.ingest.cloudflare.com
+  CORS Origins: None
+
+Input Schema:
+┌───────────────────────┬────────┬────────────┬──────────┐
+│ Field Name            │ Type   │ Unit/Items │ Required │
+├───────────────────────┼────────┼────────────┼──────────┤
+│ transaction_id        │ string │            │ Yes      │
+├───────────────────────┼────────┼────────────┼──────────┤
+│ user_id               │ int64  │            │ Yes      │
+├───────────────────────┼────────┼────────────┼──────────┤
+│ amount                │ f64    │            │ No       │
+├───────────────────────┼────────┼────────────┼──────────┤
+│ transaction_timestamp │ string │            │ No       │
+├───────────────────────┼────────┼────────────┼──────────┤
+│ location              │ string │            │ No       │
+├───────────────────────┼────────┼────────────┼──────────┤
+│ merchant_category     │ string │            │ No       │
+├───────────────────────┼────────┼────────────┼──────────┤
+│ is_fraud              │ string │            │ No       │
+├───────────────────────┼────────┼────────────┼──────────┤
+│ ingestion_timestamp   │ string │            │ No       │
+└───────────────────────┴────────┴────────────┴──────────┘
+```
+
+

 ### Create the data sink

 Create a sink that writes data to your R2 bucket as Apache Iceberg tables:

 ```bash
-npx wrangler pipelines sinks create fraud-data-sink \
+npx wrangler pipelines sinks create rawtransactionsink \
   --type "r2-data-catalog" \
   --bucket "fraud-detection-data" \
   --roll-interval 30 \
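
As a quick check that the new stream accepts events before data starts flowing, a minimal sketch along these lines can POST one record to the ingest endpoint. It assumes the `STREAM_ENDPOINT` and `WRANGLER_R2_SQL_AUTH_TOKEN` variables exported above; the field values (for example the merchant category) are illustrative, not taken from the tutorial.

```python
# Smoke test: send one schema-shaped event to the stream's HTTP ingest
# endpoint. A sketch, not part of the committed tutorial; payload values
# are made up but follow raw_transactions_schema.json.
import json
import os

import requests

event = {
    "transaction_id": "smoke-test-0001",
    "user_id": 1001,
    "amount": 42.50,
    "transaction_timestamp": "2025-09-24T12:00:00Z",
    "location": "SEATTLE",
    "merchant_category": "GROCERY",  # hypothetical value
    "is_fraud": "FALSE",
}

response = requests.post(
    os.environ["STREAM_ENDPOINT"],
    headers={
        "Authorization": f"Bearer {os.environ['WRANGLER_R2_SQL_AUTH_TOKEN']}",
        "Content-Type": "application/json",
    },
    data=json.dumps([event]),  # the tutorial's generator also sends JSON arrays of events
)
print(response.status_code, response.text)
```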
@@ -129,8 +177,8 @@ This creates a `sink` configuration that will write to the Iceberg table fraud_d
 Connect your stream to your sink with SQL:

 ```bash
-npx wrangler pipelines create fraud-pipeline \
-  --sql "INSERT INTO fraud-data-sink SELECT * FROM fraud-transactions"
+npx wrangler pipelines create transactionspipeline \
+  --sql "INSERT INTO rawtransactionsink SELECT * FROM rawtransactionstream"
 ```

 ## 5. Generate fraud detection data
@@ -143,15 +191,16 @@ import json
 import uuid
 import random
 import time
+import os
 from datetime import datetime, timezone, timedelta

-# Configuration
-STREAM_ENDPOINT = "https://YOUR_STREAM_ID.ingest.cloudflare.com" # From the stream you created
-API_TOKEN = "WRANGLER_R2_SQL_AUTH_TOKEN" #the same one created earlier
+# Configuration - exported from the prior steps
+STREAM_ENDPOINT = os.environ["STREAM_ENDPOINT"]  # From the stream you created
+API_TOKEN = os.environ["WRANGLER_R2_SQL_AUTH_TOKEN"]  # The same one created earlier
 EVENTS_TO_SEND = 1000  # Feel free to adjust this

 def generate_transaction():
-    """Generate some transactions with occassional fraud patterns"""
+    """Generate some random transactions with occasional fraud"""

     # User IDs
     high_risk_users = [1001, 1002, 1003, 1004, 1005]
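
Since the script now reads its configuration with `os.environ[...]`, a missing export surfaces as a bare `KeyError`. If you want a friendlier failure mode, one hedged variation (not part of the committed script) is to resolve the variables up front:

```python
# Optional variation: fail fast with a readable message when an
# environment variable from the export steps above is missing.
import os
import sys

def require_env(name: str) -> str:
    value = os.environ.get(name)
    if not value:
        sys.exit(f"Missing environment variable {name}; export it as shown earlier.")
    return value

STREAM_ENDPOINT = require_env("STREAM_ENDPOINT")
API_TOKEN = require_env("WRANGLER_R2_SQL_AUTH_TOKEN")
```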
@@ -160,7 +209,7 @@ def generate_transaction():
     user_id = random.choice(high_risk_users + normal_users)
     is_high_risk_user = user_id in high_risk_users

-    # Generate amount
+    # Generate amounts
     if random.random() < 0.05:
         amount = round(random.uniform(5000, 50000), 2)
     elif random.random() < 0.03:
@@ -169,8 +218,8 @@ def generate_transaction():
         amount = round(random.uniform(10, 500), 2)

     # Locations
-    normal_locations = ["NEW_YORK", "LOS_ANGELES", "CHICAGO", "MIAMI", "SEATTLE"]
-    high_risk_locations = ["UNKNOWN_LOCATION", "VPN_EXIT", "BELARUS", "NIGERIA"]
+    normal_locations = ["NEW_YORK", "LOS_ANGELES", "CHICAGO", "MIAMI", "SEATTLE", "SAN_FRANCISCO"]
+    high_risk_locations = ["UNKNOWN_LOCATION", "VPN_EXIT", "MARS", "BAT_CAVE"]

     if is_high_risk_user and random.random() < 0.3:
         location = random.choice(high_risk_locations)
@@ -186,15 +235,15 @@ def generate_transaction():
     else:
         merchant_category = random.choice(normal_merchants)

-    # Determine if transaction is fraudulent based on basic risk factors
+    # Series of checks that each add a certain margin to the fraud score
     fraud_score = 0
     if amount > 2000: fraud_score += 0.4
     if amount < 1: fraud_score += 0.3
     if location in high_risk_locations: fraud_score += 0.5
     if merchant_category in high_risk_merchants: fraud_score += 0.3
     if is_high_risk_user: fraud_score += 0.2

-    # Compare the fraud score
+    # Convert the fraud score into a capped probability
     is_fraud = random.random() < min(fraud_score * 0.3, 0.8)

     # Generate timestamps (some fraud happens at unusual hours)
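
To make the scoring concrete: the checks are additive, and the final line turns the accumulated score into a capped probability. For example, a high-risk user spending 5,000 from a high-risk location scores 0.4 + 0.5 + 0.2 = 1.1:

```python
# Worked example of the scoring above for one hypothetical transaction:
# amount > 2000 (+0.4), high-risk location (+0.5), high-risk user (+0.2).
fraud_score = 0.4 + 0.5 + 0.2              # 1.1
probability = min(fraud_score * 0.3, 0.8)  # scaled, capped at 0.8
print(f"{probability:.2f}")                # 0.33 -> about a 1-in-3 chance of is_fraud
```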
@@ -239,14 +288,13 @@ def send_batch_to_stream(events, batch_size=100):
             if response.status_code in [200, 201]:
                 total_sent += len(batch)
                 fraud_count += fraud_in_batch
-                print(f"Sent batch of {len(batch)} events (Total: {total_sent})")
+                print(f"Sent batch of {len(batch)} events (Total: {total_sent})")
             else:
-                print(f"Failed to send batch: {response.status_code} - {response.text}")
+                print(f"Failed to send batch: {response.status_code} - {response.text}")

         except Exception as e:
-            print(f"Error sending batch: {e}")
+            print(f"Error sending batch: {e}")

-        # Small delay between batches
         time.sleep(0.1)

     return total_sent, fraud_count
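
The sender above gives up on a batch after a single failed POST. If your network is flaky, a small retry wrapper is one possible refinement; this sketch mirrors the tutorial's `requests.post` call under the same endpoint and token assumptions, and is not part of the committed script:

```python
# Optional retry wrapper around the POST in send_batch_to_stream.
import time

import requests

def post_with_retry(url, headers, batch, attempts=3):
    for attempt in range(1, attempts + 1):
        try:
            response = requests.post(url, headers=headers, json=batch, timeout=30)
            if response.status_code in (200, 201):
                return response
            print(f"Attempt {attempt} failed: {response.status_code}")
        except requests.RequestException as exc:
            print(f"Attempt {attempt} errored: {exc}")
        time.sleep(2 ** attempt)  # simple exponential backoff
    return None
```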
@@ -265,10 +313,10 @@ def main():
     print(f"📊 Generated {len(events)} total events ({fraud_events} fraud, {fraud_events/len(events)*100:.1f}%)")

     # Send to stream
-    print("📤 Sending data to Cloudflare Stream...")
+    print("Sending data to Pipeline stream...")
     sent, fraud_sent = send_batch_to_stream(events)

-    print(f"\n🎉 Complete!")
+    print("\nComplete!")
     print(f"   Events sent: {sent:,}")
     print(f"   Fraud events: {fraud_sent:,} ({fraud_sent/sent*100:.1f}%)")
     print(f"   Data is now flowing through your pipeline!")
@@ -305,8 +353,8 @@ SELECT
   is_fraud,
   transaction_timestamp
 FROM fraud_detection.transactions
-WHERE __ingest_ts > '2025-09-12T01:00:00Z'
-AND is_fruad = 'TRUE'
+WHERE __ingest_ts > '2025-09-24T01:00:00Z'
+AND is_fraud = 'TRUE'
 LIMIT 10"
 ```
 :::note
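
A hard-coded `__ingest_ts` bound like the one in the query above goes stale quickly. A small helper can print a rolling lower bound (here, hypothetically, the last hour) to paste into the `WHERE` clause:

```python
# Build a rolling __ingest_ts lower bound instead of a fixed date.
from datetime import datetime, timedelta, timezone

since = datetime.now(timezone.utc) - timedelta(hours=1)
print(since.strftime("%Y-%m-%dT%H:%M:%SZ"))  # e.g. 2025-09-24T11:00:00Z
```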
@@ -318,7 +366,7 @@ Replace `YOUR_WAREHOUSE` with your R2 Data Catalog warehouse. This in the form o
 Create a new sink that will write the filtered data to a new Apache Iceberg table in R2 Data Catalog:

 ```bash
-npx wrangler pipelines sink create filtered-fraud-sink \
+npx wrangler pipelines sinks create filteredfraudsink \
   --type "r2-data-catalog" \
   --bucket "fraud-detection-data" \
   --roll-interval 30 \
@@ -327,20 +375,20 @@ npx wrangler pipelines sink create filtered-fraud-sink \
   --catalog-token $WRANGLER_R2_SQL_AUTH_TOKEN
 ```

-Now you'll create a new SQL query to process data from the original `fraud-transactions` stream and only write flagged transactions that are over the `amount` of 1000.
+Now you'll create a new SQL query to process data from the original `rawtransactionstream` and write only flagged transactions with an `amount` over 1000.

 ```bash
-npx wrangler pipelines create fraud-pipeline \
-  --sql "INSERT INTO filtered-fraud-sink SELECT * FROM fraud-transactions WHERE is_fraud='TRUE' and amount > 1000"
+npx wrangler pipelines create fraudpipeline \
+  --sql "INSERT INTO filteredfraudsink SELECT * FROM rawtransactionstream WHERE is_fraud='TRUE' and amount > 1000"
 ```

 :::note
 It may take a few minutes for the new Pipeline to fully Initialize and start processing the data. Also keep in mind the 30 second `roll-interval`
 :::

-Let's query our table and check the results:
+Let's query the table and check the results:
 ```bash
-npx wrangler r2 sql query "
+npx wrangler r2 sql query "YOUR_WAREHOUSE" "
 SELECT
   transaction_id,
   user_id,
@@ -350,15 +398,33 @@ SELECT
   is_fraud,
   transaction_timestamp
 FROM fraud_detection.fraud_transactions
-WHERE __ingest_ts > '2025-09-12T01:00:00Z'
 LIMIT 10"
 ```
+Let's also verify that the non-fraudulent events are being filtered out:
+```bash
+npx wrangler r2 sql query "YOUR_WAREHOUSE" "
+SELECT
+  transaction_id,
+  user_id,
+  amount,
+  location,
+  merchant_category,
+  is_fraud,
+  transaction_timestamp
+FROM fraud_detection.fraud_transactions
+WHERE is_fraud = 'FALSE'
+LIMIT 10"
+```
+You should see the following output:
+```text
+Query executed successfully with no results
+```

 ## Conclusion

 You have successfully built an end to end data pipeline using Cloudflare's data platform. Through this tutorial, you've learned to:

 1. **Use R2 Data Catalog** - Leveraged Apache Iceberg tables for efficient data storage
 2. **Set up Cloudflare Pipelines** - Created streams, sinks, and pipelines for data ingestion
-3. **Generated sample data** - Created transaction data with basic fraud patterns
-4. **Query with R2 SQL** - Performed complex fraud analysis using SQL queries
+3. **Generated sample data** - Created transaction data with some basic fraud patterns
+4. **Query your tables with R2 SQL** - Accessed raw and processed data tables stored in R2 Data Catalog
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+---
+title: Platform
+pcx_content_type: navigation
+sidebar:
+  group:
+    hideIndex: true
+---
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+---
+pcx_content_type: concept
+title: Pricing
+sidebar:
+  order: 1
+head:
+  - tag: title
+    content: R2 SQL - Pricing
+
+---
+
+
+R2 SQL is currently not billed during the open beta but will eventually be billed based on the amount of data queried.
+
+During the first phase of the R2 SQL open beta, you will not be billed for R2 SQL usage. You will be billed only for R2 usage.
+
+We plan to price based on the volume of data queried by R2 SQL. We will provide at least 30 days' notice and exact pricing before charging.
Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
+---
+title: Query data in R2 Data Catalog
+pcx_content_type: example
+---
+
+:::note
+R2 SQL is currently in open beta
+:::
+
+## Prerequisites
+
+- Sign up for a [Cloudflare account](https://dash.cloudflare.com/sign-up/workers-and-pages).
+- [Create an R2 bucket](/r2/buckets/create-buckets/) and [enable the data catalog](/r2/data-catalog/manage-catalogs/#enable-r2-data-catalog-on-a-bucket).
+- [Create an R2 API token](/r2/api/tokens/) with [R2, R2 SQL, and data catalog permissions](/r2/api/tokens/#permissions).
+- Tables must have a time-based partition key in order to be queried by R2 SQL. Read about the current [limitations](/r2/sql/platform/limitations-best-practices) to learn more.
+
+R2 SQL can currently be accessed via Wrangler commands or a REST API.
+
+## Wrangler
+
+
+Export your R2 API token as an environment variable:
+
+```bash
+export WRANGLER_R2_SQL_AUTH_TOKEN=your_token_here
+```
+
+If this is your first time using Wrangler, make sure to log in.
+```bash
+npx wrangler login
+```
+
+You'll also want to grab the **warehouse** of your R2 Data Catalog:
+
+```sh
+❯ npx wrangler r2 bucket catalog get [BUCKET_NAME]
+
+ ⛅️ wrangler 4.38.0
+────────────────────────────────────────────────────────────────────────────
+▲ [WARNING] 🚧 `wrangler r2 bucket catalog get` is an open-beta command. Please report any issues to https://github.com/cloudflare/workers-sdk/issues/new/choose
+
+
+Catalog URI: https://catalog.cloudflarestorage.com/[ACCOUNT_ID]/[BUCKET_NAME]
+Warehouse: [ACCOUNT_ID]_[BUCKET_NAME]
+Status: active
+```
+
+To query R2 SQL with Wrangler, run:
+
+```sh
+npx wrangler r2 sql query "YOUR_WAREHOUSE" "SELECT * FROM namespace.table_name limit 10;"
+```
+For a full list of supported SQL commands, check out the [R2 SQL reference page](/r2/sql/platform/sql-reference).
+
+
+## REST API
+
+Set your environment variables:
+
+```bash
+export ACCOUNT_ID="your-cloudflare-account-id"
+export BUCKET_NAME="your-r2-bucket-name"
+export WRANGLER_R2_SQL_AUTH_TOKEN="your_token_here"
+```
+
+Now you're ready to use the REST endpoint:
+
+```bash
+curl -X POST \
+  "https://api.sql.cloudflarestorage.com/api/v1/accounts/${ACCOUNT_ID}/r2-sql/query/${BUCKET_NAME}" \
+  -H "Authorization: Bearer ${WRANGLER_R2_SQL_AUTH_TOKEN}" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "warehouse": "your-warehouse-name",
+    "query": "SELECT * FROM namespace.table_name limit 10;"
+  }' | jq .
+```
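
For reference, a Python equivalent of the curl call above, using the same environment variables and request body (a sketch; it prints whatever JSON the API returns):

```python
# Python equivalent of the curl example: same endpoint, same variables.
import os

import requests

account_id = os.environ["ACCOUNT_ID"]
bucket_name = os.environ["BUCKET_NAME"]
token = os.environ["WRANGLER_R2_SQL_AUTH_TOKEN"]

response = requests.post(
    f"https://api.sql.cloudflarestorage.com/api/v1/accounts/{account_id}/r2-sql/query/{bucket_name}",
    headers={
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    },
    json={
        "warehouse": "your-warehouse-name",  # [ACCOUNT_ID]_[BUCKET_NAME], per the catalog get output
        "query": "SELECT * FROM namespace.table_name limit 10;",
    },
)
response.raise_for_status()
print(response.json())
```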
