@@ -123,36 +123,22 @@ async def send_violation(
123123 RuntimeError:
124124 If all retry attempts are exhausted or a critical error occurs.
125125 """
126- # Ensure authentication before sending
126+ # Ensure authentication and prepare request payload
127127 access_token = await self .token_manager .get_valid_token ()
128128 if not access_token :
129129 raise RuntimeError ('Failed to obtain valid access token' )
130130
131- headers : dict [str , str ] = {
132- 'Authorization' : f"Bearer { access_token } " ,
133- }
134-
135- # Optimise image format - use JPEG to reduce file size
136- files : dict [str , tuple [str , bytes , str ]] = {
137- 'image' : ('violation.jpg' , image_bytes , 'image/jpeg' ),
138- }
139-
140- data : dict [str , str ] = {
141- 'site' : site ,
142- 'stream_name' : stream_name ,
143- }
144- if detection_time :
145- data ['detection_time' ] = detection_time .isoformat ()
146- if warnings_json :
147- data ['warnings_json' ] = warnings_json
148- if detections_json :
149- data ['detections_json' ] = detections_json
150- if cone_polygon_json :
151- data ['cone_polygon_json' ] = cone_polygon_json
152- if pole_polygon_json :
153- data ['pole_polygon_json' ] = pole_polygon_json
154-
155- upload_url : str = self .base_url + '/upload'
131+ headers , files , data , upload_url = self ._build_upload_payload (
132+ access_token = access_token ,
133+ image_bytes = image_bytes ,
134+ site = site ,
135+ stream_name = stream_name ,
136+ detection_time = detection_time ,
137+ warnings_json = warnings_json ,
138+ detections_json = detections_json ,
139+ cone_polygon_json = cone_polygon_json ,
140+ pole_polygon_json = pole_polygon_json ,
141+ )
156142
157143 # Use shared client connection pool
158144 client = await self ._get_client ()
@@ -170,46 +156,154 @@ async def send_violation(
170156 headers = headers ,
171157 )
172158 resp .raise_for_status ()
173- # Return the violation ID from the response
174159 return resp .json ().get ('violation_id' )
175160
176161 except httpx .ConnectTimeout :
177162 logging .warning (
178163 f"[send_violation] Attempt { attempt + 1 } : "
179164 'Connection timeout, retry...' ,
180165 )
181- if attempt < self .max_retries - 1 :
182- await asyncio .sleep (backoff_delay )
183- backoff_delay *= 2 # Exponential backoff
184- else :
185- raise RuntimeError (
186- '[send_violation] All retry attempts exhausted due to '
187- 'timeout' ,
188- )
166+ backoff_delay = await self ._on_timeout (attempt , backoff_delay )
189167
190168 except httpx .HTTPStatusError as exc :
191- # If 401, refresh token and retry
192- if exc .response .status_code == 401 :
193- logging .warning (
194- '[send_violation] Unauthorized. '
195- 'Attempting token refresh...' ,
196- )
197- await self .token_manager .refresh_token ()
198- # Update headers with new token
199- new_token = await self .token_manager .get_valid_token ()
200- headers ['Authorization' ] = f"Bearer { new_token } "
201-
202- if attempt < self .max_retries - 1 :
203- continue # Retry instead of recursive call
169+ if await self ._try_refresh_on_401 (exc , attempt , headers ):
170+ continue
204171 raise
205172
206173 except Exception as e :
207174 logging .error (f"[send_violation] Unexpected error: { e } " )
208- if attempt < self .max_retries - 1 :
209- await asyncio .sleep (backoff_delay )
210- backoff_delay *= 2
211- else :
212- raise
175+ backoff_delay = await self ._on_unexpected (
176+ attempt , backoff_delay , e ,
177+ )
213178
214179 # If all attempts fail, return None
215180 return None
181+
182+ def _build_upload_payload (
183+ self ,
184+ access_token : str ,
185+ image_bytes : bytes ,
186+ site : str ,
187+ stream_name : str ,
188+ detection_time : datetime | None ,
189+ warnings_json : str | None ,
190+ detections_json : str | None ,
191+ cone_polygon_json : str | None ,
192+ pole_polygon_json : str | None ,
193+ ) -> tuple [
194+ dict [str , str ],
195+ dict [str , tuple [str , bytes , str ]],
196+ dict [str , str ],
197+ str ,
198+ ]:
199+ """
200+ Build headers, files, form data, and URL for upload request.
201+
202+ Args:
203+ access_token (str): The access token for authentication.
204+ image_bytes (bytes): The image bytes to upload.
205+ site (str): The site identifier.
206+ stream_name (str): The stream name.
207+ detection_time (datetime | None): The time of detection.
208+ warnings_json (str | None): JSON string of warnings.
209+ detections_json (str | None): JSON string of detection items.
210+ cone_polygon_json (str | None): JSON string of cone polygons.
211+ pole_polygon_json (str | None): JSON string of pole polygons.
212+
213+ Returns:
214+ tuple[
215+ dict[str, str],
216+ dict[str, tuple[str, bytes, str]],
217+ dict[str, str],
218+ str,
219+ ]: The headers, files, form data, and upload URL.
220+ """
221+ headers : dict [str , str ] = {
222+ 'Authorization' : f"Bearer { access_token } " ,
223+ }
224+ files : dict [str , tuple [str , bytes , str ]] = {
225+ 'image' : ('violation.jpg' , image_bytes , 'image/jpeg' ),
226+ }
227+ data : dict [str , str ] = {
228+ 'site' : site ,
229+ 'stream_name' : stream_name ,
230+ }
231+ if detection_time :
232+ data ['detection_time' ] = detection_time .isoformat ()
233+ if warnings_json :
234+ data ['warnings_json' ] = warnings_json
235+ if detections_json :
236+ data ['detections_json' ] = detections_json
237+ if cone_polygon_json :
238+ data ['cone_polygon_json' ] = cone_polygon_json
239+ if pole_polygon_json :
240+ data ['pole_polygon_json' ] = pole_polygon_json
241+
242+ upload_url : str = self .base_url + '/upload'
243+ return headers , files , data , upload_url
244+
245+ async def _on_timeout (self , attempt : int , delay : int ) -> int :
246+ """
247+ Handle timeout backoff; raise if final attempt, else sleep and backoff.
248+
249+ Args:
250+ attempt (int): The current attempt number.
251+ delay (int): The current backoff delay.
252+
253+ Returns:
254+ int: The next backoff delay.
255+ """
256+ if attempt < self .max_retries - 1 :
257+ await asyncio .sleep (delay )
258+ return delay * 2
259+ raise RuntimeError (
260+ '[send_violation] All retry attempts exhausted due to timeout' ,
261+ )
262+
263+ async def _on_unexpected (
264+ self , attempt : int , delay : int , err : Exception ,
265+ ) -> int :
266+ """
267+ Handle unexpected error backoff; re-raise on final attempt.
268+
269+ Args:
270+ attempt (int): The current attempt number.
271+ delay (int): The current backoff delay.
272+ err (Exception): The unexpected error that occurred.
273+
274+ Returns:
275+ int: The next backoff delay.
276+ """
277+ if attempt < self .max_retries - 1 :
278+ await asyncio .sleep (delay )
279+ return delay * 2
280+ raise err
281+
282+ async def _try_refresh_on_401 (
283+ self ,
284+ exc : httpx .HTTPStatusError ,
285+ attempt : int ,
286+ headers : dict [str , str ],
287+ ) -> bool :
288+ """
289+ Attempt token refresh on 401; update headers and signal retry.
290+
291+ Args:
292+ exc (httpx.HTTPStatusError): The HTTP error that occurred.
293+ attempt (int): The current attempt number.
294+ headers (dict[str, str]): The headers to update.
295+
296+ Returns:
297+ bool:
298+ True if the caller should retry,
299+ otherwise False (caller should raise).
300+ """
301+ if exc .response is not None and exc .response .status_code == 401 :
302+ logging .warning (
303+ '[send_violation] Unauthorized. Attempting token refresh...' ,
304+ )
305+ await self .token_manager .refresh_token ()
306+ new_token = await self .token_manager .get_valid_token ()
307+ headers ['Authorization' ] = f"Bearer { new_token } "
308+ return attempt < self .max_retries - 1
309+ return False
0 commit comments