@@ -223,6 +223,13 @@ def rsync_result(update: RSyncerUpdate):
223223 self .rsync_processes [update .base_path ].enqueue (update .file_path )
224224
225225 self .rsync_processes [source ].subscribe (rsync_result )
226+ self .rsync_processes [source ].subscribe (
227+ partial (
228+ self ._increment_transferred_files_prometheus ,
229+ destination = destination ,
230+ source = str (source ),
231+ )
232+ )
226233 self .rsync_processes [source ].subscribe (
227234 partial (
228235 self ._increment_transferred_files ,
@@ -466,6 +473,33 @@ def _increment_file_count(
466473 }
467474 requests .post (url , json = data )
468475
476+ # Prometheus can handle higher traffic so update for every transferred file rather
477+ # than batching as we do for the Murfey database updates in _increment_transferred_files
478+ def _increment_transferred_files_prometheus (
479+ self , update : RSyncerUpdate , source : str , destination : str
480+ ):
481+ if update .outcome is TransferResult .SUCCESS :
482+ url = f"{ str (self ._environment .url .geturl ())} /visits/{ str (self ._environment .visit )} /increment_rsync_transferred_files_prometheus"
483+ data_files = (
484+ [update ]
485+ if update .file_path .suffix in self ._data_suffixes
486+ and any (
487+ substring in update .file_path .name
488+ for substring in self ._data_substrings
489+ )
490+ else []
491+ )
492+ data = {
493+ "source" : source ,
494+ "destination" : destination ,
495+ "session_id" : self .session_id ,
496+ "increment_count" : 1 ,
497+ "bytes" : update .file_size ,
498+ "increment_data_count" : len (data_files ),
499+ "data_bytes" : sum (f .file_size for f in data_files ),
500+ }
501+ requests .post (url , json = data )
502+
469503 def _increment_transferred_files (
470504 self , updates : List [RSyncerUpdate ], source : str , destination : str
471505 ):
0 commit comments