Marcinthecloud/otel-to-pipelines

OTLP to Cloudflare Pipelines

A slightly vibe-coded example Cloudflare Worker that receives OpenTelemetry Protocol (OTLP) logs from the new export feature in Workers Observability and writes them to Cloudflare Pipelines for easy processing, filtering, and writing to R2 or R2 Data Catalog (Apache Iceberg tables).

Prerequisites

Quick Start

1. Clone and Install

git clone https://github.com/yourusername/otlp-to-pipelines.git
cd otlp-to-pipelines
npm install

2. Create a Pipeline

Create a new Pipelines stream to store your logs:

Pipeline Streams

npx wrangler pipelines streams create otlp_stream --schema-file pipelines-schema.json

Copy the stream ID or get it using:

npx wrangler pipelines streams list

Pipelines Sink (R2 Data Catalog example)

npx wrangler pipelines sinks create otlp_sink \
  --type r2-data-catalog \
  --bucket my-bucket \
  --namespace otlp \
  --table worker_logs \
  --catalog-token YOUR_CATALOG_TOKEN \
  --compression zstd \
  --roll-interval 60  # Write files every 60 seconds

For the catalog-token, see HERE

For info about setting up R2 Data Catalog, see HERE

Pipelines SQL

Create a pipeline that reads from the stream, transforms the data with SQL, and writes it to the configured sink:

npx wrangler pipelines create otlp_pipeline \
  --sql "INSERT INTO otlp_sink
SELECT
  to_timestamp_micros(timestamp_ns / 1000) AS timestamp_micros,
  to_timestamp_micros(observed_timestamp_ns / 1000) AS observed_timestamp_micros,
  severity_number,
  severity_text,
  body,
  trace_id,
  span_id,
  flags,
  attributes,
  resource_attributes,
  scope_name,
  scope_version,
  scope_attributes,
  dropped_attributes_count
FROM otlp_stream;"

You can do a lot with Pipelines SQL - see the reference HERE

3. Configure the Worker

Copy the example config and add your stream ID:

cp wrangler.toml.example wrangler.toml

Edit wrangler.toml and replace YOUR_PIPELINE_STREAM_ID_HERE with the stream ID.

4. Deploy

npx wrangler deploy

Your worker will be deployed and you'll get a URL like: https://otlp-to-pipelines.your-subdomain.workers.dev

5. Configure OTLP Source

For the Cloudflare Worker whose logs you want to export:

  1. Go to Workers & Pages → Select your worker
  2. Navigate to Settings → Observability
  3. Click Add destination under OTLP destinations
  4. Enter your worker URL (in this case, you don't need /v1/logs)

Enable logging in your worker:

Wrangler TOML:

[observability]
[observability.logs]
enabled = true
head_sampling_rate = 1
invocation_logs = true
destinations = [ "NAME_OF_DESTINATION_FROM_PRIOR_STEP" ]
persist = true

Dash

  1. Go to Workers & Pages → Select your worker
  2. Navigate to Settings → Observability
  3. Click the edit icon next to Workers Logs
  4. Click Enable and choose the destination in the Export logs to external destinations drop down
  5. Click Deploy. Note that you also need to add these settings to your Wrangler config; otherwise your next deploy will overwrite them.

Usage

You can use R2 SQL to filter through these logs:

npx wrangler r2 sql query "YOUR_WAREHOUSE" "SELECT severity_text, body FROM otlp.worker_logs LIMIT 10;"

You can also use any Iceberg-compatible query engine.

You can check the health of the receiver with:

curl https://otlp-to-pipelines.your-subdomain.workers.dev

Response:

{
  "status": "ok",
  "message": "OTLP receiver is ready",
  "version": "1.0.0",
  "endpoints": {
    "logs": "/v1/logs"
  }
}
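
As a rough illustration of how a receiver like this could decide between the health check and log ingestion, here is a minimal sketch. It assumes that a GET on the root path returns the health response shown above and that a POST on either `/` or `/v1/logs` is treated as OTLP log ingestion. The names `routeRequest` and `healthBody` are hypothetical and are not taken from this repository's source.

```typescript
// Hypothetical routing sketch; the actual Worker may be structured differently.
type Route = "health" | "ingest" | "not_found";

// Decide what to do with a request based on its method and path.
function routeRequest(method: string, pathname: string): Route {
  // Drop trailing slashes so "/v1/logs/" and "/v1/logs" are treated the same.
  const path = pathname.replace(/\/+$/, "") || "/";
  if (method === "GET" && path === "/") return "health";
  if (method === "POST" && (path === "/" || path === "/v1/logs")) return "ingest";
  return "not_found";
}

// Build a body with the same shape as the health response shown above.
function healthBody() {
  return {
    status: "ok",
    message: "OTLP receiver is ready",
    version: "1.0.0",
    endpoints: { logs: "/v1/logs" },
  };
}
```

Keeping the routing decision in a pure function makes it easy to check without deploying anything.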

You can also check which events are being processed successfully with npx wrangler tail. The output looks like this:

❯ npx wrangler tail  

 ⛅️ wrangler 4.45.2 
─────────────────────────────────────────────
Successfully created tail, expires at 2025-10-31T00:18:39Z
Connected to otlp-to-pipelines, waiting for logs...
POST https://otlp-to-pipelines.[your_domain].workers.dev/ - Ok @ 10/30/2025, 11:21:25 AM

About the data

The Worker takes the OTLP logs and formats them to be compatible with Cloudflare Pipelines:

Field                 Type     Description
timestamp             string   ISO 8601 timestamp
timestamp_ns          int64    Unix timestamp in nanoseconds
severity_number       int32    Numeric severity (0-24)
severity_text         string   Text severity (DEBUG, INFO, WARN, ERROR, FATAL)
body                  string   Log message
trace_id              string   Distributed tracing trace ID (hex)
span_id               string   Distributed tracing span ID (hex)
attributes            json     Log-level attributes
resource_attributes   json     Resource attributes (service.name, etc.)
scope_name            string   Instrumentation scope

See pipelines-schema.json for the full schema.
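
To make the mapping concrete, here is a minimal sketch of how an OTLP/JSON logs payload could be flattened into one record per log line, using the field names from the table above. It is not the Worker's actual code. The type and function names (`OtlpLogsPayload`, `flattenOtlpLogs`, and so on) are hypothetical, only a subset of the schema is covered, and `timestamp_ns` is kept as a string here because nanosecond timestamps exceed JavaScript's safe integer range.

```typescript
// Hypothetical types covering only the parts of OTLP/JSON used in this sketch.
type OtlpAnyValue = { stringValue?: string; intValue?: string | number; doubleValue?: number; boolValue?: boolean };
type OtlpKeyValue = { key: string; value?: OtlpAnyValue };
type OtlpLogRecord = {
  timeUnixNano?: string;
  severityNumber?: number;
  severityText?: string;
  body?: OtlpAnyValue;
  traceId?: string;
  spanId?: string;
  attributes?: OtlpKeyValue[];
};
type OtlpLogsPayload = {
  resourceLogs?: {
    resource?: { attributes?: OtlpKeyValue[] };
    scopeLogs?: { scope?: { name?: string }; logRecords?: OtlpLogRecord[] }[];
  }[];
};
type FlatLog = {
  timestamp: string;
  timestamp_ns: string; // assumption: string to avoid precision loss; the schema declares int64
  severity_number: number;
  severity_text: string;
  body: string;
  trace_id: string;
  span_id: string;
  attributes: Record<string, unknown>;
  resource_attributes: Record<string, unknown>;
  scope_name: string;
};

// Unwrap an OTLP AnyValue into a plain JavaScript value (scalar cases only).
function unwrapValue(v?: OtlpAnyValue): string | number | boolean | null {
  if (!v) return null;
  if (v.stringValue !== undefined) return v.stringValue;
  if (v.intValue !== undefined) return Number(v.intValue);
  if (v.doubleValue !== undefined) return v.doubleValue;
  if (v.boolValue !== undefined) return v.boolValue;
  return null;
}

// Turn an OTLP key/value list into a plain object.
function attributesToObject(attrs: OtlpKeyValue[] = []): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const a of attrs) out[a.key] = unwrapValue(a.value);
  return out;
}

// Convert a nanosecond timestamp string to ISO 8601 by dropping the last six digits (ns to ms).
function nanosToIso(ns: string): string {
  const ms = ns.length > 6 ? Number(ns.slice(0, -6)) : 0;
  return new Date(ms).toISOString();
}

// Walk resource -> scope -> log record and emit one flat row per record.
function flattenOtlpLogs(payload: OtlpLogsPayload): FlatLog[] {
  const rows: FlatLog[] = [];
  for (const rl of payload.resourceLogs ?? []) {
    const resourceAttributes = attributesToObject(rl.resource?.attributes);
    for (const sl of rl.scopeLogs ?? []) {
      for (const rec of sl.logRecords ?? []) {
        const ns = rec.timeUnixNano ?? "0";
        rows.push({
          timestamp: nanosToIso(ns),
          timestamp_ns: ns,
          severity_number: rec.severityNumber ?? 0,
          severity_text: rec.severityText ?? "",
          body: String(unwrapValue(rec.body) ?? ""),
          trace_id: rec.traceId ?? "",
          span_id: rec.spanId ?? "",
          attributes: attributesToObject(rec.attributes),
          resource_attributes: resourceAttributes,
          scope_name: sl.scope?.name ?? "",
        });
      }
    }
  }
  return rows;
}
```

Resource and scope information is repeated on every row, so each record can be queried on its own once it lands in the table.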

Local Development

npx wrangler dev

The worker will be available at http://localhost:8787

Testing

Test with the example request:

curl -X POST http://localhost:8787/v1/logs \
  -H "Content-Type: application/json" \
  -d @example-request.json
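
If you prefer to send a test request from code, the sketch below builds a minimal OTLP/JSON logs payload and posts it to the local endpoint. The payload is an illustrative assumption and is not the contents of example-request.json. The `service.name` value, the scope name, and the helper names `buildExampleLogsPayload` and `postLogs` are all hypothetical.

```typescript
// Build a minimal OTLP/JSON logs payload with a single INFO record.
// The attribute values and scope name are placeholders for illustration.
function buildExampleLogsPayload(message: string, nowMs: number = Date.now()) {
  // OTLP/JSON encodes 64-bit timestamps as decimal strings; append six zeros for ms -> ns.
  const timeUnixNano = `${nowMs}000000`;
  return {
    resourceLogs: [
      {
        resource: {
          attributes: [{ key: "service.name", value: { stringValue: "example-worker" } }],
        },
        scopeLogs: [
          {
            scope: { name: "example-scope" },
            logRecords: [
              {
                timeUnixNano,
                severityNumber: 9, // 9 is INFO in the OTLP severity scale
                severityText: "INFO",
                body: { stringValue: message },
              },
            ],
          },
        ],
      },
    ],
  };
}

// POST the payload to the receiver and return the HTTP status code.
async function postLogs(baseUrl: string, message: string): Promise<number> {
  const res = await fetch(`${baseUrl}/v1/logs`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(buildExampleLogsPayload(message)),
  });
  return res.status;
}
```

With `npx wrangler dev` running, calling `postLogs("http://localhost:8787", "hello from a test")` sends one record to the local Worker.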

Resources
