@@ -139,10 +139,10 @@ public void reloadTrafficCalcConfig() throws MalformedTrafficCalcConfigException
139139 this .evaluationWindowSeconds , this .baselineTraffic , this .thresholdMultiplier , ranges .size ());
140140
141141 } catch (MalformedTrafficCalcConfigException e ) {
142- LOGGER .warn ( "failed to load traffic calc config, config is malformed: {}" , trafficCalcConfigPath , e );
142+ LOGGER .error ( "circuit_breaker_config_error: config is malformed, configPath= {}" , trafficCalcConfigPath , e );
143143 throw e ;
144144 } catch (Exception e ) {
145- LOGGER .warn ( "failed to load traffic calc config, config is malformed or missing: {}" , trafficCalcConfigPath , e );
145+ LOGGER .error ( "circuit_breaker_config_error: config is malformed or missing, configPath= {}" , trafficCalcConfigPath , e );
146146 throw new MalformedTrafficCalcConfigException ("failed to load traffic calc config: " + e .getMessage ());
147147 }
148148 }
@@ -163,12 +163,12 @@ List<List<Long>> parseAllowlistRanges(JsonObject config) throws MalformedTraffic
163163 long end = rangeArray .getLong (1 );
164164
165165 if (start >= end ) {
166- LOGGER .error ("invalid allowlist range: start must be less than end: [{}, {}]" , start , end );
166+ LOGGER .error ("circuit_breaker_config_error: allowlist range start must be less than end, range= [{}, {}]" , start , end );
167167 throw new MalformedTrafficCalcConfigException ("invalid allowlist range at index " + i + ": start must be less than end" );
168168 }
169169
170170 if (end - start > 86400 ) {
171- LOGGER .error ("invalid allowlist range: range must be less than 24 hours: [{}, {}]" , start , end );
171+ LOGGER .error ("circuit_breaker_config_error: allowlist range must be less than 24 hours, range= [{}, {}]" , start , end );
172172 throw new MalformedTrafficCalcConfigException ("invalid allowlist range at index " + i + ": range must be less than 24 hours" );
173173 }
174174
@@ -186,7 +186,7 @@ List<List<Long>> parseAllowlistRanges(JsonObject config) throws MalformedTraffic
186186 long currentEnd = ranges .get (i ).get (1 );
187187 long nextStart = ranges .get (i + 1 ).get (0 );
188188 if (currentEnd >= nextStart ) {
189- LOGGER .error ("overlapping allowlist ranges detected: [{}, {}] overlaps with [{}, {}]" ,
189+ LOGGER .error ("circuit_breaker_config_error: overlapping allowlist ranges, range= [{}, {}] overlaps with range= [{}, {}]" ,
190190 ranges .get (i ).get (0 ), currentEnd , nextStart , ranges .get (i + 1 ).get (1 ));
191191 throw new MalformedTrafficCalcConfigException (
192192 "overlapping allowlist ranges detected at indices " + i + " and " + (i + 1 ));
@@ -196,7 +196,7 @@ List<List<Long>> parseAllowlistRanges(JsonObject config) throws MalformedTraffic
196196 } catch (MalformedTrafficCalcConfigException e ) {
197197 throw e ;
198198 } catch (Exception e ) {
199- LOGGER .error ("failed to parse allowlist ranges" , e );
199+ LOGGER .error ("circuit_breaker_config_error: failed to parse allowlist ranges" , e );
200200 throw new MalformedTrafficCalcConfigException ("failed to parse allowlist ranges: " + e .getMessage ());
201201 }
202202
@@ -227,8 +227,8 @@ public TrafficStatus calculateStatus(List<Message> sqsMessages, SqsMessageOperat
227227 List <String > deltaS3Paths = listDeltaFiles ();
228228
229229 if (deltaS3Paths .isEmpty ()) {
230- LOGGER .warn ( " no delta files found in s3 with prefix: {}" , s3DeltaPrefix );
231- return TrafficStatus . DEFAULT ;
230+ LOGGER .error ( "s3_error: no delta files found in s3 at prefix= {}" , s3DeltaPrefix );
231+ throw new RuntimeException ( "no delta files found in s3 at prefix=" + s3DeltaPrefix ) ;
232232 }
233233
234234 // Find newest delta file timestamp for delta traffic window
@@ -326,8 +326,8 @@ public TrafficStatus calculateStatus(List<Message> sqsMessages, SqsMessageOperat
326326 return status ;
327327
328328 } catch (Exception e ) {
329- LOGGER .error ("error calculating traffic status" , e );
330- return TrafficStatus . DEFAULT ;
329+ LOGGER .error ("delta_job_failed: error calculating traffic status" , e );
330+ throw new RuntimeException ( "error calculating traffic status" , e ) ;
331331 }
332332 }
333333
@@ -345,7 +345,7 @@ private long findNewestDeltaTimestamp(List<String> deltaS3Paths) throws IOExcept
345345 List <Long > timestamps = getTimestampsFromFile (newestDeltaPath );
346346
347347 if (timestamps .isEmpty ()) {
348- LOGGER .warn ( " newest delta file has no timestamps: {}" , newestDeltaPath );
348+ LOGGER .error ( "s3_error: newest delta file has no timestamps, path= {}" , newestDeltaPath );
349349 return System .currentTimeMillis () / 1000 ;
350350 }
351351
@@ -372,7 +372,7 @@ private List<String> listDeltaFiles() {
372372 return deltaFiles ;
373373
374374 } catch (Exception e ) {
375- LOGGER .error ("failed to list delta files from s3 with prefix: {}" , s3DeltaPrefix , e );
375+ LOGGER .error ("s3_error: failed to list delta files at prefix= {}" , s3DeltaPrefix , e );
376376 return Collections .emptyList ();
377377 }
378378 }
@@ -429,7 +429,7 @@ private List<Long> readTimestampsFromS3(String s3Path) throws IOException {
429429
430430 return timestamps ;
431431 } catch (Exception e ) {
432- LOGGER .error ("failed to read delta file from s3: {}" , s3Path , e );
432+ LOGGER .error ("s3_error: failed to read delta file at path= {}" , s3Path , e );
433433 throw new IOException ("failed to read delta file from s3: " + s3Path , e );
434434 }
435435 }
@@ -512,7 +512,7 @@ private Long extractTimestampFromMessage(Message msg) {
512512 try {
513513 return Long .parseLong (sentTimestamp ) / 1000 ; // Convert ms to seconds
514514 } catch (NumberFormatException e ) {
515- LOGGER .warn ( " invalid sentTimestamp: {}" , sentTimestamp );
515+ LOGGER .error ( "sqs_error: invalid sentTimestamp, messageId={}, sentTimestamp={}" , msg . messageId () , sentTimestamp );
516516 }
517517 }
518518
@@ -590,23 +590,38 @@ private void evictOldCacheEntries(long cutoffTimestamp) {
590590 }
591591
592592 /**
593- * Determine traffic status based on current vs past counts
593+ * Determine traffic status based on current vs baseline traffic.
594+ * Logs warnings at 50%, 75%, and 90% of the circuit breaker threshold.
594595 */
595596 TrafficStatus determineStatus (int sumCurrent , int baselineTraffic ) {
596597 if (baselineTraffic == 0 || thresholdMultiplier == 0 ) {
597- // Avoid division by zero - if no baseline traffic, return DEFAULT status
598- LOGGER .warn ("baselineTraffic is 0 or thresholdMultiplier is 0, returning default status" );
599- return TrafficStatus .DEFAULT ;
598+ LOGGER .error ("circuit_breaker_config_error: baselineTraffic is 0 or thresholdMultiplier is 0" );
599+ throw new RuntimeException ("invalid circuit breaker config: baselineTraffic=" + baselineTraffic + ", thresholdMultiplier=" + thresholdMultiplier );
600600 }
601601
602- if (sumCurrent >= thresholdMultiplier * baselineTraffic ) {
603- LOGGER .error ("delayed_processing threshold breached: sumCurrent={}, thresholdMultiplier={}, baselineTraffic={}" ,
604- sumCurrent , thresholdMultiplier , baselineTraffic );
602+ int threshold = thresholdMultiplier * baselineTraffic ;
603+ double thresholdPercent = (double ) sumCurrent / threshold * 100 ;
604+
605+ // Log warnings at increasing thresholds before circuit breaker triggers
606+ if (thresholdPercent >= 90.0 ) {
607+ LOGGER .warn ("high_message_volume: 90% of threshold reached, sumCurrent={}, threshold={} ({}x{}), thresholdPercent={}%" ,
608+ sumCurrent , threshold , thresholdMultiplier , baselineTraffic , String .format ("%.1f" , thresholdPercent ));
609+ } else if (thresholdPercent >= 75.0 ) {
610+ LOGGER .warn ("high_message_volume: 75% of threshold reached, sumCurrent={}, threshold={} ({}x{}), thresholdPercent={}%" ,
611+ sumCurrent , threshold , thresholdMultiplier , baselineTraffic , String .format ("%.1f" , thresholdPercent ));
612+ } else if (thresholdPercent >= 50.0 ) {
613+ LOGGER .warn ("high_message_volume: 50% of threshold reached, sumCurrent={}, threshold={} ({}x{}), thresholdPercent={}%" ,
614+ sumCurrent , threshold , thresholdMultiplier , baselineTraffic , String .format ("%.1f" , thresholdPercent ));
615+ }
616+
617+ if (sumCurrent >= threshold ) {
618+ LOGGER .error ("circuit_breaker_triggered: traffic threshold breached, sumCurrent={}, threshold={} ({}x{})" ,
619+ sumCurrent , threshold , thresholdMultiplier , baselineTraffic );
605620 return TrafficStatus .DELAYED_PROCESSING ;
606621 }
607622
608- LOGGER .info ("traffic within normal range: sumCurrent={}, thresholdMultiplier ={}, baselineTraffic ={}" ,
609- sumCurrent , thresholdMultiplier , baselineTraffic );
623+ LOGGER .info ("traffic within normal range: sumCurrent={}, threshold ={} ({}x{}), thresholdPercent ={}% " ,
624+ sumCurrent , threshold , thresholdMultiplier , baselineTraffic , String . format ( "%.1f" , thresholdPercent ) );
610625 return TrafficStatus .DEFAULT ;
611626 }
612627
0 commit comments