src/content/docs/pipelines/index.mdx (1 addition, 1 deletion)
@@ -32,7 +32,7 @@ Ingest, transform, and load streaming data into Apache Iceberg or Parquet in R2.
 
 <Plan type="paid" />
 
-Cloudflare Pipelines ingests streaming events from your applications, transforms them with SQL, and loads them into [R2](/r2/) as queryable Apache Iceberg tables managed by [R2 Data Catalog](/r2/data-catalog/) or as Parquet and JSON files.
+Cloudflare Pipelines ingests events, transforms them with SQL, and delivers them to R2 as [Iceberg tables](/r2/data-catalog/) or as Parquet and JSON files.
 
 Whether you're processing server logs, mobile application events, IoT telemetry, or clickstream data, Pipelines provides durable ingestion via HTTP endpoints or Worker bindings, SQL-based transformations, and exactly-once delivery to R2. This makes it easy to build analytics-ready data warehouses and lakehouses without managing streaming infrastructure.
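As a minimal sketch of that flow, a pipeline is a SQL statement that reads from a stream and writes to a sink. The `INSERT INTO <sink> SELECT ... FROM <stream>` shape, along with the stream, sink, and column names below, is an illustrative assumption rather than syntax quoted from this page:

```sql
-- Hypothetical stream (server_logs) and sink (logs_sink); column names are illustrative.
-- Drop health-check noise during ingestion so only useful rows land in R2.
INSERT INTO logs_sink
SELECT host, status, path, user_agent
FROM server_logs
WHERE path <> '/healthz';
```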
src/content/docs/pipelines/pipelines/index.mdx (2 additions, 4 deletions)
@@ -2,16 +2,14 @@
 title: Pipelines
 pcx_content_type: navigation
 sidebar:
-  order: 3
+  order: 4
 ---
 
 import { LinkCard } from "~/components";
 
 Pipelines connect [streams](/pipelines/streams/) and [sinks](/pipelines/sinks/) via SQL transformations, which can modify events before writing them to storage. This enables you to shift left, pushing validation, schematization, and processing to your ingestion layer to make your queries easy, fast, and correct.
 
-## What are Pipelines?
-
-Pipelines define the SQL transformations that process data as it flows from streams to sinks. They enable you to filter, transform, enrich, and restructure events in real-time before they reach storage.
+Pipelines enable you to filter, transform, enrich, and restructure events in real-time as data flows from streams to sinks.
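A short sketch of what filtering, transforming, and enriching can look like in a pipeline's SQL; the stream, sink, and column names are hypothetical, and the `INSERT INTO <sink> SELECT ... FROM <stream>` form is assumed for illustration:

```sql
-- Keep only purchase events, normalize a field, and add a derived column.
INSERT INTO purchases_sink
SELECT
  user_id,
  lower(email) AS email,          -- transform: normalize casing
  amount,
  amount * 0.1 AS estimated_tax   -- enrich: derived value computed at ingestion
FROM user_events
WHERE event_type = 'purchase';    -- filter: only purchases reach the sink
```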
-If the specified namespace and table do not exist, the sink will create them automatically.
+The sink will create the specified namespace and table if they do not exist. Sinks cannot be created for existing Iceberg tables.
 
 ## Format
@@ -43,10 +43,10 @@ Configure Parquet compression for optimal storage and query performance:
 
 ### Row group size
 
-[Row groups](https://parquet.apache.org/docs/file-format/configurations/) are sets of rows in a Parquet file that are stored together, affecting memory usage and query performance. Configure the target row group size:
+[Row groups](https://parquet.apache.org/docs/file-format/configurations/) are sets of rows in a Parquet file that are stored together, affecting memory usage and query performance. Configure the target row group size in MB:
 
 ```bash
---target-row-group-size 1024MB
+--target-row-group-size 256
 ```
 
 ## Batching and rolling policy
@@ -77,10 +77,5 @@ Set maximum file size in MB before creating a new file:
 
 R2 Data Catalog sinks require an API token with [R2 Admin Read & Write permissions](/r2/data-catalog/manage-catalogs/#create-api-token-in-the-dashboard). This permission grants the sink access to both R2 Data Catalog and R2 storage.
-[Row groups](https://parquet.apache.org/docs/file-format/configurations/) are sets of rows in a Parquet file that are stored together, affecting memory usage and query performance. Configure the target row group size:
+[Row groups](https://parquet.apache.org/docs/file-format/configurations/) are sets of rows in a Parquet file that are stored together, affecting memory usage and query performance. Configure the target row group size in MB:
 
 ```bash
---target-row-group-size 1024MB
+--target-row-group-size 256
 ```
 
 ## File organization
 
-Files are written with UUID names within the partitioned directory structure. For example, with prefix `analytics` and default partitioning:
+Files are written with UUID names within the partitioned directory structure. For example, with path `analytics` and default partitioning:
 
 Set a base directory in your bucket where files will be written:
 
 ```bash
-npx wrangler pipelines sinks create my-sink \
-  --type r2 \
-  --bucket my-bucket \
-  --path analytics/events
+--path analytics/events
 ```
 
 ### Partitioning
 
-R2 sinks automatically partition files by time using a configurable pattern. The default pattern is `year=%Y/month=%m/day=%d`.
+R2 sinks automatically partition files by time using a configurable pattern. The default pattern is `year=%Y/month=%m/day=%d` (Hive-style partitioning).
 
 ```bash
-npx wrangler pipelines sinks create my-sink \
-  --type r2 \
-  --bucket my-bucket \
-  --partitioning "year=%Y/month=%m/day=%d/hour=%H"
+--partitioning "year=%Y/month=%m/day=%d/hour=%H"
 ```
 
+For available format specifiers, refer to [strftime documentation](https://docs.rs/chrono/latest/chrono/format/strftime/index.html).
+
 ## Batching and rolling policy
 
 Control when files are written to R2. Configure based on your needs:
src/content/docs/pipelines/sinks/index.mdx (2 additions, 6 deletions)
@@ -2,18 +2,14 @@
 title: Sinks
 pcx_content_type: navigation
 sidebar:
-  order: 4
+  order: 3
 ---
 
 import { LinkCard } from "~/components";
 
 Sinks define destinations for your data in Cloudflare Pipelines. They support writing to [R2 Data Catalog](/r2/data-catalog/) as Apache Iceberg tables or to [R2](/r2/) as raw JSON or Parquet files.
 
-## What are Sinks?
-
-Sinks write processed data from pipelines to R2. They provide exactly-once delivery guarantees, ensuring events are never duplicated or dropped.
-
-Sinks can be configured to write files frequently for low-latency ingestion or to write larger, less frequent files for better query performance. Configuration options include batching settings, compression, and output formats.
+Sinks provide exactly-once delivery guarantees, ensuring events are never duplicated or dropped. They can be configured to write files frequently for low-latency ingestion or to write larger, less frequent files for better query performance.
@@ -41,21 +41,14 @@ Pipelines provides array functions for manipulating list values, and lists may b
 
 ### Struct types
 
-Structs combine related fields into a single value. In stream schemas, structs are declared using the `struct` type with a `fields` array. In SQL, structs are declared using this syntax: `STRUCT<field_name field_type, ..>`, and may contain any other type, including lists and other structs.
+Structs combine related fields into a single value. In stream schemas, structs are declared using the `struct` type with a `fields` array. In SQL, structs can be created using the `struct` function.
 
-Example struct in SQL:
+Example creating a struct in SQL:
 
 ```sql
-CREATE TABLE events (
-  properties STRUCT <
-    user_id TEXT,
-    amounts INT[],
-    profile STRUCT <
-      first_name TEXT,
-      last_name TEXT
-    >
-  >
-)
+SELECT struct('user123', 'purchase', 29.99) as event_data FROM events
 ```
 
-Struct fields can be accessed via `.` notation, for example `properties.profile.first_name`.
+This creates a struct with fields `c0`, `c1`, `c2` containing the user ID, event type, and amount.
+
+Struct fields can be accessed via `.` notation, for example `event_data.c0` for the user ID.
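A small sketch tying the `struct` function and dot notation together; the table and column names are hypothetical and the query is illustrative only:

```sql
-- Build a struct in a subquery, then read one of its generated fields (c0).
SELECT event_data.c0 AS user_id
FROM (
  SELECT struct(user_id, event_type, amount) AS event_data
  FROM events
) AS evt;
```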
src/content/docs/pipelines/streams/index.mdx (3 additions, 5 deletions)
@@ -7,13 +7,11 @@ sidebar:
 
 import { LinkCard } from "~/components";
 
-Streams are durable, buffered queues that receive and store events for processing in [Cloudflare Pipelines](/pipelines/). They provide reliable data ingestion and can accept events via HTTP endpoints or Worker bindings.
+Streams are durable, buffered queues that receive and store events for processing in [Cloudflare Pipelines](/pipelines/). They provide reliable data ingestion via HTTP endpoints and Worker bindings, ensuring no data loss even during downstream processing delays or failures.
 
-## What are Streams?
+A single stream can be read by multiple pipelines, allowing you to route the same data to different destinations or apply different transformations. For example, you might send user events to both a real-time analytics pipeline and a data warehouse pipeline.
 
-Streams act as the entry point for your data into Pipelines. They durably buffer incoming events, ensuring no data loss even during downstream processing delays or failures. Events are persisted until successfully processed by connected pipelines.
-
-Streams currently accept events in JSON format via [HTTP endpoints](/pipelines/streams/writing-to-streams/) and [Workers bindings](/pipelines/streams/writing-to-streams/). Streams support both structured events with defined schemas and unstructured JSON. When a schema is provided, Streams will validate and enforce it for incoming events.
+Streams currently accept events in JSON format and support both structured events with defined schemas and unstructured JSON. When a schema is provided, streams will validate and enforce it for incoming events.
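As a sketch of the fan-out described above, two pipelines might read the same stream with different SQL. The stream and sink names are hypothetical, and the `INSERT INTO <sink> SELECT ... FROM <stream>` form is assumed:

```sql
-- Pipeline A: low-latency slice of purchase events for real-time analytics.
INSERT INTO realtime_analytics_sink
SELECT user_id, amount
FROM user_events
WHERE event_type = 'purchase';

-- Pipeline B: the full event stream, landed unchanged in a warehouse table.
INSERT INTO warehouse_sink
SELECT *
FROM user_events;
```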
src/content/docs/pipelines/streams/manage-streams.mdx (32 additions, 12 deletions)
@@ -65,15 +65,35 @@ Example schema file:
       "type": "string",
       "required": true
     },
-    {
-      "name": "event_type",
-      "type": "string",
-      "required": true
-    },
     {
       "name": "amount",
-      "type": "f64",
+      "type": "float64",
       "required": false
+    },
+    {
+      "name": "tags",
+      "type": "list",
+      "required": false,
+      "items": {
+        "type": "string"
+      }
+    },
+    {
+      "name": "metadata",
+      "type": "struct",
+      "required": false,
+      "fields": [
+        {
+          "name": "source",
+          "type": "string",
+          "required": false
+        },
+        {
+          "name": "priority",
+          "type": "int32",
+          "required": false
+        }
+      ]
     }
   ]
 }
@@ -83,16 +103,16 @@ Example schema file:
 
 - `string` - Text values
 - `int32`, `int64` - Integer numbers
-- `f32`, `f64` - Floating-point numbers
+- `float32`, `float64` - Floating-point numbers
 - `bool` - Boolean true/false
-- `timestamp` - ISO 8601 timestamps
-- `json` - Nested JSON objects
-- `bytes` - Binary data
+- `timestamp` - RFC 3339 timestamps, or numeric values parsed as Unix seconds, milliseconds, or microseconds (depending on unit)
+- `json` - JSON objects
+- `binary` - Binary data (base64-encoded)
 - `list` - Arrays of values
 - `struct` - Nested objects with defined fields
 
 :::note
-Events with invalid schemas are accepted during ingestion but will be dropped during processing. Schema modifications are not supported after stream creation.
+Events that do not match the defined schema are accepted during ingestion but will be dropped during processing. Schema modifications are not supported after stream creation.
-Deleting a stream will permanently remove all buffered events that have not been processed. Ensure all data has been delivered to your sink before deletion.
+Deleting a stream will permanently remove all buffered events that have not been processed and will delete any dependent pipelines. Ensure all data has been delivered to your sink before deletion.