Skip to content

Commit 113e01f

Browse files
committed
good locking
1 parent 1915ab9 commit 113e01f

File tree

4 files changed

+23
-13
lines changed

4 files changed

+23
-13
lines changed

taskvine_report/cli/report.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,24 @@ def create_app(logs_dir):
5050
package_dir = os.path.dirname(os.path.dirname(__file__))
5151
template_dir = os.path.join(package_dir, 'templates')
5252
static_dir = os.path.join(package_dir, 'static')
53-
53+
5454
app = Flask(__name__,
5555
template_folder=template_dir,
5656
static_folder=static_dir)
5757

5858
def setup_request_logging(app):
5959
@app.before_request
6060
def log_request_info():
61-
app.config["RUNTIME_STATE"].template_lock.renew()
6261
request_folder = request.args.get('folder')
62+
app.config["PROCESSING_REQUESTS_COUNT"] += 1
6363
app.config["RUNTIME_STATE"].log_request(request)
6464
request._start_time = time.time()
65-
6665
app.config["RUNTIME_STATE"].ensure_runtime_template(request_folder)
6766

6867
@app.after_request
6968
def log_response_info(response):
7069
app.config["RUNTIME_STATE"].template_lock.renew()
70+
app.config["PROCESSING_REQUESTS_COUNT"] -= 1
7171
if hasattr(request, '_start_time'):
7272
duration = time.time() - request._start_time
7373
app.config["RUNTIME_STATE"].log_response(response, request, duration)
@@ -123,6 +123,7 @@ def log_response_info(response):
123123
app.config["RUNTIME_STATE"].set_logger()
124124
app.config["RUNTIME_STATE"].set_logs_dir(logs_dir)
125125
app.config["RUNTIME_STATE"].log_info(f"Using logs directory: {logs_dir}")
126+
app.config["PROCESSING_REQUESTS_COUNT"] = 0
126127

127128
@app.route('/')
128129
def index():

taskvine_report/routes/runtime_state.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,34 @@
66
import time
77
import traceback
88
import threading
9-
import cloudpickle
9+
from flask import current_app
1010

1111

1212
class LeaseLock:
13-
def __init__(self, lease_duration_sec=60):
13+
def __init__(self, lease_duration_sec=20):
1414
self._lock = threading.Lock()
1515
self._expiry_time = 0
1616
self._lease_duration = lease_duration_sec
1717

1818
def acquire(self):
1919
now = time.time()
20+
# release lock if it's expired
2021
if self._lock.locked() and now > self._expiry_time:
2122
try:
2223
self._lock.release()
2324
except RuntimeError:
2425
pass
26+
# release lock if there is no request being processed (the lock request itself is a request)
27+
if current_app.config["PROCESSING_REQUESTS_COUNT"] <= 1:
28+
try:
29+
self._lock.release()
30+
except RuntimeError:
31+
pass
2532

33+
# acquire lock as non-blocking
2634
if not self._lock.acquire(blocking=False):
2735
return False
2836

29-
self._expiry_time = time.time() + self._lease_duration
3037
return True
3138

3239
def release(self):
@@ -67,11 +74,11 @@ def __init__(self):
6774
self.tick_size = 12
6875

6976
# for preventing multiple instances of the same runtime template
70-
self.template_lock = LeaseLock(lease_duration_sec=60)
77+
self.template_lock = LeaseLock()
7178

7279
# for preventing multiple reloads of the data
7380
self._pkl_files_fingerprint = None
74-
self.reload_lock = LeaseLock(lease_duration_sec=180)
81+
self.reload_lock = LeaseLock()
7582

7683
def set_logger(self):
7784
self.logger = Logger()

taskvine_report/src/data_parser.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,6 @@ def parse_logs(self):
845845
self.parse_debug()
846846

847847
def generate_subgraphs(self):
848-
time_start = time.time()
849848
# exclude library tasks from subgraph generation to match RUNTIME_STATE filtering
850849
tasks_keys = set(key for key, task in self.tasks.items() if not task.is_library_task)
851850
parent = {key: key for key in tasks_keys}
@@ -878,7 +877,7 @@ def union(x, y):
878877
rank[root_x] += 1
879878

880879
with self._create_progress_bar() as progress:
881-
task_id = progress.add_task("Parsing subgraphs", total=len(self.files))
880+
task_id = progress.add_task("Parsing file dependencies", total=len(self.files))
882881

883882
for file in self.files.values():
884883
progress.update(task_id, advance=1)
@@ -904,9 +903,6 @@ def union(x, y):
904903
self.subgraphs = {i: subgraph for i,
905904
subgraph in enumerate(sorted_subgraphs, 1)}
906905

907-
time_end = time.time()
908-
print(f"Parsing subgraphs took {round(time_end - time_start, 4)} seconds")
909-
910906
self.checkpoint_subgraphs()
911907

912908
def checkpoint_debug(self):

taskvine_report/utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,3 +524,9 @@ def ensure_dir(path):
524524
if os.path.exists(path):
525525
return
526526
os.makedirs(path)
527+
528+
def get_current_runtime_template():
529+
return current_app.config["RUNTIME_STATE"].runtime_template
530+
531+
def request_template_matches_current_runtime_template(request):
532+
return request.args.get('folder') == get_current_runtime_template()

0 commit comments

Comments
 (0)