3
3
import os
4
4
import apache_beam as beam
5
5
from apache_beam .options .pipeline_options import PipelineOptions
6
+ from apache_beam .transforms .window import FixedWindows
6
7
import requests
7
- from typing import Dict , Any
8
+ from typing import Dict , Any , List
9
+ from datetime import timedelta
8
10
9
11
class PubSubToBetterStack (beam .DoFn ):
10
- def __init__ (self , source_token : str , ingesting_host : str ):
12
+ def __init__ (self , source_token : str , ingesting_host : str , batch_size : int ):
11
13
self .source_token = source_token
12
14
self .ingesting_url = ingesting_host if '://' in ingesting_host else f'https://{ ingesting_host } '
15
+ self .batch_size = batch_size
13
16
self .headers = {
14
17
'Authorization' : f'Bearer { source_token } ' ,
15
18
'Content-Type' : 'application/json'
16
19
}
20
+ self .batch = []
17
21
18
22
def process (self , element : bytes ) -> None :
19
23
try :
20
24
# Parse the Pub/Sub data
21
25
data = json .loads (element .decode ('utf-8' ))
22
-
26
+
23
27
# Rename timestamp key to dt to be understood by Better Stack
24
28
if 'timestamp' in data :
25
29
data ['dt' ] = data .pop ('timestamp' )
30
+
31
+ self .batch .append (data )
32
+
33
+ # If we've reached the batch size, send the batch
34
+ if len (self .batch ) >= self .batch_size :
35
+ self ._send_batch ()
36
+
37
+ except Exception as e :
38
+ # Log the error but don't fail the pipeline
39
+ print (f"Error processing message: { str (e )} " )
40
+
41
+ def finish_bundle (self ):
42
+ # Send any remaining messages in the batch
43
+ if self .batch :
44
+ self ._send_batch ()
26
45
27
- # Send to Better Stack
46
+ def _send_batch (self ):
47
+ try :
48
+ # Send batch to Better Stack
28
49
response = requests .post (
29
50
self .ingesting_url ,
30
51
headers = self .headers ,
31
- json = data
52
+ json = self . batch
32
53
)
33
54
34
55
if response .status_code != 202 :
35
56
raise Exception (f"Failed to send to Better Stack: { response .text } " )
36
57
58
+ # Clear the batch after successful send
59
+ self .batch = []
60
+
37
61
except Exception as e :
38
62
# Log the error but don't fail the pipeline
39
- print (f"Error processing message : { str (e )} " )
63
+ print (f"Error sending batch to Better Stack : { str (e )} " )
40
64
41
65
def run (argv = None ):
42
66
parser = argparse .ArgumentParser ()
@@ -55,6 +79,18 @@ def run(argv=None):
55
79
required = True ,
56
80
help = 'The ingesting host of your telemetry source in Better Stack'
57
81
)
82
+ parser .add_argument (
83
+ '--batch_size' ,
84
+ default = 100 ,
85
+ type = int ,
86
+ help = 'Number of messages to batch before sending to Better Stack'
87
+ )
88
+ parser .add_argument (
89
+ '--window_size' ,
90
+ default = 10 ,
91
+ type = int ,
92
+ help = 'Window size in seconds for batching messages'
93
+ )
58
94
known_args , pipeline_args = parser .parse_known_args (argv )
59
95
60
96
pipeline_options = PipelineOptions (
@@ -68,13 +104,17 @@ def run(argv=None):
68
104
| 'Read from Pub/Sub' >> beam .io .ReadFromPubSub (
69
105
subscription = known_args .input_subscription
70
106
)
107
+ | 'Window into fixed windows' >> beam .WindowInto (
108
+ FixedWindows (known_args .window_size )
109
+ )
71
110
| 'Send to Better Stack' >> beam .ParDo (
72
111
PubSubToBetterStack (
73
112
known_args .better_stack_source_token ,
74
- known_args .better_stack_ingesting_host
113
+ known_args .better_stack_ingesting_host ,
114
+ known_args .batch_size
75
115
)
76
116
)
77
117
)
78
118
79
119
if __name__ == '__main__' :
80
- run ()
120
+ run ()
0 commit comments