Skip to content

Commit b45f5dc

Browse files
committed
feat: Orchestrator - Log the queue item processing times
1 parent ea550a2 commit b45f5dc

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

cloud_pipelines_backend/orchestrator_sql.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
9494
queued_execution = session.scalar(query)
9595
if queued_execution:
9696
self._queued_executions_queue_idle = False
97+
start_timestamp = time.monotonic_ns()
9798
_logger.info(f"Before processing {queued_execution.id=}")
9899
try:
99100
self.internal_process_one_queued_execution(
@@ -107,7 +108,11 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
107108
)
108109
record_system_error_exception(execution=queued_execution, exception=ex)
109110
session.commit()
110-
_logger.info(f"After processing {queued_execution.id=}")
111+
finally:
112+
duration_ms = int((time.monotonic_ns() - start_timestamp) / 1_000_000)
113+
_logger.info(
114+
f"After processing {queued_execution.id=}. Duration={duration_ms}ms"
115+
)
111116
return True
112117
else:
113118
if not self._queued_executions_queue_idle:
@@ -132,12 +137,12 @@ def internal_process_running_executions_queue(self, session: orm.Session):
132137
running_container_execution = session.scalar(query)
133138
if running_container_execution:
134139
self._running_executions_queue_idle = False
140+
start_timestamp = time.monotonic_ns()
141+
_logger.info(f"Before processing {running_container_execution.id=}")
135142
try:
136-
_logger.info(f"Before processing {running_container_execution.id=}")
137143
self.internal_process_one_running_execution(
138144
session=session, container_execution=running_container_execution
139145
)
140-
_logger.info(f"After processing {running_container_execution.id=}")
141146
except Exception as ex:
142147
_logger.exception(f"Error processing {running_container_execution.id=}")
143148
session.rollback()
@@ -163,6 +168,11 @@ def internal_process_running_executions_queue(self, session: orm.Session):
163168
session=session, execution=execution_node
164169
)
165170
session.commit()
171+
finally:
172+
duration_ms = int((time.monotonic_ns() - start_timestamp) / 1_000_000)
173+
_logger.info(
174+
f"After processing {running_container_execution.id=}. Duration={duration_ms}ms"
175+
)
166176
return True
167177
else:
168178
if not self._running_executions_queue_idle:

0 commit comments

Comments
 (0)