Skip to content

Commit 7f1a7a5

Browse files
committed
task subgraphs
1 parent a14b8d6 commit 7f1a7a5

File tree

6 files changed

+173
-24
lines changed

6 files changed

+173
-24
lines changed

app.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from routes.task_completion_percentiles import task_completion_percentiles_bp
2222
from routes.task_dependencies import task_dependencies_bp
2323
from routes.lock import lock_bp
24+
from routes.task_subgraphs import task_subgraphs_bp
2425

2526
import argparse
2627
import os
@@ -79,6 +80,7 @@ def log_response_info(response):
7980

8081
# subgraphs
8182
app.register_blueprint(subgraphs_bp)
83+
app.register_blueprint(task_subgraphs_bp)
8284

8385
# runtime template
8486
app.register_blueprint(runtime_template_bp)

routes/runtime_state.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,20 @@ def wrapper(*args, **kwargs):
7373

7474
response = func(*args, **kwargs)
7575

76-
response_data = response.get_json() if hasattr(response, 'get_json') else response
77-
response_size = len(json.dumps(response_data)) if response_data else 0
76+
if hasattr(response, 'get_json'):
77+
try:
78+
response_data = response.get_json()
79+
except Exception:
80+
response_data = None
81+
else:
82+
response_data = response
83+
84+
if isinstance(response_data, (dict, list)):
85+
response_size = len(json.dumps(response_data)) if response_data else 0
86+
elif hasattr(response, 'get_data'):
87+
response_size = len(response.get_data())
88+
else:
89+
response_size = 0
7890

7991
route_name = func.__name__
8092
runtime_state.log_info(f"Route {route_name} response size: {response_size/1024/1024:.2f} MB")

