diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 72e51e48743da..e0f3b662b7d04 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -969,7 +969,7 @@ def _log_file_processing_stats(self, known_files: dict[str, set[DagFileInfo]]): proc = self._processors.get(file) num_dags = stat.num_dags num_errors = stat.import_errors - file_name = Path(file.rel_path).stem + file_name = normalize_name_for_stats(Path(file.rel_path).stem) processor_pid = proc.pid if proc else None processor_start_time = proc.start_time if proc else None runtime = (now - processor_start_time) if processor_start_time else None diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 40b819f146e1f..1bc6fd29c0842 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1359,6 +1359,25 @@ def test_send_file_processing_statsd_timing( tags={"bundle_name": bundle_name, "file_name": dag_filename[:-3]}, ) + @mock.patch("airflow.dag_processing.manager.stats.gauge") + def test_log_file_processing_stats_sanitizes_last_run_seconds_ago_metric_name(self, statsd_gauge_mock): + manager = DagFileProcessorManager(max_runs=1) + dag_file = DagFileInfo( + bundle_name="testing", + rel_path=Path("test_of sprak_opertaor.py"), + bundle_path=TEST_DAGS_FOLDER, + ) + manager._file_stats[dag_file] = DagFileStat( + last_finish_time=timezone.utcnow() - timedelta(seconds=5), + ) + + manager._log_file_processing_stats({"testing": {dag_file}}) + + statsd_gauge_mock.assert_any_call( + "dag_processing.last_run.seconds_ago.test_of_sprak_opertaor", + mock.ANY, + ) + @pytest.mark.usefixtures("testing_dag_bundle") def test_refresh_dags_dir_doesnt_delete_zipped_dags( self, tmp_path, session, configure_testing_dag_bundle, test_zip_path