@@ -247,15 +247,27 @@ public TrafficStatus calculateStatus(List<Message> sqsMessages, SqsMessageOperat
247247
248248 // Process delta files and count records in [deltaWindowStart, newestDeltaTs]
249249 int sum = 0 ;
250+ int deltaRecordsCount = 0 ;
251+ int filesProcessed = 0 ;
252+ int cacheHits = 0 ;
253+ int cacheMisses = 0 ;
250254
251255 for (String s3Path : deltaS3Paths ) {
256+ boolean wasCached = isCached (s3Path );
257+ if (wasCached ) {
258+ cacheHits ++;
259+ } else {
260+ cacheMisses ++;
261+ }
262+
252263 List <Long > timestamps = getTimestampsFromFile (s3Path );
264+ filesProcessed ++;
253265
254266 boolean shouldStop = false ;
255267 for (long ts : timestamps ) {
256268 // Stop condition: record is older than our window
257269 if (ts < deltaWindowStart ) {
258- LOGGER .debug ("stopping delta file processing at timestamp {} (older than window start {})" , ts , deltaWindowStart );
270+ LOGGER .info ("stopping delta file processing at timestamp {} (older than window start {})" , ts , deltaWindowStart );
259271 break ;
260272 }
261273
@@ -266,6 +278,7 @@ public TrafficStatus calculateStatus(List<Message> sqsMessages, SqsMessageOperat
266278
267279 // increment sum if record is in delta window
268280 if (ts >= deltaWindowStart ) {
281+ deltaRecordsCount ++;
269282 sum ++;
270283 }
271284
@@ -276,6 +289,9 @@ public TrafficStatus calculateStatus(List<Message> sqsMessages, SqsMessageOperat
276289 }
277290 }
278291
292+ LOGGER .info ("delta files: processed={}, deltaRecords={}, cache hits={}, misses={}, cacheSize={}" ,
293+ filesProcessed , deltaRecordsCount , cacheHits , cacheMisses , deltaFileCache .size ());
294+
279295 // Count SQS messages in [oldestQueueTs, oldestQueueTs + 5m] with allowlist filtering
280296 int sqsCount = 0 ;
281297 if (sqsMessages != null && !sqsMessages .isEmpty ()) {
@@ -328,7 +344,7 @@ private long findNewestDeltaTimestamp(List<String> deltaS3Paths) throws IOExcept
328344 }
329345
330346 long newestTs = Collections .max (timestamps );
331- LOGGER .debug ("found newest delta timestamp {} from file {}" , newestTs , newestDeltaPath );
347+ LOGGER .info ("found newest delta timestamp {} from file {}" , newestTs , newestDeltaPath );
332348 return newestTs ;
333349 }
334350
@@ -341,17 +357,28 @@ private List<String> listDeltaFiles() {
341357 List <String > allFiles = cloudStorage .list (s3DeltaPrefix );
342358
343359 // Filter to only .dat delta files and sort newest to oldest
344- return allFiles .stream ()
360+ List < String > deltaFiles = allFiles .stream ()
345361 .filter (OptOutUtils ::isDeltaFile )
346362 .sorted (OptOutUtils .DeltaFilenameComparatorDescending )
347363 .collect (Collectors .toList ());
364+
365+ LOGGER .info ("listed {} delta files from s3 (prefix={})" , deltaFiles .size (), s3DeltaPrefix );
366+ return deltaFiles ;
348367
349368 } catch (Exception e ) {
350369 LOGGER .error ("failed to list delta files from s3 with prefix: {}" , s3DeltaPrefix , e );
351370 return Collections .emptyList ();
352371 }
353372 }
354373
374+ /**
375+ * Check if a delta file is already cached
376+ */
377+ private boolean isCached (String s3Path ) {
378+ String filename = s3Path .substring (s3Path .lastIndexOf ('/' ) + 1 );
379+ return deltaFileCache .containsKey (filename );
380+ }
381+
355382 /**
356383 * Get timestamps from a delta file (S3 path), using cache if available
357384 */
@@ -362,16 +389,17 @@ private List<Long> getTimestampsFromFile(String s3Path) throws IOException {
362389 // Check cache first
363390 FileRecordCache cached = deltaFileCache .get (filename );
364391 if (cached != null ) {
365- LOGGER .debug ("using cached timestamps for file: {}" , filename );
392+ LOGGER .info ("using cached timestamps for file: {}" , filename );
366393 return cached .timestamps ;
367394 }
368395
369396 // Cache miss - download from S3
370- LOGGER .debug ("downloading and reading timestamps from s3: {}" , s3Path );
397+ LOGGER .info ("downloading and reading timestamps from s3: {}" , s3Path );
371398 List <Long > timestamps = readTimestampsFromS3 (s3Path );
372399
373400 // Store in cache
374401 deltaFileCache .put (filename , new FileRecordCache (timestamps ));
402+ LOGGER .info ("cached delta file: {} ({} records)" , filename , timestamps .size ());
375403
376404 return timestamps ;
377405 }
@@ -481,7 +509,7 @@ private Long extractTimestampFromMessage(Message msg) {
481509 try {
482510 return Long .parseLong (sentTimestamp ) / 1000 ; // Convert ms to seconds
483511 } catch (NumberFormatException e ) {
484- LOGGER .debug ("invalid sentTimestamp: {}" , sentTimestamp );
512+ LOGGER .info ("invalid sentTimestamp: {}" , sentTimestamp );
485513 }
486514 }
487515
0 commit comments