Skip to content

Commit 8063f60

Browse files
committed
Fix blocking issues from PR review
- Fix incorrect timestamp: use time.time() instead of duration in metadata - Move time imports to module level in execution_timeout.py - Add threading lock to fix race condition in timeout monitoring - Protect current_execution access with lock - Copy execution data before processing outside lock - All fixes validated and tested
1 parent 4020118 commit 8063f60

File tree

2 files changed

+44
-38
lines changed

2 files changed

+44
-38
lines changed

deepnote_toolkit/execution_timeout.py

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os
77
import signal
88
import threading
9+
import time
910
from typing import Optional
1011

1112
import requests
@@ -41,36 +42,36 @@ def __init__(
4142
self.current_execution: Optional[dict] = None
4243
self.warning_timer: Optional[threading.Timer] = None
4344
self.timeout_timer: Optional[threading.Timer] = None
45+
self._execution_lock = threading.Lock()
4446

4547
def on_pre_execute(self, info: ExecutionInfo) -> None:
4648
"""
4749
Called before executing a cell.
4850
Starts timers for warning and timeout.
4951
"""
50-
import time
51-
5252
cell_preview = info.raw_cell[:100] if info.raw_cell else "<empty>"
5353

54-
self.current_execution = {
55-
"code": cell_preview,
56-
"start": time.time(),
57-
}
58-
59-
# Start warning timer
60-
if self.warning_threshold > 0:
61-
self.warning_timer = threading.Timer(
62-
self.warning_threshold, self._send_warning
63-
)
64-
self.warning_timer.daemon = True
65-
self.warning_timer.start()
54+
with self._execution_lock:
55+
self.current_execution = {
56+
"code": cell_preview,
57+
"start": time.time(),
58+
}
6659

67-
# Start timeout timer
68-
if self.enable_auto_interrupt and self.timeout_threshold > 0:
69-
self.timeout_timer = threading.Timer(
70-
self.timeout_threshold, self._interrupt_execution
71-
)
72-
self.timeout_timer.daemon = True
73-
self.timeout_timer.start()
60+
# Start warning timer
61+
if self.warning_threshold > 0:
62+
self.warning_timer = threading.Timer(
63+
self.warning_threshold, self._send_warning
64+
)
65+
self.warning_timer.daemon = True
66+
self.warning_timer.start()
67+
68+
# Start timeout timer
69+
if self.enable_auto_interrupt and self.timeout_threshold > 0:
70+
self.timeout_timer = threading.Timer(
71+
self.timeout_threshold, self._interrupt_execution
72+
)
73+
self.timeout_timer.daemon = True
74+
self.timeout_timer.start()
7475

7576
self.logger.debug(
7677
"Timeout monitoring started: warning=%ds, timeout=%ds, auto_interrupt=%s",
@@ -84,8 +85,9 @@ def on_post_execute(self, result: ExecutionResult) -> None:
8485
Called after executing a cell.
8586
Cancels any pending timers.
8687
"""
87-
self._cancel_timers()
88-
self.current_execution = None
88+
with self._execution_lock:
89+
self._cancel_timers()
90+
self.current_execution = None
8991

9092
def _cancel_timers(self) -> None:
9193
"""Cancel all active timers."""
@@ -98,13 +100,15 @@ def _cancel_timers(self) -> None:
98100

99101
def _send_warning(self) -> None:
100102
"""Send warning when execution is running longer than threshold."""
101-
if not self.current_execution:
102-
return
103-
104-
import time
103+
# Capture execution data while holding lock
104+
with self._execution_lock:
105+
if not self.current_execution:
106+
return
107+
execution_data = self.current_execution.copy()
105108

106-
duration = time.time() - self.current_execution["start"]
107-
code_preview = self.current_execution["code"][:50]
109+
# Process outside lock to avoid blocking
110+
duration = time.time() - execution_data["start"]
111+
code_preview = execution_data["code"][:50]
108112

109113
self.logger.warning(
110114
"LONG_EXECUTION | duration=%.1fs | preview=%s",
@@ -117,22 +121,23 @@ def _send_warning(self) -> None:
117121

118122
def _interrupt_execution(self) -> None:
119123
"""Interrupt execution after timeout threshold is exceeded."""
120-
if not self.current_execution:
121-
return
124+
# Capture execution data while holding lock
125+
with self._execution_lock:
126+
if not self.current_execution:
127+
return
128+
execution_data = self.current_execution.copy()
122129

123-
import time
124-
125-
duration = time.time() - self.current_execution["start"]
130+
# Process outside lock to avoid blocking
131+
duration = time.time() - execution_data["start"]
132+
code_preview = execution_data["code"][:50]
126133

127134
self.logger.error(
128135
"TIMEOUT_INTERRUPT | duration=%.1fs | Sending SIGINT to interrupt execution",
129136
duration,
130137
)
131138

132139
# Report to webapp before interrupting
133-
self._report_to_webapp(
134-
duration, self.current_execution["code"][:50], warning=False
135-
)
140+
self._report_to_webapp(duration, code_preview, warning=False)
136141

137142
# Send SIGINT to interrupt the execution (simulates Ctrl+C)
138143
try:

deepnote_toolkit/ipython_utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
from typing import Optional
23

34
from IPython import get_ipython
@@ -56,7 +57,7 @@ def publish_execution_metadata(
5657
"execution_count": execution_count,
5758
"duration_seconds": duration,
5859
"success": success,
59-
"timestamp": duration, # Using duration as timestamp for now
60+
"timestamp": time.time(),
6061
}
6162

6263
if error_type:

0 commit comments

Comments
 (0)