routes/task_subgraphs.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
from .runtime_state import runtime_state, check_and_reload_data
2+
from flask import Blueprint, jsonify, request
3+
import graphviz
4+
import os
5+
from pathlib import Path
6+
7+
8+
task_subgraphs_bp = Blueprint('task_subgraphs', __name__, url_prefix='/api')
9+
10+
@task_subgraphs_bp.route('/task-subgraphs')
@check_and_reload_data()
def get_task_subgraphs():
    """Return one task subgraph rendered as SVG.

    Query parameters:
        subgraph_id: required, must parse as an integer.
        plot_failed_task: 'true' (default) to include task tries whose
            ``when_failure_happens`` is set.
        plot_recovery_task: 'true' (default) to include recovery tasks.

    Returns:
        A JSON response with ``subgraph_id``, ``num_task_tries`` (all task
        tries in the subgraph, regardless of the two filters) and
        ``subgraph_svg_content`` (the SVG text). Responds with 400 for a
        missing or non-integer id, 404 for an unknown subgraph, and 500 with
        the exception message for any other failure.

    The rendered SVG is cached on disk under
    ``runtime_state.data_parser.svg_files_dir``; the file name encodes the
    subgraph id and both filter flags, and an existing file is reused.
    """
    try:
        data = {}

        subgraph_id = request.args.get('subgraph_id')
        if not subgraph_id:
            return jsonify({'error': 'Subgraph ID is required'}), 400
        try:
            subgraph_id = int(subgraph_id)
        except (TypeError, ValueError):
            # Only a malformed value is a client error; anything else should
            # surface through the outer handler instead of being masked.
            return jsonify({'error': 'Invalid subgraph ID'}), 400

        plot_unsuccessful_task = request.args.get('plot_failed_task', 'true').lower() == 'true'
        plot_recovery_task = request.args.get('plot_recovery_task', 'true').lower() == 'true'

        subgraph = runtime_state.subgraphs.get(subgraph_id)
        if not subgraph:
            return jsonify({'error': 'Subgraph not found'}), 404
        task_tries = list(subgraph)

        svg_file_path_without_suffix = os.path.join(
            runtime_state.data_parser.svg_files_dir,
            f'task-subgraph-{subgraph_id}-{plot_unsuccessful_task}-{plot_recovery_task}'
        )
        svg_file_path = f'{svg_file_path_without_suffix}.svg'

        if not Path(svg_file_path).exists():
            dot = graphviz.Digraph()

            def plot_task_node(dot, task):
                # One node per task try; the label shows only the task id.
                node_id = f'{task.task_id}-{task.task_try_id}'
                node_label = f'{task.task_id}'

                if task.when_failure_happens:
                    node_label = f'{node_label} (unsuccessful)'
                    style = 'dashed'
                    color = '#FF0000'
                    fontcolor = '#FF0000'
                else:
                    style = 'solid'
                    color = '#000000'
                    fontcolor = '#000000'

                if task.is_recovery_task:
                    node_label = f'{node_label} (recovery)'
                    # Overrides the style chosen above for recovery tasks.
                    style = 'filled,dashed'
                    fillcolor = '#FF69B4'
                else:
                    fillcolor = '#FFFFFF'

                dot.node(node_id, node_label, shape='ellipse', style=style, color=color, fontcolor=fontcolor, fillcolor=fillcolor)

            def plot_file_node(dot, file):
                # Files without any producer are not drawn.
                if len(file.producers) == 0:
                    return
                file_name = file.filename
                dot.node(file_name, file_name, shape='box')

            def plot_task2file_edge(dot, task, file):
                # Edge task -> output file, labelled with the task's
                # worker-side execution time. Unsuccessful tries get no edge.
                if len(file.producers) == 0:
                    return
                if task.when_failure_happens:
                    return
                task_execution_time = task.time_worker_end - task.time_worker_start
                dot.edge(f'{task.task_id}-{task.task_try_id}', file.filename, label=f'{task_execution_time:.2f}s')

            def plot_file2task_edge(dot, file, task):
                # Edge input file -> task, labelled with the earliest
                # producer end time relative to runtime_state.MIN_TIME.
                if len(file.producers) == 0:
                    return
                producer_end_times = []
                for producer_task_id, producer_task_try_id in file.producers:
                    producer_task = runtime_state.tasks[(producer_task_id, producer_task_try_id)]
                    if producer_task.time_worker_end:
                        producer_end_times.append(producer_task.time_worker_end)

                task_node_id = f'{task.task_id}-{task.task_try_id}'
                if not producer_end_times:
                    # No producer has a recorded end time: draw the edge
                    # without a label rather than printing "infs".
                    dot.edge(file.filename, task_node_id)
                    return

                file_creation_time = min(producer_end_times) - runtime_state.MIN_TIME
                dot.edge(file.filename, task_node_id, label=f'{file_creation_time:.2f}s')

            for (tid, try_id) in task_tries:
                task = runtime_state.tasks[(tid, try_id)]
                if task.is_recovery_task and not plot_recovery_task:
                    continue
                if task.when_failure_happens and not plot_unsuccessful_task:
                    continue
                plot_task_node(dot, task)
                for file_name in getattr(task, 'input_files', []):
                    file = runtime_state.files[file_name]
                    plot_file_node(dot, file)
                    plot_file2task_edge(dot, file, task)
                for file_name in getattr(task, 'output_files', []):
                    file = runtime_state.files[file_name]
                    # Output files that were never transferred are skipped.
                    if len(file.transfers) == 0:
                        continue
                    plot_file_node(dot, file)
                    plot_task2file_edge(dot, task, file)
            dot.attr(rankdir='TB')
            dot.engine = 'dot'
            # Writes '<path>.svg' next to the generated DOT source file.
            dot.render(svg_file_path_without_suffix, format='svg', view=False)

        data['subgraph_id'] = subgraph_id
        data['num_task_tries'] = len(task_tries)
        # Close the handle deterministically; Graphviz emits UTF-8 SVG, so
        # do not depend on the platform's default encoding.
        with open(svg_file_path, 'r', encoding='utf-8') as svg_file:
            data['subgraph_svg_content'] = svg_file.read()

        return jsonify(data)
    except Exception as e:
        # Top-level route boundary: log and report instead of propagating.
        runtime_state.logger.error(f'Error in get_task_subgraphs: {e}')
        return jsonify({'error': str(e)}), 500

