Skip to content

Commit 8ffeec1

Browse files
trying to fix concurrency issue
1 parent 4cff733 commit 8ffeec1

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/services/background_task_periodic_heartbeat_check.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,16 @@ async def _check_service_heartbeat(
4848
modified_at
4949
< base_start_timestamp - timedelta(minutes=2)
5050
):
51-
missed_heartbeat_counter += 1
51+
new_missed_heartbeat_counter = missed_heartbeat_counter + 1
5252
if (
53-
missed_heartbeat_counter
53+
new_missed_heartbeat_counter
5454
> resource_usage_tracker_missed_heartbeat_counter_fail
5555
):
5656
# Handle unhealthy service
5757
_logger.error(
5858
"Service run id: %s is considered unhealthy and not billed. Counter %s",
5959
service_run_id,
60-
missed_heartbeat_counter,
60+
new_missed_heartbeat_counter,
6161
)
6262
await _close_unhealthy_service(
6363
db_engine, service_run_id, base_start_timestamp
@@ -66,14 +66,24 @@ async def _check_service_heartbeat(
6666
_logger.warning(
6767
"Service run id: %s missed heartbeat. Counter %s",
6868
service_run_id,
69-
missed_heartbeat_counter,
69+
new_missed_heartbeat_counter,
7070
)
71-
await service_runs_db.update_service_missed_heartbeat_counter(
72-
db_engine,
73-
service_run_id=service_run_id,
74-
last_heartbeat_at=last_heartbeat_at,
75-
missed_heartbeat_counter=missed_heartbeat_counter,
71+
# Use the original last_heartbeat_at as the key for the update (modified_at is not passed to this call)
72+
# to ensure we're updating the correct record and prevent race conditions
73+
updated_service = (
74+
await service_runs_db.update_service_missed_heartbeat_counter(
75+
db_engine,
76+
service_run_id=service_run_id,
77+
last_heartbeat_at=last_heartbeat_at,
78+
missed_heartbeat_counter=new_missed_heartbeat_counter,
79+
)
7680
)
81+
# If the update returned None, it means another process already updated this record
82+
if updated_service is None:
83+
_logger.warning(
84+
"Service run id: %s was already updated by another process",
85+
service_run_id,
86+
)
7787

7888

7989
async def _close_unhealthy_service(
@@ -164,8 +174,10 @@ async def check_running_services(app: FastAPI) -> None:
164174
base_start_timestamp = datetime.now(tz=UTC)
165175

166176
# Get all current running services (across all products)
167-
total_count: PositiveInt = await service_runs_db.total_service_runs_with_running_status_across_all_products(
168-
_db_engine
177+
total_count: PositiveInt = (
178+
await service_runs_db.total_service_runs_with_running_status_across_all_products(
179+
_db_engine
180+
)
169181
)
170182

171183
for offset in range(0, total_count, _BATCH_SIZE):

0 commit comments

Comments
 (0)