Commit 01564e4: Initial pipelines docs
1 parent 86d210e

21 files changed: +2809 -560 lines

Lines changed: 11 additions & 0 deletions

---
link: "/pipelines/reference/changelog/"
productName: Pipelines
productLink: "/pipelines/"
productArea: Developer Platform
productAreaLink: "/pipelines/"
entries:
  - publish_date: "2024-09-24"
    title: Pipelines is now in public beta.
    description: |-
      Pipelines, a new product to ingest and store real-time streaming data, is now in public beta. The public beta is available to any user with a [free or paid Workers plan](/workers/platform/pricing/). Create a Pipeline, and you'll be able to post data to it via HTTP or from a Cloudflare Worker. Pipelines handle batching, buffering, and partitioning the data before writing it to an R2 bucket of your choice. This is useful for collecting clickstream data or ingesting logs from a service. Start building with our [get started guide](/pipelines/getting-started/).
Lines changed: 28 additions & 0 deletions

---
pcx_content_type: concept
title: Batching
sidebar:
  order: 10
---

Pipelines automatically batch requests received via HTTP or from a Worker. Batching reduces the number of output files written to your destination, which makes them more efficient to query.

There are three settings that define how requests are batched:

1. `batch-max-mb`: The maximum amount of data batched, in megabytes. The default is 10 MB, and the maximum is 100 MB.
2. `batch-max-rows`: The maximum number of rows or events in a batch before the data is written. The default, and the maximum, is 10,000 rows.
3. `batch-max-seconds`: The maximum duration of a batch before the data is written, in seconds. The default is 15 seconds, and the maximum is 600 seconds.

All three batch definitions work together: whichever limit is reached first triggers delivery of a batch.

For example, with `batch-max-mb` set to 100 MB and `batch-max-seconds` set to 600, a batch is delivered as soon as 100 MB of events have been posted to the Pipeline. However, if it takes longer than 600 seconds for 100 MB of events to arrive, a batch containing all the messages posted during those 600 seconds is created and delivered.

## Batch settings

You can configure the following batch-level settings to adjust how a Pipeline creates a batch:

| Setting                                    | Default     | Minimum   | Maximum     |
| ------------------------------------------ | ----------- | --------- | ----------- |
| Maximum Batch Size `batch-max-mb`          | 10 MB       | 0.001 MB  | 100 MB      |
| Maximum Batch Timeout `batch-max-seconds`  | 15 seconds  | 0 seconds | 600 seconds |
| Maximum Batch Rows `batch-max-rows`        | 10,000 rows | 1 row     | 10,000 rows |
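
As a sketch, assuming these settings are exposed as Wrangler flags of the same name on `wrangler pipelines update`, adjusting them from the command line might look like this (the pipeline name is a hypothetical example):

```sh
# A sketch, assuming the batch settings above map to flags of the same name
# on `wrangler pipelines update`. "my-clickstream-pipeline" is a placeholder.
npx wrangler pipelines update my-clickstream-pipeline \
  --batch-max-mb 50 \
  --batch-max-seconds 300 \
  --batch-max-rows 10000
```

With these values, a batch would be delivered once it reaches 50 MB, 10,000 rows, or 300 seconds of age, whichever comes first.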
Lines changed: 12 additions & 0 deletions

---
title: Configuration
pcx_content_type: navigation
sidebar:
  order: 4
  group:
    hideIndex: true
---

import { DirectoryListing } from "~/components"

<DirectoryListing />
Lines changed: 30 additions & 0 deletions

---
pcx_content_type: concept
title: Partitions, Filenames and Filepaths
sidebar:
  order: 11
---

## Partitions

Partitioning organizes data into directories based on specific fields to improve query performance. It reduces the amount of data scanned by a query, enabling faster reads. By default, Pipelines partition data by event date. This will be customizable in the future.

For example, the output from a Pipeline in your R2 bucket might look like this:

```sh
- event_date=2024-09-06/hr=15/37db9289-15ba-4e8b-9231-538dc7c72c1e-15.json.gz
- event_date=2024-09-06/hr=15/37db9289-15ba-4e8b-9231-538dc7c72c1e-15.json.gz
```

## Filepath

Customizing the filepath lets you store data under a specific prefix inside your specified R2 bucket. The data remains partitioned by date.

To modify the prefix for a Pipeline using Wrangler:

```sh
wrangler pipelines update <pipeline-name> --filepath "test"
```

All output records generated by your Pipeline will be stored under the prefix `test`, and the object keys will look like this:

```sh
- test/event_date=2024-09-06/hr=15/37db9289-15ba-4e8b-9231-538dc7c72c1e-15.json.gz
- test/event_date=2024-09-06/hr=15/37db9289-15ba-4e8b-9231-538dc7c72c1e-15.json.gz
```
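
To spot-check one of these output files, you can download it with Wrangler and decompress it locally. This is a sketch: the bucket name `my-bucket` is a placeholder, and the object key is the example from the listing above.

```sh
# A sketch: fetch one gzipped JSON output file from R2 and inspect it locally.
# "my-bucket" is a placeholder; substitute your own bucket name and object key.
npx wrangler r2 object get \
  "my-bucket/test/event_date=2024-09-06/hr=15/37db9289-15ba-4e8b-9231-538dc7c72c1e-15.json.gz" \
  --file output.json.gz

gunzip output.json.gz
cat output.json
```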
Lines changed: 12 additions & 0 deletions

---
title: Examples
pcx_content_type: navigation
sidebar:
  order: 4
  group:
    hideIndex: false
---

import { DirectoryListing } from "~/components"

<DirectoryListing />
Lines changed: 96 additions & 0 deletions

---
title: Get started
pcx_content_type: get-started
sidebar:
  order: 2
head:
  - tag: title
    content: Get started
---

import { Render, PackageManagers } from "~/components";

Pipelines let you ingest real-time data streams, such as click events on a website or logs from a service. You can send data to a Pipeline from a Worker or via HTTP. Pipelines handle batching requests and scale in response to your workload. Finally, Pipelines deliver the output to R2 as JSON files, automatically handling partitioning and compression for efficient querying.

By following this guide, you will:

1. Create your first Pipeline.
2. Connect it to your R2 bucket.
3. Post data to it via HTTP.
4. Verify the output file written to R2.

:::note

Pipelines is in **public beta**, and any developer with a [paid Workers plan](/workers/platform/pricing/#workers) can start using Pipelines immediately.

:::

## Prerequisites

To use Pipelines, you will need:

<Render file="prereqs" product="workers" />

## 1. Set up an R2 bucket and get your API tokens

Pipelines are built to ingest data and store it in an R2 bucket. Create a bucket by following the [get started guide for R2](/r2/get-started/). Save the bucket name for the next step.

Next, generate the R2 API tokens you will use in the next step. Follow the [guide on obtaining R2 API tokens](/r2/api/s3/tokens/), and save your Access Key ID and Secret Access Key.
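
If you prefer the command line, a minimal sketch for the bucket step (using the hypothetical bucket name `my-pipeline-data`) is:

```sh
# A sketch: create the destination R2 bucket from the command line.
# "my-pipeline-data" is a placeholder bucket name.
npx wrangler r2 bucket create my-pipeline-data
```

The R2 API tokens themselves are created in the dashboard, as described in the linked guide.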

## 2. Create a Pipeline

To create a Pipeline using Wrangler, run the following command in a terminal, and specify:

- The name of your Pipeline
- The name of the R2 bucket you created in step 1
- The R2 API credentials from step 1

```sh
npx wrangler pipelines create [PIPELINE-NAME] --r2 [R2-BUCKET-NAME] --access-key-id [ACCESS-KEY-ID] --secret-access-key [SECRET-ACCESS-KEY]
```

When choosing a name for your Pipeline:

1. Ensure it is descriptive and relevant to the type of events you intend to ingest. You cannot change the name of the Pipeline after creating it.
2. Pipeline names must be between 1 and 63 characters long.
3. The name cannot contain special characters other than dashes (`-`).
4. The name must start and end with a letter or a number.

Once you create your Pipeline, you will receive an HTTP endpoint which you can post data to. You should see output as shown below:

```sh output
🌀 Authorizing R2 bucket "[R2-BUCKET-NAME]"
🌀 Creating pipeline named "[PIPELINE-NAME]"
✅ Successfully created pipeline [PIPELINE-NAME] with ID [PIPELINE-ID]

You can now send data to your pipeline with:
curl "https://<PIPELINE-ID>.pipelines.cloudflare.com/" -d '[{ ...JSON_DATA... }]'
```

## 3. Post data to your pipeline

Use a curl command in your terminal to post an array of JSON objects to the endpoint you received in step 2.

```sh
curl -H "Content-Type: application/json" \
-d '[{"account_id":"test", "other_data": "test"},{"account_id":"test","other_data": "test2"}]' \
<HTTP-endpoint>
```

Once the Pipeline successfully accepts the data, you will receive a success message.

Pipelines handle batching the data, so you can continue posting data to the Pipeline. Once a batch is full, the data is partitioned by date and written to your R2 bucket.
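
To generate more test data, you can post events in a loop. This is a sketch; replace `<HTTP-endpoint>` with the endpoint returned when you created the Pipeline.

```sh
# A sketch: post 100 small test events to the Pipeline's HTTP endpoint.
# Replace <HTTP-endpoint> with your Pipeline's endpoint from step 2.
for i in $(seq 1 100); do
  curl -s -H "Content-Type: application/json" \
    -d "[{\"account_id\":\"test\",\"event_number\":$i}]" \
    "<HTTP-endpoint>"
done
```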

## 4. Verify in R2

Go to the R2 bucket you created in step 1 via [the Cloudflare dashboard](https://dash.cloudflare.com/). You should see a prefix for today's date. Click through, and you will find a file containing the JSON data you posted in step 3.

## Summary

By completing this guide, you have:

- Created a Pipeline.
- Connected the Pipeline to an R2 bucket as its destination.
- Posted data to the Pipeline via HTTP.
- Verified the output in the R2 bucket.
Lines changed: 53 additions & 0 deletions

---
title: Overview
type: overview
pcx_content_type: overview
sidebar:
  order: 1
  badge:
    text: Beta
head:
  - tag: title
    content: Pipelines
---

import { CardGrid, Description, Feature, LinkTitleCard, Plan, RelatedProduct } from "~/components";

<Description>

Ingest, transform, and store real-time data streams in R2.

</Description>

<Plan type="paid" />

***

## Features

<Feature header="HTTP as a source" href="/pipelines/sources/http/">
Each Pipeline generates an HTTP endpoint that you can post data to.
</Feature>

<Feature header="R2 as a sink" href="/queues/configuration/dead-letter-queues/">
Convert incoming records into compressed JSON files, and write them to R2.
</Feature>

***

## More resources

<CardGrid>

<LinkTitleCard title="Limits" href="/pipelines/reference/limits/" icon="document">
Learn about Pipelines limits.
</LinkTitleCard>

<LinkTitleCard title="@CloudflareDev" href="https://x.com/cloudflaredev" icon="x.com">
Follow @CloudflareDev on Twitter to learn about product announcements, and what is new in Cloudflare Workers.
</LinkTitleCard>

<LinkTitleCard title="Developer Discord" href="https://discord.cloudflare.com" icon="discord">
Connect with the Workers community on Discord to ask questions, show what you are building, and discuss the platform with other developers.
</LinkTitleCard>

</CardGrid>
Lines changed: 12 additions & 0 deletions

---
title: Observability
pcx_content_type: navigation
sidebar:
  order: 5
  group:
    hideIndex: true
---

import { DirectoryListing } from "~/components"

<DirectoryListing />
Lines changed: 66 additions & 0 deletions

---
pcx_content_type: concept
title: Metrics
sidebar:
  order: 10
---

Pipelines metrics are split across three different nodes under `viewer` > `accounts`. Refer to [Explore the GraphQL schema](/analytics/graphql-api/getting-started/explore-graphql-schema/) to learn how to navigate a GraphQL schema and discover what data is available.

To learn more about the GraphQL Analytics API, refer to [GraphQL Analytics API](/analytics/graphql-api/).

You can use the GraphQL API to measure metrics for data ingested, as well as data delivered.

## Write GraphQL queries

The following are examples of how to explore your Pipelines metrics.

### Measure total bytes and records ingested over a time period

```graphql
query PipelineIngestion($accountTag: string!, $pipelineId: string!, $datetimeStart: Time!, $datetimeEnd: Time!) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesIngestionAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          ingestedBytes
          ingestedRecords
        }
      }
    }
  }
}
```
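
To run a query like this outside of the GraphQL playground, you can POST it to the GraphQL Analytics API endpoint with curl. The following is a sketch assuming an API token with Analytics read access; the account tag, pipeline ID, and time range are placeholders.

```sh
# A sketch: run the ingestion query above against the GraphQL Analytics API.
# <API_TOKEN>, <ACCOUNT_ID>, and <PIPELINE_ID> are placeholders.
curl -s https://api.cloudflare.com/client/v4/graphql \
  -H "Authorization: Bearer <API_TOKEN>" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "query ($accountTag: string!, $pipelineId: string!, $datetimeStart: Time!, $datetimeEnd: Time!) { viewer { accounts(filter: {accountTag: $accountTag}) { pipelinesIngestionAdaptiveGroups(limit: 10000, filter: {pipelineId: $pipelineId, datetime_geq: $datetimeStart, datetime_leq: $datetimeEnd}) { sum { ingestedBytes ingestedRecords } } } } }",
    "variables": {
      "accountTag": "<ACCOUNT_ID>",
      "pipelineId": "<PIPELINE_ID>",
      "datetimeStart": "2024-09-01T00:00:00Z",
      "datetimeEnd": "2024-09-30T00:00:00Z"
    }
  }'
```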

### Measure volume of data delivered

```graphql
query PipelineDelivery($accountTag: string!, $pipelineId: string!, $datetimeStart: Time!, $datetimeEnd: Time!) {
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      pipelinesDeliveryAdaptiveGroups(
        limit: 10000
        filter: {
          pipelineId: $pipelineId
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
        }
      ) {
        sum {
          deliveredBytes
        }
      }
    }
  }
}
```
Lines changed: 7 additions & 0 deletions

---
pcx_content_type: navigation
title: Pipelines REST API
sidebar:
  order: 10
---
