25 changes: 20 additions & 5 deletions src/content/docs/pipelines/get-started.mdx
Original file line number Diff line number Diff line change
@@ -66,11 +66,26 @@ Once you create your Pipeline, you will receive a HTTP endpoint which you can po

```sh output
🌀 Authorizing R2 bucket "[R2-BUCKET-NAME]"
🌀 Creating pipeline named "[PIPELINE-NAME]"
✅ Successfully created pipeline [PIPELINE-NAME] with ID [PIPELINE-ID]

You can now send data to your pipeline with:
curl "https://<PIPELINE-ID>.pipelines.cloudflare.com/" -d '[{ "foo":"bar }]'
Opening a link in your default browser: https://oauth.pipelines.cloudflare.com/oauth/login?accountId=<ACCOUNT_ID>&bucketName=[R2-BUCKET-NAME]&pipelineName=[PIPELINE-NAME]
🌀 Checking access to R2 bucket "[R2-BUCKET-NAME]"
🌀 Creating Pipeline named "[PIPELINE-NAME]"
✅ Successfully created Pipeline "[PIPELINE-NAME]" with id [PIPELINE-ID]
🎉 You can now send data to your Pipeline!

To start interacting with this Pipeline from a Worker, open your Worker’s config file and add the following binding configuration:

{
"pipelines": [
{
"pipeline": "[PIPELINE-NAME]",
"binding": "PIPELINE"
}
]
}

Send data to your Pipeline's HTTP endpoint:

curl "https://<PIPELINE_ID>.pipelines.cloudflare.com" -d '[{"foo": "bar"}]'
```
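The `curl` command in this output posts a JSON array of records. As a rough TypeScript sketch of building the same batch payload (the record fields are illustrative placeholders, and the endpoint URL stays the `<PIPELINE_ID>` placeholder from the output above):

```typescript
// Pipelines HTTP endpoints accept a JSON array of records per request.
// The record fields below are illustrative placeholders.
const records = [
  { event: "signup", plan: "free" },
  { event: "signup", plan: "pro" },
];

const body = JSON.stringify(records);

// A real script would POST the batch, for example:
// await fetch("https://<PIPELINE_ID>.pipelines.cloudflare.com", { method: "POST", body });
console.log(body);
```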

## 3. Post data to your pipeline
2 changes: 1 addition & 1 deletion src/content/docs/pipelines/index.mdx
@@ -21,7 +21,7 @@ Ingest and load real time data streams to R2, using Cloudflare Pipelines.

<Plan type="paid" />

Pipelines lets you ingest and load real time data streams into R2, without managing any infrastructure. You can send data to a Pipeline data via HTTP, or from a Worker. Your Pipeline will handle batching the data, generating compressed JSON files, and delivering the files to an R2 bucket.
Pipelines lets you ingest and load real time data streams into R2, without managing any infrastructure. You can send data to a Pipeline via HTTP, or from a Worker. Your Pipeline will handle batching the data, generating compressed JSON files, and delivering the files to an R2 bucket.

Refer to the [get started guide](/pipelines/get-started) to start building with Pipelines.

23 changes: 17 additions & 6 deletions src/content/docs/pipelines/sources/worker.mdx
@@ -8,17 +8,19 @@ head:
content: Pipeline Source - Worker
---

import { Render, PackageManagers } from "~/components";
import { Render, PackageManagers, WranglerConfig } from "~/components";

You can send records to your Pipeline directly from a [Cloudflare Worker](/workers/). To do so, you need to:

1. Create a Worker
2. Create a Pipeline
3. Add your Pipeline as a binding in your Workers' `wrangler.toml` file
3. Add your Pipeline as a binding in your Worker's `wrangler.jsonc` file
4. Write your Worker to send records to your Pipeline
5. Deploy your Worker
6. Verify in R2

## 1. Create a Worker

Create a Cloudflare Worker if you don't already have one. This Worker will send records to your Pipeline.

To create a Worker, run the following command in a terminal:
@@ -39,38 +39,44 @@ To create a Worker, run the following command in a terminal:
}}
/>

This will create a new directory, which will include both a `src/index.ts` Worker script, and a [`wrangler.toml`](/workers/wrangler/configuration/) configuration file. Navigate into the newly created directory:
This will create a new directory, which will include both a `src/index.ts` Worker script, and a [`wrangler.jsonc`](/workers/wrangler/configuration/) configuration file. Navigate into the newly created directory:

```sh
cd pipeline-worker
```

## 2. Create a Pipeline

Create a new Pipeline, if you don't already have one. If this is your first time using Pipelines, follow the instructions in the [get started guide](/pipelines/get-started).

By default, Worker bindings are enabled on all Pipelines. Keep track of the name you gave your Pipeline in this stage; we'll use it in the next step.

## 3. Add a binding

To connect your Worker to your Pipeline, you need to create a binding. [Bindings](/workers/runtime-apis/bindings/) allow you to grant specific capabilities to your Worker.

Open your newly generated `wrangler.toml` configuration file and add the following:
Open your newly generated `wrangler.jsonc` configuration file and add the following:

<WranglerConfig>
```toml
[[pipelines]]
binding = "MY_PIPELINE"
pipeline = "<MY-PIPELINE-NAME>"
```
</WranglerConfig>

Replace `<MY-PIPELINE-NAME>` with the name of the Pipeline you created in step 2. Next, replace `MY_PIPELINE` with the name you want for your `binding`. The binding must be a valid JavaScript variable name. This is the variable you will use to reference this Pipeline in your Worker.

## 4. Write your Worker

You will now configure your Worker to send records to your Pipeline. Your Worker will:

1. Take a request it receives from the browser
2. Transform the request to JSON
3. Send the resulting record to your Pipeline

In your Worker project directory, open the `src` folder and add the following to your `index.ts` file:

```ts
export interface Env {
MY_PIPELINE: Pipeline<any>;
@@ -94,7 +102,8 @@ Replace `MY_PIPELINE` with the name of the binding you set in Step 3. If sending
In a production application, you would likely use a [`try...catch`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/try...catch) statement to catch the exception and handle it directly (for example, return a custom error or even retry).
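As a rough sketch of that pattern (the `Pipeline` type and the mock bindings below are stand-ins, since the real binding object only exists inside the Workers runtime):

```typescript
// Wrap send() in try...catch and return a custom error instead of throwing.
type Pipeline = { send(records: object[]): Promise<void> };

async function ingest(pipeline: Pipeline, record: object): Promise<string> {
  try {
    await pipeline.send([record]);
    return "Success";
  } catch {
    // Handle the failure directly: return an error response, or retry.
    return "Error: failed to send record";
  }
}

// Mock bindings for illustration only.
const okPipeline: Pipeline = { send: async () => {} };
const downPipeline: Pipeline = {
  send: async () => {
    throw new Error("pipeline unavailable");
  },
};
```

In a real Worker, `pipeline` would be `env.MY_PIPELINE` rather than a mock.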

## 5. Publish your Worker
With your `wrangler.toml` file and `index.ts` file configured, you are ready to publish your Worker. To publish your Worker, run:

With your `wrangler.jsonc` file and `index.ts` file configured, you are ready to publish your Worker. To publish your Worker, run:

```sh
npx wrangler deploy
@@ -111,9 +120,11 @@ Published <YOUR-WORKER-NAME> (0.29 sec)
Copy your `*.workers.dev` subdomain and paste it into a new browser tab. Refresh the page a few times to send records to your Pipeline. Your browser should return the `Success` response after sending the record to your Pipeline.

## 6. Verify in R2

Go to the R2 bucket you created in step 2 via [the Cloudflare dashboard](https://dash.cloudflare.com/). You should see a prefix for today's date. Click through, and you'll find one or more files, containing the records you sent in step 4.

## Local Development

:::note
Known issue: When running your Worker locally, sending data to your Pipeline currently results in an error.
When running your Worker locally, the data is currently not sent to your production Pipeline. Only when you deploy your Worker will the data be sent to your production Pipeline.
:::
@@ -1,5 +1,5 @@
---
updated: 2025-02-24
updated: 2025-03-31
difficulty: Intermediate
content_type: 📝 Tutorial
pcx_content_type: tutorial
@@ -15,7 +15,7 @@ languages:

import { Render, PackageManagers, Details } from "~/components";

In this tutorial, you will learn how to ingest clickstream data to a R2 bucket using Pipelines. You will also learn how to connect the bucket to MotherDuck. You will then query the data using MotherDuck.
In this tutorial, you will learn how to ingest clickstream data to an R2 bucket using Pipelines. You will use the Pipeline binding to send the clickstream data to the R2 bucket from your Worker. You will also learn how to connect the bucket to MotherDuck and then query the data using MotherDuck.

For this tutorial, you will build a landing page of an e-commerce website. The page will list the products available for sale. A user can click on the view button to view the product details or click on the add to cart button to add the product to their cart. The focus of this tutorial is to show how to ingest the data to R2 and query it using MotherDuck. Hence, the landing page will be a simple HTML page with no functionality.

@@ -49,7 +49,7 @@ Create a new Worker project by running the following commands:
product="workers"
params={{
category: "hello-world",
type: "Hello World Worker",
type: "Worker + Assets",
lang: "TypeScript",
}}
/>
@@ -60,16 +60,9 @@ Navigate to the `e-commerce-pipelines` directory:
cd e-commerce-pipelines
```

## 2. Create the front-end
## 2. Update the front-end

Using Static Assets, you can serve the frontend of your application from your Worker. To use Static Assets, you need to add the required bindings to your `wrangler.toml` file.

```toml
[assets]
directory = "public"
```

Next, create a `public` directory and add an `index.html` file. The `index.html` file should contain the following HTML code:
Using Static Assets, you can serve the frontend of your application from your Worker. The above step creates a new Worker project with a default `public/index.html` file. Update the `public/index.html` file with the following HTML code:

<details>
<summary>Select to view the HTML code</summary>
@@ -309,19 +302,35 @@ You need to create a new pipeline and connect it to your R2 bucket.
Create a new pipeline `clickstream-pipeline` using the [Wrangler CLI](/workers/wrangler/):

```sh
npx wrangler pipelines create clickstream-pipeline --r2 <BUCKET_NAME>
npx wrangler pipelines create clickstream-pipeline --r2-bucket <BUCKET_NAME>
```

Replace `<BUCKET_NAME>` with the name of your R2 bucket.

When you run the command, you will be prompted to authorize Cloudflare Workers Pipelines to create R2 API tokens on your behalf. These tokens are required by your Pipeline. Your Pipeline uses these tokens when loading data into your bucket. You can approve the request through the browser link which will open automatically.
When you run the command, you will be prompted to authorize Pipelines to create R2 API tokens on your behalf. These tokens are required by your Pipeline. Your Pipeline uses these tokens when loading data into your bucket. You can approve the request through the browser link which will open automatically.

```output
🌀 Authorizing R2 bucket <BUCKET_NAME>
🌀 Creating pipeline named "clickstream-pipeline"
✅ Successfully created pipeline "clickstream-pipeline" with id <PIPELINES_ID>
You can now send data to your pipeline with:
curl "https://<PIPELINES_ID>.pipelines.cloudflare.com" -d '[{"foo": "bar"}]'
🌀 Authorizing R2 bucket "<BUCKET_NAME>"
Opening a link in your default browser: https://oauth.pipelines.cloudflare.com/oauth/login?accountId=<ACCOUNT_ID>&bucketName=<BUCKET_NAME>&pipelineName=clickstream-pipeline
🌀 Checking access to R2 bucket "<BUCKET_NAME>"
🌀 Creating Pipeline named "clickstream-pipeline"
✅ Successfully created Pipeline "clickstream-pipeline" with id <PIPELINE_ID>
🎉 You can now send data to your Pipeline!

To start interacting with this Pipeline from a Worker, open your Worker’s config file and add the following binding configuration:

{
"pipelines": [
{
"pipeline": "clickstream-pipeline",
"binding": "PIPELINE"
}
]
}

Send data to your Pipeline's HTTP endpoint:

curl "https://<PIPELINE_ID>.pipelines.cloudflare.com" -d '[{"foo": "bar"}]'
```
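As a browser-side sketch of what the page will eventually send (the field names are illustrative placeholders; the `{ data: ... }` wrapper matches the shape the Worker handler in this tutorial reads with `request.json()`):

```typescript
// Build the payload for the Worker's /api/clickstream route.
// Field names here are illustrative placeholders.
const clickEvent = {
  timestamp: new Date().toISOString(),
  event_type: "product_view",
  product_id: 1,
};

const payload = JSON.stringify({ data: clickEvent });

// The page would POST it, for example:
// await fetch("/api/clickstream", { method: "POST", body: payload });
console.log(payload);
```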

## 5. Send clickstream data to your pipeline
@@ -336,7 +345,7 @@ import { WranglerConfig } from "~/components";

```toml
[[pipelines]]
binding = "MY_PIPELINE"
binding = "PIPELINE"
pipeline = "clickstream-pipeline"
```

@@ -346,7 +355,7 @@ Next, update the type in the `worker-configuration.d.ts` file. Add the following

```ts title="worker-configuration.d.ts"
interface Env {
MY_PIPELINE: Pipeline;
PIPELINE: Pipeline;
}
```

@@ -360,7 +369,7 @@ export default {
if (pathname === "/api/clickstream" && method === "POST") {
const body = (await request.json()) as { data: any };
try {
await env.MY_PIPELINE.send([body.data]);
await env.PIPELINE.send([body.data]);
return new Response("OK", { status: 200 });
} catch (error) {
console.error(error);
@@ -466,4 +475,4 @@ This project serves as a foundation for building scalable, data-driven applicati

For your next steps, consider exploring more advanced querying techniques with MotherDuck, implementing real-time analytics, or integrating additional Cloudflare services to further optimize your application's performance and security.

You can find the source code of the application in the [GitHub repository](https://github.com/harshil1712/e-commerce-clickstream).
You can find the source code of the application in the [GitHub repository](https://github.com/harshil1712/cf-pipelines-bindings-demo).
@@ -1,5 +1,5 @@
---
updated: 2025-03-03
updated: 2025-03-31
difficulty: Intermediate
content_type: 📝 Tutorial
pcx_content_type: tutorial
@@ -48,7 +48,7 @@ Create a new Worker project by running the following commands:
product="workers"
params={{
category: "hello-world",
type: "Hello World Worker",
type: "Worker + Assets",
lang: "TypeScript",
}}
/>
@@ -59,20 +59,9 @@ Navigate to the `e-commerce-pipelines-client-side` directory:
cd e-commerce-pipelines-client-side
```

## 2. Create the front-end
## 2. Update the front-end

Using Static Assets, you can serve the frontend of your application from your Worker. To use Static Assets, you need to add the required bindings to your `wrangler.toml` file.

<WranglerConfig>

```toml
[assets]
directory = "public"
```

</WranglerConfig>

Next, create a `public` directory and add an `index.html` file. The `index.html` file should contain the following HTML code:
Using Static Assets, you can serve the frontend of your application from your Worker. The above step creates a new Worker project with a default `public/index.html` file. Update the `public/index.html` file with the following HTML code:

<details>
<summary>Select to view the HTML code</summary>