Commit 41f55db

Working, simplified template
1 parent a340d61 commit 41f55db

5 files changed: 51 additions, 101 deletions
Dockerfile

Lines changed: 1 addition & 1 deletion
@@ -11,4 +11,4 @@ COPY pipeline.py .
 ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
 ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/pipeline.py"
 
-RUN pip install -U -r ./requirements.txt
+RUN pip install -U -r ./requirements.txt

README.md

Lines changed: 27 additions & 77 deletions
@@ -1,31 +1,13 @@
-# GCP Dataflow PubSub to Better Stack
+# Google Cloud Pub/Sub to Better Stack
 
-A Google Cloud Dataflow Flex template that reads messages from PubSub and sends them to Better Stack.
-
-## Overview
-
-This template provides a scalable solution for streaming data from Google Cloud PubSub to Better Stack. It uses Apache Beam's Python SDK and can be deployed as a Dataflow Flex template.
-
-## Prerequisites
-
-- Google Cloud Platform account with Dataflow and PubSub enabled
-- Better Stack account with a source token
-- Docker installed (for building the template)
-- Google Cloud SDK installed
-
-## Environment Variables
-
-The template requires two environment variables:
-
-- `BETTER_STACK_SOURCE_TOKEN`: Your Better Stack source token
-- `BETTER_STACK_INGESTING_HOST`: The Better Stack ingest host URL
+A Dataflow Flex template that reads messages from Pub/Sub and sends them to Better Stack Telemetry.
 
 ## Building and Deploying the Template
 
 1. Clone this repository:
 ```bash
-git clone https://github.com/your-org/gcp-dataflow-pubsub-to-telemetry.git
-cd gcp-dataflow-pubsub-to-telemetry
+git clone https://github.com/your-org/gcp-dataflow-pubsub-to-betterstack.git
+cd gcp-dataflow-pubsub-to-betterstack
 ```
 
 2. Choose Google Cloud Platform project to use
@@ -37,75 +19,43 @@ gcloud projects list
 gcloud config set project PROJECT_ID
 ```
 
-3. Build the Docker image:
+3. Choose Google Cloud Platform region to use
 ```bash
-docker build -t gcr.io/$(gcloud config get-value project)/pubsub-to-betterstack .
+# See currently selected region
+gcloud config get-value compute/region
+# You can switch to a different region using
+gcloud compute regions list
+gcloud config set compute/region REGION
 ```
 
4. Push the image to Google Container Registry:
46-
```bash
47-
gcloud auth configure-docker
48-
docker push gcr.io/$(gcloud config get-value project)/pubsub-to-betterstack
49-
```
50-
51-
5. Create a Cloud Storage bucket for the template (if you don't have one):
31+
4. Create a Cloud Storage bucket for the template (if you don't have one):
5232
```bash
5333
BUCKET_NAME="dataflow-templates-$(gcloud config get-value project)"
54-
gsutil mb -l us-central1 gs://${BUCKET_NAME}
34+
gsutil mb -l $(gcloud config get-value compute/region) gs://${BUCKET_NAME}
5535
```
5636

57-
6. Update the template specification with your project ID:
37+
5. Set parameters based on your Google Cloud Pub/Sub Subscription and Better Stack Telemetry source
5838
```bash
59-
sed -i "s/PROJECT_ID/$(gcloud config get-value project)/g" pubsub-to-betterstack.json
39+
INPUT_SUBSCRIPTION=projects/$(gcloud config get-value project)/subscriptions/<your-pubsub-subscription-name>
40+
SOURCE_TOKEN=<your-better-stack-source-token>
41+
INGESTING_HOST=<your-better-stack-ingesting-host>
6042
```
6143

62-
7. Upload the template specification to Cloud Storage:
63-
```bash
64-
gsutil cp pubsub-to-betterstack.json gs://${BUCKET_NAME}/templates/
65-
```
66-
67-
8. Deploy the template using gcloud CLI:
44+
6. Build, deploy and run the template
6845
```bash
46+
gcloud builds submit --tag "gcr.io/$(gcloud config get-value project)/pubsub-to-betterstack" .
47+
gcloud dataflow flex-template build gs://$BUCKET_NAME/pubsub-to-betterstack.json \
48+
--image "gcr.io/$(gcloud config get-value project)/pubsub-to-betterstack" \
49+
--sdk-language "PYTHON" \
50+
--metadata-file "metadata.json"
6951
gcloud dataflow flex-template run "pubsub-to-betterstack-$(date +%Y%m%d-%H%M%S)" \
70-
--template-file-gcs-location=gs://${BUCKET_NAME}/templates/pubsub-to-betterstack.json \
71-
--parameters input_subscription=projects/$(gcloud config get-value project)/subscriptions/YOUR_SUBSCRIPTION \
72-
--parameters better_stack_source_token=YOUR_SOURCE_TOKEN \
73-
--parameters better_stack_ingesting_host=YOUR_INGESTING_HOST \
74-
--region=$(gcloud config get-value compute/region) \
75-
--additional-experiments=use_runner_v2
76-
```
77-
78-
### Using Google Cloud Console
79-
80-
1. Go to the Dataflow section in the Google Cloud Console
81-
2. Click "Create Job from Template"
82-
3. Select "Custom Template"
83-
4. Enter the path to your template in Cloud Storage: `gs://${BUCKET_NAME}/templates/pubsub-to-betterstack.json`
84-
5. Fill in the required parameters:
85-
- `input_subscription`: Your PubSub subscription to read from
86-
- `better_stack_source_token`: Your Better Stack source token
87-
- `better_stack_ingesting_host`: The Better Stack ingest host URL
88-
6. Click "Run Job"
89-
90-
## Message Format
91-
92-
The template expects messages in JSON format. Each message will be sent to Better Stack as-is. For example:
93-
94-
```json
95-
{
96-
"message": "Hello from PubSub",
97-
"timestamp": "2024-02-11T12:00:00Z",
98-
"severity": "INFO"
99-
}
52+
--template-file-gcs-location=gs://$BUCKET_NAME/pubsub-to-betterstack.json \
53+
--parameters input_subscription=$INPUT_SUBSCRIPTION \
54+
--parameters better_stack_source_token=$SOURCE_TOKEN \
55+
--parameters better_stack_ingesting_host=$INGESTING_HOST \
56+
--region=$(gcloud config get-value compute/region)
10057
```
10158

102-
## Error Handling
103-
104-
The template includes error handling that:
105-
- Logs errors but continues processing
106-
- Retries failed requests to Better Stack
107-
- Maintains message ordering
108-
10959
## License
11060

11161
ISC License. See [LICENSE.md](LICENSE.md) for details.
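
The `flex-template run` invocation in step 6 of the README can also be assembled programmatically, which makes the parameter wiring easier to inspect before anything is launched. The following is a minimal sketch, not part of the commit: the helper name `build_run_command` and all example values are hypothetical, and only flags that already appear in the README are used. It builds the argument list and prints it without executing `gcloud`.

```python
import shlex
from datetime import datetime


def build_run_command(bucket_name, region, input_subscription,
                      source_token, ingesting_host, now=None):
    """Assemble the `gcloud dataflow flex-template run` arguments as a list.

    Hypothetical helper mirroring README step 6; it does not call gcloud.
    """
    now = now or datetime.now()
    # Same job-name pattern as the README: pubsub-to-betterstack-YYYYmmdd-HHMMSS
    job_name = f"pubsub-to-betterstack-{now:%Y%m%d-%H%M%S}"
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--template-file-gcs-location=gs://{bucket_name}/pubsub-to-betterstack.json",
        "--parameters", f"input_subscription={input_subscription}",
        "--parameters", f"better_stack_source_token={source_token}",
        "--parameters", f"better_stack_ingesting_host={ingesting_host}",
        f"--region={region}",
    ]


if __name__ == "__main__":
    # All values below are placeholders for illustration only.
    command = build_run_command(
        bucket_name="dataflow-templates-my-project",
        region="us-central1",
        input_subscription="projects/my-project/subscriptions/my-subscription",
        source_token="example-source-token",
        ingesting_host="in.example.com",
    )
    # shlex.join quotes each argument so the line can be pasted into a shell.
    print(shlex.join(command))
```

Keeping the command as a list rather than a string avoids shell-quoting problems if a token or host value ever contains special characters.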
Lines changed: 7 additions & 11 deletions
@@ -1,29 +1,25 @@
 {
   "name": "pubsub-to-betterstack",
-  "description": "Dataflow template to stream data from PubSub to Better Stack",
+  "description": "Dataflow template to stream data from Pub/Sub to Better Stack",
   "parameters": [
     {
       "name": "input_subscription",
-      "label": "Input PubSub subscription",
-      "helpText": "PubSub subscription to read from",
+      "label": "Input Pub/Sub subscription",
+      "helpText": "The name of the Pub/Sub subscription to read from",
       "isOptional": false,
       "regexes": ["^projects/[^/]+/subscriptions/[^/]+$"]
     },
     {
       "name": "better_stack_source_token",
       "label": "Better Stack Source Token",
-      "helpText": "Your Better Stack source token",
+      "helpText": "The source token of your telemetry source in Better Stack",
       "isOptional": false
     },
     {
       "name": "better_stack_ingesting_host",
       "label": "Better Stack Ingesting Host",
-      "helpText": "The Better Stack ingest host URL",
+      "helpText": "The ingesting host of your telemetry source in Better Stack",
       "isOptional": false
     }
-  ],
-  "image": "gcr.io/PROJECT_ID/pubsub-to-betterstack:latest",
-  "sdkInfo": {
-    "language": "PYTHON"
-  }
-}
+  ]
+}
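
The `regexes` entry for `input_subscription` only accepts a fully qualified subscription path. As a rough illustration of what that pattern accepts and rejects, the sketch below applies the same expression locally; the helper name `is_valid_subscription` and the sample paths are hypothetical, and Dataflow's own validation may differ in detail.

```python
import re

# Same pattern as the "regexes" entry in the template metadata.
SUBSCRIPTION_PATTERN = re.compile(r"^projects/[^/]+/subscriptions/[^/]+$")


def is_valid_subscription(path: str) -> bool:
    """Return True if `path` looks like projects/<project>/subscriptions/<name>."""
    return SUBSCRIPTION_PATTERN.match(path) is not None


if __name__ == "__main__":
    # Placeholder values for illustration only.
    for candidate in (
        "projects/my-project/subscriptions/my-subscription",  # full path
        "my-subscription",                                    # bare name
        "projects/my-project/topics/my-topic",                # topic, not subscription
    ):
        print(candidate, is_valid_subscription(candidate))
```

A bare subscription name fails the check, which is why step 5 of the README builds the full `projects/.../subscriptions/...` path.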

pipeline.py

Lines changed: 15 additions & 11 deletions
@@ -7,24 +7,28 @@
 from typing import Dict, Any
 
 class PubSubToBetterStack(beam.DoFn):
-    def __init__(self, source_token: str, ingest_host: str):
+    def __init__(self, source_token: str, ingesting_host: str):
         self.source_token = source_token
-        self.ingest_host = ingest_host
+        self.ingesting_url = ingesting_host if '://' in ingesting_host else f'https://{ingesting_host}'
         self.headers = {
             'Authorization': f'Bearer {source_token}',
             'Content-Type': 'application/json'
         }
 
     def process(self, element: bytes) -> None:
         try:
-            # Parse the PubSub message
-            message = json.loads(element.decode('utf-8'))
-
+            # Parse the Pub/Sub data
+            data = json.loads(element.decode('utf-8'))
+
+            # Rename timestamp key to dt to be understood by Better Stack
+            if 'timestamp' in data:
+                data['dt'] = data.pop('timestamp')
+
             # Send to Better Stack
             response = requests.post(
-                self.ingest_host,
+                self.ingesting_url,
                 headers=self.headers,
-                json=message
+                json=data
             )
 
             if response.status_code != 202:
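
The two behavioural changes in this hunk, defaulting the ingesting host to `https://` and renaming `timestamp` to `dt`, can be tried in isolation without Beam or a network call. The sketch below extracts them into standalone functions; the names `normalize_ingesting_url` and `prepare_payload` are hypothetical and do not exist in `pipeline.py`, but the logic is copied from the added lines.

```python
import json


def normalize_ingesting_url(ingesting_host: str) -> str:
    """Prefix the host with https:// unless it already carries a scheme."""
    return ingesting_host if '://' in ingesting_host else f'https://{ingesting_host}'


def prepare_payload(element: bytes) -> dict:
    """Decode a Pub/Sub message body and rename `timestamp` to `dt`."""
    data = json.loads(element.decode('utf-8'))
    if 'timestamp' in data:
        data['dt'] = data.pop('timestamp')
    return data


if __name__ == "__main__":
    # Placeholder host and message for illustration only.
    print(normalize_ingesting_url('in.example.com'))
    print(normalize_ingesting_url('http://localhost:8080'))
    message = b'{"message": "Hello from Pub/Sub", "timestamp": "2024-02-11T12:00:00Z"}'
    print(prepare_payload(message))
```

Messages without a `timestamp` key pass through unchanged, and a host given with an explicit scheme (for example during local testing) is left as-is.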
@@ -39,17 +43,17 @@ def run(argv=None):
     parser.add_argument(
         '--input_subscription',
         required=True,
-        help='Input PubSub subscription to read from'
+        help='The name of the Pub/Sub subscription to read from'
     )
     parser.add_argument(
         '--better_stack_source_token',
         required=True,
-        help='Better Stack Telemetry source token'
+        help='The source token of your telemetry source in Better Stack'
     )
     parser.add_argument(
         '--better_stack_ingesting_host',
         required=True,
-        help='Better Stack Telemetry source ingesting host'
+        help='The ingesting host of your telemetry source in Better Stack'
     )
     known_args, pipeline_args = parser.parse_known_args(argv)
 
@@ -61,7 +65,7 @@ def run(argv=None):
     with beam.Pipeline(options=pipeline_options) as p:
         (
             p
-            | 'Read from PubSub' >> beam.io.ReadFromPubSub(
+            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                 subscription=known_args.input_subscription
             )
             | 'Send to Better Stack' >> beam.ParDo(
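
`run` relies on `parse_known_args` to separate the three template parameters from whatever other flags the runner passes along. The sketch below reproduces only that argument handling, using the standard library alone; the wrapper name `parse_template_args` and the sample argument values are hypothetical.

```python
import argparse


def parse_template_args(argv):
    """Split template parameters from the remaining pipeline flags."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_subscription',
        required=True,
        help='The name of the Pub/Sub subscription to read from'
    )
    parser.add_argument(
        '--better_stack_source_token',
        required=True,
        help='The source token of your telemetry source in Better Stack'
    )
    parser.add_argument(
        '--better_stack_ingesting_host',
        required=True,
        help='The ingesting host of your telemetry source in Better Stack'
    )
    # Unrecognised flags are returned untouched as the second element.
    return parser.parse_known_args(argv)


if __name__ == "__main__":
    # Placeholder arguments for illustration only.
    known_args, pipeline_args = parse_template_args([
        '--input_subscription=projects/my-project/subscriptions/my-subscription',
        '--better_stack_source_token=example-source-token',
        '--better_stack_ingesting_host=in.example.com',
        '--region=us-central1',
        '--streaming',
    ])
    print(known_args.input_subscription)
    print(pipeline_args)
```

The leftover list is what the pipeline would hand to its pipeline options, so flags such as the region never need to be declared on the parser.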

setup.py

Lines changed: 1 addition & 1 deletion
@@ -8,4 +8,4 @@
         'requests>=2.31.0',
     ],
     packages=setuptools.find_packages(),
-)
+)
