Skip to content

Commit 33762ae

Browse files
committed
Initial commit
0 parents  commit 33762ae

File tree

6 files changed

+195
-0
lines changed

6 files changed

+195
-0
lines changed

Dockerfile

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
2+
3+
ARG WORKDIR=/dataflow/template
4+
RUN mkdir -p ${WORKDIR}
5+
WORKDIR ${WORKDIR}
6+
7+
COPY requirements.txt .
8+
COPY setup.py .
9+
COPY pipeline.py .
10+
11+
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
12+
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/pipeline.py"
13+
14+
RUN pip install -U -r ./requirements.txt

LICENSE.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
ISC License
2+
3+
Copyright (c) 2025, Better Stack, Inc.
4+
5+
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies.
6+
7+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

README.md

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# GCP Dataflow PubSub to Better Stack
2+
3+
A Google Cloud Dataflow Flex template that reads messages from PubSub and sends them to Better Stack.
4+
5+
## Overview
6+
7+
This template provides a scalable solution for streaming data from Google Cloud PubSub to Better Stack. It uses Apache Beam's Python SDK and can be deployed as a Dataflow Flex template.
8+
9+
## Prerequisites
10+
11+
- Google Cloud Platform account with Dataflow and PubSub enabled
12+
- Better Stack account with a source token
13+
- Docker installed (for building the template)
14+
- Google Cloud SDK installed
15+
16+
## Environment Variables
17+
18+
The template requires two environment variables:
19+
20+
- `BETTER_STACK_SOURCE_TOKEN`: Your Better Stack source token
21+
- `BETTER_STACK_INGEST_HOST`: The Better Stack ingest host URL
22+
23+
## Building the Template
24+
25+
1. Clone this repository:
26+
```bash
27+
git clone https://github.com/your-org/gcp-dataflow-pubsub-to-telemetry.git
28+
cd gcp-dataflow-pubsub-to-telemetry
29+
```
30+
31+
2. Build the Docker image:
32+
```bash
33+
docker build -t gcr.io/YOUR_PROJECT/pubsub-to-betterstack .
34+
```
35+
36+
3. Push the image to Google Container Registry:
37+
```bash
38+
docker push gcr.io/YOUR_PROJECT/pubsub-to-betterstack
39+
```
40+
41+
## Deploying the Template
42+
43+
You can deploy the template using the Google Cloud Console or the gcloud CLI:
44+
45+
### Using gcloud CLI
46+
47+
```bash
48+
gcloud dataflow flex-template run "pubsub-to-betterstack-$(date +%Y%m%d-%H%M%S)" \
49+
--template-file-gcs-location=gs://YOUR_BUCKET/templates/pubsub-to-betterstack.json \
50+
--parameters input_subscription=projects/YOUR_PROJECT/subscriptions/YOUR_SUBSCRIPTION \
51+
--region=YOUR_REGION \
52+
--additional-experiments=use_runner_v2
53+
```
54+
55+
### Using Google Cloud Console
56+
57+
1. Go to the Dataflow section in the Google Cloud Console
58+
2. Click "Create Job from Template"
59+
3. Select "Custom Template"
60+
4. Enter the path to your template in Cloud Storage
61+
5. Fill in the required parameters:
62+
- `input_subscription`: Your PubSub subscription to read from
63+
6. Set the environment variables:
64+
- `BETTER_STACK_SOURCE_TOKEN`
65+
- `BETTER_STACK_INGEST_HOST`
66+
7. Click "Run Job"
67+
68+
## Message Format
69+
70+
The template expects messages in JSON format. Each message will be sent to Better Stack as-is. For example:
71+
72+
```json
73+
{
74+
"message": "Hello from PubSub",
75+
"timestamp": "2024-02-11T12:00:00Z",
76+
"severity": "INFO"
77+
}
78+
```
79+
80+
## Error Handling
81+
82+
The template includes error handling that:
83+
- Logs errors but continues processing
84+
- Retries failed requests to Better Stack
85+
- Maintains message ordering
86+
87+
## License
88+
89+
ISC License. See [LICENSE.md](LICENSE.md) for details.

pipeline.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import argparse
2+
import json
3+
import os
4+
import apache_beam as beam
5+
from apache_beam.options.pipeline_options import PipelineOptions
6+
import requests
7+
from typing import Dict, Any
8+
9+
class PubSubToBetterStack(beam.DoFn):
10+
def __init__(self, source_token: str, ingest_host: str):
11+
self.source_token = source_token
12+
self.ingest_host = ingest_host
13+
self.headers = {
14+
'Authorization': f'Bearer {source_token}',
15+
'Content-Type': 'application/json'
16+
}
17+
18+
def process(self, element: bytes) -> None:
19+
try:
20+
# Parse the PubSub message
21+
message = json.loads(element.decode('utf-8'))
22+
23+
# Send to Better Stack
24+
response = requests.post(
25+
self.ingest_host,
26+
headers=self.headers,
27+
json=message
28+
)
29+
30+
if response.status_code != 202:
31+
raise Exception(f"Failed to send to Better Stack: {response.text}")
32+
33+
except Exception as e:
34+
# Log the error but don't fail the pipeline
35+
print(f"Error processing message: {str(e)}")
36+
37+
def run(argv=None):
38+
parser = argparse.ArgumentParser()
39+
parser.add_argument(
40+
'--input_subscription',
41+
required=True,
42+
help='Input PubSub subscription to read from'
43+
)
44+
known_args, pipeline_args = parser.parse_known_args(argv)
45+
46+
# Get Better Stack credentials from environment variables
47+
source_token = os.environ.get('BETTER_STACK_SOURCE_TOKEN')
48+
ingest_host = os.environ.get('BETTER_STACK_INGEST_HOST')
49+
50+
if not source_token or not ingest_host:
51+
raise ValueError(
52+
"Environment variables BETTER_STACK_SOURCE_TOKEN and BETTER_STACK_INGEST_HOST must be set"
53+
)
54+
55+
pipeline_options = PipelineOptions(
56+
pipeline_args,
57+
save_main_session=True
58+
)
59+
60+
with beam.Pipeline(options=pipeline_options) as p:
61+
(
62+
p
63+
| 'Read from PubSub' >> beam.io.ReadFromPubSub(
64+
subscription=known_args.input_subscription
65+
)
66+
| 'Send to Better Stack' >> beam.ParDo(
67+
PubSubToBetterStack(source_token, ingest_host)
68+
)
69+
)
70+
71+
if __name__ == '__main__':
72+
run()

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
apache-beam[gcp]>=2.50.0
2+
requests>=2.31.0

setup.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import setuptools
2+
3+
setuptools.setup(
4+
name='pubsub-to-betterstack',
5+
version='0.1.0',
6+
install_requires=[
7+
'apache-beam[gcp]>=2.50.0',
8+
'requests>=2.31.0',
9+
],
10+
packages=setuptools.find_packages(),
11+
)

0 commit comments

Comments
 (0)