static/js/modules/configs.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { TaskCompletionPercentilesModule } from './task_completion_percentiles.j
66
import { TaskDependenciesModule } from './task_dependencies.js';
77
import { TaskDependentsModule } from './task_dependents.js';
88
import { TaskRetrievalTimeModule } from './task_retrieval_time.js';
9+
import { TaskSubgraphsModule } from './task_subgraphs.js';
910
import { WorkerIncomingTransfersModule } from './worker_incoming_transfers.js';
1011
import { WorkerOutgoingTransfersModule } from './worker_outgoing_transfers.js';
1112
import { WorkerStorageConsumptionModule } from './worker_storage_consumption.js';
@@ -28,6 +29,7 @@ export const moduleClasses = {
2829
'task-completion-percentiles': TaskCompletionPercentilesModule,
2930
'task-dependencies': TaskDependenciesModule,
3031
'task-dependents': TaskDependentsModule,
32+
'task-subgraphs': TaskSubgraphsModule,
3133
'worker-storage-consumption': WorkerStorageConsumptionModule,
3234
'worker-concurrency': WorkerConcurrencyModule,
3335
'worker-incoming-transfers': WorkerIncomingTransfersModule,
@@ -51,6 +53,7 @@ export const moduleConfigs = [
5153
{ id: 'task-completion-percentiles', title: 'Task Completion Percentiles', api_url: '/api/task-completion-percentiles' },
5254
{ id: 'task-dependencies', title: 'Task Dependencies', api_url: '/api/task-dependencies' },
5355
{ id: 'task-dependents', title: 'Task Dependents', api_url: '/api/task-dependents' },
56+
{ id: 'task-subgraphs', title: 'Task Subgraphs', api_url: '/api/task-subgraphs' },
5457
{ id: 'worker-storage-consumption', title: 'Worker Storage Consumption', api_url: '/api/worker-storage-consumption' },
5558
{ id: 'worker-concurrency', title: 'Worker Concurrency', api_url: '/api/worker-concurrency' },
5659
{ id: 'worker-incoming-transfers', title: 'Worker Incoming Transfers', api_url: '/api/worker-incoming-transfers' },
static/js/modules/task_subgraphs.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { BaseModule } from './base.js';
2+
3+
/**
 * Module that displays a server-rendered task subgraph.
 *
 * The backend returns the graph as SVG text; this module parses that text
 * and swaps it in for the module's current SVG node.
 */
export class TaskSubgraphsModule extends BaseModule {
    constructor(id, title, api_url) {
        super(id, title, api_url);
        this._current_subgraph_id = 1;
    }

    /**
     * Fetch the SVG for one subgraph and store the response in `this.data`.
     *
     * @param {*} folder - Not used by this method; kept for signature
     *     compatibility (presumably shared with other modules; verify
     *     against BaseModule).
     * @param {number} [subgraph_id=1] - Subgraph to request; a falsy value
     *     makes the call a no-op.
     * @param {boolean} [plot_failed_task=true] - Include unsuccessful tries.
     * @param {boolean} [plot_recovery_task=true] - Include recovery tasks.
     */
    async fetchData(folder, subgraph_id = 1, plot_failed_task = true, plot_recovery_task = true) {
        if (!subgraph_id) return;

        // URLSearchParams encodes the values instead of trusting raw input.
        const params = new URLSearchParams({
            subgraph_id: String(subgraph_id),
            plot_failed_task: String(plot_failed_task),
            plot_recovery_task: String(plot_recovery_task),
        });

        const response = await fetch(`${this.api_url}?${params.toString()}`);

        if (!response.ok) {
            // The endpoint answers 400/404/500 with an `{error: ...}` body;
            // do not keep that as plottable data.
            this.data = null;
            console.error(`Failed to fetch task subgraph ${subgraph_id}: HTTP ${response.status}`);
            return;
        }

        this.data = await response.json();
    }

    /**
     * Replace the module's SVG node with the fetched subgraph SVG.
     * Does nothing beyond clearing when no usable SVG content is available.
     */
    plot() {
        this.clearSVG();

        if (!this.data || !this.data.subgraph_svg_content) return;

        const svgElement = new DOMParser()
            .parseFromString(this.data.subgraph_svg_content, 'image/svg+xml')
            .documentElement;

        // On malformed input DOMParser yields a <parsererror> document
        // rather than throwing; never swap that into the page.
        if (!svgElement || svgElement.nodeName.toLowerCase() !== 'svg') return;

        this.svgNode.parentNode.replaceChild(svgElement, this.svgNode);
        this.svgNode = svgElement;
        this.svgElement = d3.select(this.svgNode);
        this.svgElement.attr('preserveAspectRatio', 'xMidYMid meet');
    }

}

templates/index.html

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,33 +35,13 @@
3535
<link rel="stylesheet" href="{{ url_for('static', filename='css/sidebar.css') }}?v=1.0">
3636
<link rel="stylesheet" href="{{ url_for('static', filename='css/content.css') }}?v=1.0">
3737
<link rel="stylesheet" href="{{ url_for('static', filename='css/report.css') }}?v=1.0">
38-
39-
<!-- import js files-->
40-
<!--
41-
<script src="{{ url_for('static', filename='js/draw_tables.js') }}" type="module"></script>
42-
<script src="{{ url_for('static', filename='js/manager.js') }}" type="module"></script>
43-
<script src="{{ url_for('static', filename='js/manager_disk_usage.js') }}" type="module"></script>
44-
<script src="{{ url_for('static', filename='js/graph.js') }}" type="module"></script>
45-
<script src="{{ url_for('static', filename='js/module_definitions.js') }}" type="module"></script>
46-
<script src="{{ url_for('static', filename='js/log_viewer.js') }}" type="module"></script>
47-
-->
4838

49-
<!-- modules -->
39+
<!-- import js modules -->
5040
<script src="{{ url_for('static', filename='js/modules/base.js') }}" type="module"></script>
5141
<script src="{{ url_for('static', filename='js/modules/configs.js') }}" type="module"></script>
5242
<script src="{{ url_for('static', filename='js/modules/log_manager.js') }}" type="module"></script>
5343
<script src="{{ url_for('static', filename='js/main.js') }}" type="module"></script>
54-
<!--
55-
<script src="{{ url_for('static', filename='js/task_execution_time.js') }}" type="module"></script>
56-
<script src="{{ url_for('static', filename='js/task_response_time.js') }}" type="module"></script>
57-
<script src="{{ url_for('static', filename='js/storage_consumption.js') }}" type="module"></script>
58-
<script src="{{ url_for('static', filename='js/sidebar.js') }}" type="module"></script>
59-
<script src="{{ url_for('static', filename='js/task_concurrency.js') }}" type="module"></script>
60-
<script src="{{ url_for('static', filename='js/file_transfers.js') }}" type="module"></script>
61-
<script src="{{ url_for('static', filename='js/file_sizes.js') }}" type="module"></script>
62-
<script src="{{ url_for('static', filename='js/file_replicas.js') }}" type="module"></script>
63-
<script type="module" src="{{ url_for('static', filename='js/subgraphs.js') }}"></script>
64-
-->
44+
6545
</head>
6646
<body>
6747
<div id="vine-tooltip" class="tooltip"></div>

0 commit comments

Comments
 (0)