Skip to content

Commit 600cbb6

Browse files
committed
Support retry on failure
1 parent 9cedeb4 commit 600cbb6

File tree

2 files changed

+56
-22
lines changed

2 files changed

+56
-22
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ The template supports the following optional parameters:
6262

6363
- `batch_size`: Number of messages to batch before sending to Better Stack. Default: 100
6464
- `window_size`: Window size in seconds for batching messages. Default: 10
65+
- `max_retries`: Maximum number of retry attempts for failed requests. Default: 3
66+
- `initial_retry_delay`: Initial delay between retries in seconds. Default: 1
6567

6668
You can include these parameters in your Dataflow job by adding them to the run command, e.g. `gcloud dataflow flex-template run ... --parameters window_size=30`.
6769

pipeline.py

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from apache_beam.transforms.window import FixedWindows
77
import requests
88
import gzip
9+
import time
910
from typing import Dict, Any, List
1011
from datetime import timedelta
1112

@@ -20,6 +21,8 @@ def __init__(self, source_token: str, ingesting_host: str, batch_size: int):
2021
'Content-Encoding': 'gzip'
2122
}
2223
self.batch = []
24+
self.max_retries = 3
25+
self.initial_retry_delay = 1 # seconds
2326

2427
def process(self, element: bytes) -> None:
2528
try:
@@ -34,7 +37,7 @@ def process(self, element: bytes) -> None:
3437

3538
# If we've reached the batch size, send the batch
3639
if len(self.batch) >= self.batch_size:
37-
self._send_batch()
40+
self._send_batch_with_retry()
3841

3942
except Exception as e:
4043
# Log the error but don't fail the pipeline
@@ -43,30 +46,47 @@ def process(self, element: bytes) -> None:
4346
def finish_bundle(self):
    """Flush any messages still buffered when Beam finalizes the bundle."""
    if not self.batch:
        return
    self._send_batch_with_retry()
4750

48-
def _send_batch_with_retry(self):
    """Send the accumulated batch to Better Stack, retrying on failure.

    Serializes ``self.batch`` to gzip-compressed JSON and POSTs it to
    ``self.ingesting_url``. On HTTP 202 the batch is cleared. On HTTP 429
    the ``Retry-After`` header is honored; on any other failure the attempt
    is retried with exponential backoff starting at
    ``self.initial_retry_delay`` seconds, up to ``self.max_retries``
    attempts. Errors are logged rather than raised so the pipeline keeps
    running; on final failure the batch is retained (matching the caller's
    best-effort contract — it may be retried with the next flush).
    """
    # Serialize and compress once: the payload does not change between
    # retries, so there is no reason to redo this work inside the loop.
    try:
        compressed_data = gzip.compress(json.dumps(self.batch).encode('utf-8'))
    except (TypeError, ValueError) as e:
        # Non-serializable batch content: log and give up, never raise
        # (finish_bundle calls this outside any try/except).
        print(f"Error sending batch to Better Stack: {str(e)}")
        return

    retry_count = 0
    retry_delay = self.initial_retry_delay

    while retry_count < self.max_retries:
        try:
            # Send compressed batch to Better Stack
            response = requests.post(
                self.ingesting_url,
                headers=self.headers,
                data=compressed_data
            )

            if response.status_code == 202:
                # Success - clear the batch and return
                self.batch = []
                return
            elif response.status_code == 429:  # Rate limit
                # Honor the server-suggested delay, falling back to our
                # current backoff delay if the header is absent.
                retry_after = int(response.headers.get('Retry-After', retry_delay))
                print(f"Rate limited. Retrying after {retry_after} seconds...")
                time.sleep(retry_after)
                retry_count += 1
                continue
            else:
                raise Exception(f"Failed to send to Better Stack: {response.text}")

        except Exception as e:
            retry_count += 1
            if retry_count < self.max_retries:
                print(f"Attempt {retry_count} failed: {str(e)}. Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            else:
                print(f"All retry attempts failed. Last error: {str(e)}")
                return

    # Reached only when every attempt was rate-limited (429): the except
    # branch handles all other exhaustion cases. Log so the dropped flush
    # is visible instead of failing silently.
    print("All retry attempts were rate limited. Giving up on this batch.")
7090

7191
def run(argv=None):
7292
parser = argparse.ArgumentParser()
@@ -97,6 +117,18 @@ def run(argv=None):
97117
type=int,
98118
help='Window size in seconds for batching messages'
99119
)
120+
parser.add_argument(
121+
'--max_retries',
122+
default=3,
123+
type=int,
124+
help='Maximum number of retry attempts for failed requests'
125+
)
126+
parser.add_argument(
127+
'--initial_retry_delay',
128+
default=1,
129+
type=int,
130+
help='Initial delay between retries in seconds'
131+
)
100132
known_args, pipeline_args = parser.parse_known_args(argv)
101133

102134
pipeline_options = PipelineOptions(

0 commit comments

Comments
 (0)