Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 237 additions & 0 deletions alchemy-web/src/content/docs/guides/cloudflare-r2-notifications.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
---
title: R2 Bucket Notifications
description: Learn how to process R2 bucket events with Cloudflare Queues and Workers using Alchemy.
sidebar:
order: 1.5
---

import { Steps } from '@astrojs/starlight/components';

This guide explains how to set up event notifications for R2 buckets and process them with a Worker via a Cloudflare Queue.

:::note
This is a step-by-step guide to set up R2 bucket notifications. For a complete API reference see the [R2BucketNotification Provider](/providers/cloudflare/r2-bucket-notification).
:::

<Steps>

1. **Create an R2 Bucket and Queue**

Create an R2 bucket and a typed Queue to receive notification messages:

```ts
import { R2Bucket, Queue, R2BucketNotificationMessage } from "alchemy/cloudflare";

export const bucket = await R2Bucket("uploads");

export const queue = await Queue<R2BucketNotificationMessage>("upload-events");
```

2. **Create the Notification Rule**

Connect the bucket to the queue with an R2BucketNotification:

```ts
import { R2BucketNotification } from "alchemy/cloudflare";

await R2BucketNotification("upload-notifications", {
bucket,
queue,
eventTypes: ["object-create"],
});
```

3. **Create a Worker to Process Events**

Create a Worker that consumes messages from the queue:

```ts
import { Worker } from "alchemy/cloudflare";

export const worker = await Worker("processor", {
entrypoint: "./src/processor.ts",
bindings: {
BUCKET: bucket,
},
eventSources: [queue],
});
```

4. **Implement the Event Handler**

Process bucket events in your Worker:

```ts
// src/processor.ts
import type { queue, worker } from "../alchemy.run";

export default {
async queue(batch: typeof queue.Batch, env: typeof worker.Env) {
for (const message of batch.messages) {
const event = message.body;

console.log(`Event: ${event.action} on ${event.bucket}/${event.object.key}`);

if (event.action === "PutObject" || event.action === "CompleteMultipartUpload") {
// Object was created - process it
const object = await env.BUCKET.get(event.object.key);
if (object) {
console.log(`Processing ${event.object.key} (${event.object.size} bytes)`);
// Do something with the object...
}
}

message.ack();
}
},
};
```

:::tip
Using `typeof queue.Batch` provides type safety for the `R2BucketNotificationMessage` payload.
:::

5. **(Optional) Filter Notifications**

Only receive notifications for specific objects using prefix and suffix filters:

```ts
await R2BucketNotification("pdf-uploads", {
bucket,
queue,
eventTypes: ["object-create"],
prefix: "documents/",
suffix: ".pdf",
});
```

6. **(Optional) Handle Multiple File Types**

Need to process multiple file formats? Use an array of suffixes to create rules for each:

```ts
await R2BucketNotification("audio-uploads", {
bucket,
queue,
eventTypes: ["object-create"],
prefix: "audio/",
suffix: [".mp3", ".wav", ".flac", ".aac"],
});
```

This creates separate notification rules for each suffix under the hood. You can also use an array of prefixes:

```ts
await R2BucketNotification("multi-folder-uploads", {
bucket,
queue,
eventTypes: ["object-create"],
prefix: ["uploads/", "imports/", "inbox/"],
suffix: ".json",
});
```

:::note
Either `prefix` OR `suffix` can be an array, but not both at the same time.
:::

7. **(Optional) Handle Multiple Event Types**

Listen for both create and delete events:

```ts
await R2BucketNotification("all-events", {
bucket,
queue,
eventTypes: ["object-create", "object-delete"],
});
```

Then handle different actions in your worker:

```ts
async queue(batch: typeof queue.Batch, env: typeof worker.Env) {
for (const message of batch.messages) {
const event = message.body;

switch (event.action) {
case "PutObject":
case "CompleteMultipartUpload":
case "CopyObject":
console.log(`Object created: ${event.object.key}`);
break;
case "DeleteObject":
case "LifecycleDeletion":
console.log(`Object deleted: ${event.object.key}`);
break;
}

message.ack();
}
}
```

</Steps>

## Complete Example

Here's a complete `alchemy.run.ts` file that sets up R2 notifications:

```ts
import { alchemy } from "alchemy";
import {
R2Bucket,
Queue,
R2BucketNotification,
Worker,
R2BucketNotificationMessage,
} from "alchemy/cloudflare";

const app = await alchemy("r2-notifications-demo");

export const bucket = await R2Bucket("uploads");

export const queue = await Queue<R2BucketNotificationMessage>("upload-events");

await R2BucketNotification("upload-notifications", {
bucket,
queue,
eventTypes: ["object-create"],
prefix: "incoming/",
});

export const worker = await Worker("processor", {
entrypoint: "./src/processor.ts",
bindings: {
BUCKET: bucket,
},
eventSources: [queue],
url: true,
});

console.log("Worker URL:", worker.url);

await app.finalize();
```

## Event Message Format

Each notification message contains:

| Field | Type | Description |
|-------|------|-------------|
| `account` | `string` | Cloudflare account ID |
| `action` | `string` | Action that triggered the event |
| `bucket` | `string` | Bucket name |
| `object.key` | `string` | Object key/path |
| `object.size` | `number?` | Size in bytes (not present for deletes) |
| `object.eTag` | `string?` | Entity tag (not present for deletes) |
| `eventTime` | `string` | ISO 8601 timestamp |
| `copySource` | `object?` | Source info for CopyObject events |

## Supported Actions

| Event Type | Actions |
|------------|---------|
| `object-create` | `PutObject`, `CompleteMultipartUpload`, `CopyObject` |
| `object-delete` | `DeleteObject`, `LifecycleDeletion` |
Loading
Loading