10
10
import datetime
11
11
import random
12
12
import pandas as pd
13
- import dateutil
14
13
from collections import OrderedDict
15
14
16
15
17
def create_event_dict(start_time, nodes_list):
    '''
    Function to generate a dictionary of event (start/finish) nodes
    from the nodes list

    Parameters
    ----------
    start_time : datetime.datetime
        a datetime object of the pipeline start time
    nodes_list : list
        a list of the node dictionaries that were run in the pipeline

    Returns
    -------
    events : dictionary
        a dictionary where the key is the timedelta (in seconds) from the
        start of the pipeline execution to the event-node it accompanies

    Raises
    ------
    KeyError
        if two events resolve to the same timestamp offset (a node logged
        twice, or two events at the exact same time)
    '''

    # Import packages
    import copy

    events = {}
    for node in nodes_list:
        # Format node fields, falling back to defaults when the field is
        # missing or not numeric (narrow excepts instead of bare except)
        try:
            estimated_threads = float(node['num_threads'])
        except (KeyError, TypeError, ValueError):
            estimated_threads = 1
        try:
            estimated_memory_gb = float(node['estimated_memory_gb'])
        except (KeyError, TypeError, ValueError):
            estimated_memory_gb = 1.0
        try:
            runtime_threads = float(node['runtime_threads'])
        except (KeyError, TypeError, ValueError):
            runtime_threads = 0
        try:
            runtime_memory_gb = float(node['runtime_memory_gb'])
        except (KeyError, TypeError, ValueError):
            runtime_memory_gb = 0.0

        # Init and format event-based nodes; the finish node is a deep copy
        # so the two events can carry different 'event' tags
        node['estimated_threads'] = estimated_threads
        node['estimated_memory_gb'] = estimated_memory_gb
        node['runtime_threads'] = runtime_threads
        node['runtime_memory_gb'] = runtime_memory_gb
        start_node = node
        finish_node = copy.deepcopy(node)
        start_node['event'] = 'start'
        finish_node['event'] = 'finish'

        # Get dictionary keys: seconds elapsed since pipeline start
        start_delta = (node['start'] - start_time).total_seconds()
        finish_delta = (node['finish'] - start_time).total_seconds()

        # Populate dictionary ('in' instead of the Python-2-only has_key)
        if start_delta in events or finish_delta in events:
            err_msg = 'Event logged twice or events started at exact same time!'
            raise KeyError(err_msg)
        events[start_delta] = start_node
        events[finish_delta] = finish_node

    # Return events dictionary
    return events
42
81
43
82
@@ -65,7 +104,6 @@ def log_to_dict(logfile):
65
104
unifinished_nodes = [] #all start nodes that dont have a finish yet
66
105
67
106
with open (logfile , 'r' ) as content :
68
-
69
107
#read file separating each line
70
108
content = content .read ()
71
109
lines = content .split ('\n ' )
@@ -98,7 +136,8 @@ def log_to_dict(logfile):
98
136
if aux ['id' ] == node ['id' ] and aux ['name' ] == node ['name' ] \
99
137
and aux ['start' ] < node ['finish' ]:
100
138
node ['start' ] = aux ['start' ]
101
- node ['duration' ] = (node ['finish' ] - node ['start' ]).total_seconds ()
139
+ node ['duration' ] = \
140
+ (node ['finish' ] - node ['start' ]).total_seconds ()
102
141
103
142
unifinished_nodes .remove (aux )
104
143
nodes_list .append (node )
@@ -113,29 +152,54 @@ def log_to_dict(logfile):
113
152
n ['duration' ] = (n ['finish' ] - n ['start' ]).total_seconds ()
114
153
nodes_list .append (n )
115
154
155
+ # Return list of nodes
116
156
return nodes_list
117
157
118
158
119
def calculate_resource_timeseries(events, resource):
    '''
    Given an events dictionary, calculate the resources used
    as a timeseries

    Parameters
    ----------
    events : dictionary
        a dictionary of event-based node dictionaries of the workflow
        execution statistics
    resource : string
        the resource of interest to return the time-series of;
        e.g. 'runtime_memory_gb', 'estimated_threads', etc

    Returns
    -------
    time_series : pandas Series
        a pandas Series object that contains timestamps as the indices
        and the resource amount as values
    '''

    # Init variables
    res = OrderedDict()
    all_res = 0.0

    # Iterate through the events in chronological order
    for tdelta, event in sorted(events.items()):
        # NOTE: 'Unkown' [sic] matches the misspelled sentinel used by the
        # log writer; preserved for compatibility
        if event['event'] == "start":
            if resource in event and event[resource] != 'Unkown':
                all_res += float(event[resource])
            current_time = event['start']
        # BUGFIX: key was 'event ' (trailing space), which raised KeyError
        # so finished nodes never released their resources
        elif event['event'] == "finish":
            if resource in event and event[resource] != 'Unkown':
                # BUGFIX: finish events must subtract, not add
                all_res -= float(event[resource])
            current_time = event['finish']
        res[current_time] = all_res

    # Formulate the pandas timeseries (list() for modern pandas, which
    # rejects raw dict views as data/index)
    time_series = pd.Series(data=list(res.values()), index=list(res.keys()))
    # Downsample to only the points where the value actually changes
    # (diff of the first point is NaN != 0, so it is always kept)
    ts_diff = time_series.diff()
    time_series = time_series[ts_diff != 0]

    # Return the new time series
    return time_series
140
204
141
205
@@ -186,7 +250,8 @@ def draw_lines(start, total_duration, minute_scale, scale):
186
250
return result
187
251
188
252
189
- def draw_nodes (start , nodes_list , cores , minute_scale , space_between_minutes , colors ):
253
+ def draw_nodes (start , nodes_list , cores , minute_scale , space_between_minutes ,
254
+ colors ):
190
255
'''
191
256
Function to return the html-string of the node drawings for the
192
257
gantt chart
@@ -269,8 +334,8 @@ def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, co
269
334
'node_finish' : node_finish .strftime ("%Y-%m-%d %H:%M:%S" )}
270
335
# Create new node string
271
336
new_node = "<div class='node' style='left:%(left)spx;top:%(offset)spx;" \
272
- "height:%(scale_duration)spx;background-color:%(color)s;" \
273
- "' title='%(node_name)s\n duration:%(node_dur)s\n " \
337
+ "height:%(scale_duration)spx;background-color:%(color)s;' " \
338
+ "title='%(node_name)s\n duration:%(node_dur)s\n " \
274
339
"start:%(node_start)s\n end:%(node_finish)s'></div>" % \
275
340
node_dict
276
341
@@ -280,49 +345,79 @@ def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, co
280
345
# Return html string for nodes
281
346
return result
282
347
283
-
284
- def draw_thread_bar (threads ,space_between_minutes , minute_scale , color ):
285
- result = "<p class='time' style='top:198px;left:900px;'>Threads</p>"
286
-
287
- scale = float (space_between_minutes / float (minute_scale ))
288
- space_between_minutes = float (space_between_minutes / 60.0 )
289
-
290
- for i in range (len (threads )):
291
- #print threads[i]
292
- width = threads [i ] * 10
293
- t = (float (i * scale * minute_scale )/ 60.0 ) + 220
294
- bar = "<div class='bar' style='height:" + str (space_between_minutes ) + "px;width:" + str (width ) + "px;left:900px;top:" + str (t )+ "px'></div>"
295
- result += bar
296
-
297
- return result
298
-
299
-
300
- def draw_memory_bar (nodes_list , space_between_minutes , minute_scale , color ,
301
- mem_key = 'runtime_memory_gb' ):
348
+ # def draw_thread_bar(threads,space_between_minutes, minute_scale, color):
349
+ # result = "<p class='time' style='top:198px;left:900px;'>Threads</p>"
350
+ #
351
+ # scale = float(space_between_minutes/float(minute_scale))
352
+ # space_between_minutes = float(space_between_minutes/60.0)
353
+ #
354
+ # for i in range(len(threads)):
355
+ # #print threads[i]
356
+ # width = threads[i] * 10
357
+ # t = (float(i*scale*minute_scale)/60.0) + 220
358
+ # bar = "<div class='bar' style='height:"+ str(space_between_minutes) + "px;width:"+ str(width) +"px;left:900px;top:"+str(t)+"px'></div>"
359
+ # result += bar
360
+ #
361
+ # return result
362
+
363
def draw_resource_bar(start_time, finish_time, time_series, space_between_minutes,
                      minute_scale, color, left, resource):
    '''
    Function to return the html-string of a resource-usage bar column
    for the gantt chart

    Parameters
    ----------
    start_time : datetime.datetime
        start of the pipeline execution (top of the chart)
    finish_time : datetime.datetime
        end of the pipeline execution (used to close the last bar)
    time_series : pandas Series
        timestamps as indices, resource amount as values
    space_between_minutes : integer
        pixel spacing between minute gridlines
    minute_scale : integer
        minutes between gridlines
    color : string
        css color of the bars
    left : integer
        pixel offset of this resource column
    resource : string
        column label; 'Memory' switches the hover label to GB

    Returns
    -------
    result : string
        html string of the resource header and its bars
    '''

    # Resource column header
    result = "<p class='time' style='top:198px;left:%dpx;'>%s</p>" \
             % (left, resource)
    # Image scaling factors
    scale = float(space_between_minutes / float(minute_scale))
    space_between_minutes = float(space_between_minutes / scale)

    # Iterate through time series; each bar spans until the next sample
    # (.items() — .iteritems() was removed in pandas 2.0)
    ts_len = len(time_series)
    for idx, (ts_start, amount) in enumerate(time_series.items()):
        if idx < ts_len - 1:
            ts_end = time_series.index[idx + 1]
        else:
            ts_end = finish_time
        # Calculate offset from start at top
        offset = ((ts_start - start_time).total_seconds() / 60.0) * scale * \
                 space_between_minutes + 220
        # Scale duration; clamp tiny bars to a visible minimum, then trim
        # 2px so adjacent bars do not touch
        duration_mins = (ts_end - ts_start).total_seconds() / 60.0
        height = duration_mins * scale * \
                 space_between_minutes
        if height < 5:
            height = 5
        height -= 2

        # Bar width is proportional to resource amount
        width = amount * 20

        if resource.lower() == 'memory':
            label = '%.3f GB' % amount
        else:
            label = '%d threads' % amount

        # Setup dictionary for bar html string insertion
        bar_dict = {'color': color,
                    'height': height,
                    'width': width,
                    'offset': offset,
                    'left': left,
                    'label': label,
                    'duration': duration_mins,
                    'start': ts_start.strftime('%Y-%m-%d %H:%M:%S'),
                    'finish': ts_end.strftime('%Y-%m-%d %H:%M:%S')}

        bar_html = "<div class='bar' style='background-color:%(color)s;" \
                   "height:%(height).3fpx;width:%(width).3fpx;" \
                   "left:%(left)dpx; top:%(offset).3fpx;'" \
                   "title='%(label)s\n duration:%(duration).3f\n " \
                   "start:%(start)s\n end:%(finish)s'></div>"
        # Add another bar to html line
        result += bar_html % bar_dict

    # Return bar-formatted html string
    return result
327
422
328
423
@@ -379,9 +474,6 @@ def generate_gantt_chart(logfile, cores, minute_scale=10,
379
474
# generate_gantt_chart('callback.log', 8)
380
475
'''
381
476
382
- nodes_list = log_to_dict (logfile )
383
- scale = space_between_minutes
384
-
385
477
#add the html header
386
478
html_string = '''<!DOCTYPE html>
387
479
<head>
@@ -432,46 +524,54 @@ def generate_gantt_chart(logfile, cores, minute_scale=10,
432
524
<body>
433
525
<div id="content">'''
434
526
527
+ # Read in json-log to get list of node dicts
528
+ nodes_list = log_to_dict (logfile )
435
529
436
- #create the header of the report with useful information
530
+ # Create the header of the report with useful information
437
531
start_node = nodes_list [0 ]
438
532
last_node = nodes_list [- 1 ]
439
533
duration = (last_node ['finish' ] - start_node ['start' ]).total_seconds ()
440
534
441
- #summary strings of workflow at top
535
+ # Get events based dictionary of node run stats
536
+ events = create_event_dict (start_node ['start' ], nodes_list )
537
+
538
+ # Summary strings of workflow at top
442
539
html_string += '<p>Start: ' + start_node ['start' ].strftime ("%Y-%m-%d %H:%M:%S" ) + '</p>'
443
540
html_string += '<p>Finish: ' + last_node ['finish' ].strftime ("%Y-%m-%d %H:%M:%S" ) + '</p>'
444
541
html_string += '<p>Duration: ' + "{0:.2f}" .format (duration / 60 ) + ' minutes</p>'
445
542
html_string += '<p>Nodes: ' + str (len (nodes_list ))+ '</p>'
446
543
html_string += '<p>Cores: ' + str (cores ) + '</p>'
447
544
545
+ # Draw nipype nodes Gantt chart and runtimes
448
546
html_string += draw_lines (start_node ['start' ], duration , minute_scale ,
449
547
space_between_minutes )
450
548
html_string += draw_nodes (start_node ['start' ], nodes_list , cores , minute_scale ,
451
549
space_between_minutes , colors )
452
550
453
- result = log_to_events (logfile )
454
-
455
- #threads_estimated = calculate_resources(result, 'num_threads')
456
- #html_string += draw_thread_bar(threads_estimated, space_between_minutes, minute_scale, '#90BBD7')
457
-
458
- #threads_real = calculate_resources(result, 'runtime_threads')
459
- #html_string += draw_thread_bar(threads_real, space_between_minutes, minute_scale, '#03969D')
460
-
461
-
462
- #memory_estimated = calculate_resources(result, 'estimated_memory_gb')
463
- #html_string += draw_memory_bar(memory_estimated, space_between_minutes, minute_scale, '#90BBD7')
464
-
465
- memory_real = calculate_resources (result , 'runtime_memory_gb' )
466
- html_string += draw_memory_bar (nodes_list , space_between_minutes , minute_scale , '#03969D' )
467
-
551
+ # Get memory timeseries
552
+ estimated_mem_ts = calculate_resource_timeseries (events , 'estimated_memory_gb' )
553
+ runtime_mem_ts = calculate_resource_timeseries (events , 'runtime_memory_gb' )
554
+ # Plot gantt chart
555
+ html_string += draw_resource_bar (start_node ['start' ], last_node ['finish' ], estimated_mem_ts ,
556
+ space_between_minutes , minute_scale , '#90BBD7' , 1200 , 'Memory' )
557
+ html_string += draw_resource_bar (start_node ['start' ], last_node ['finish' ], runtime_mem_ts ,
558
+ space_between_minutes , minute_scale , '#03969D' , 1200 , 'Memory' )
559
+
560
+ # Get threads timeseries
561
+ estimated_threads_ts = calculate_resource_timeseries (events , 'estimated_threads' )
562
+ runtime_threads_ts = calculate_resource_timeseries (events , 'runtime_threads' )
563
+ # Plot gantt chart
564
+ html_string += draw_resource_bar (start_node ['start' ], last_node ['finish' ], estimated_threads_ts ,
565
+ space_between_minutes , minute_scale , '#90BBD7' , 600 , 'Threads' )
566
+ html_string += draw_resource_bar (start_node ['start' ], last_node ['finish' ], runtime_threads_ts ,
567
+ space_between_minutes , minute_scale , '#03969D' , 600 , 'Threads' )
468
568
469
569
#finish html
470
570
html_string += '''
471
571
</div>
472
572
</body>'''
473
573
474
574
#save file
475
- html_file = open (logfile + '.html' , 'wb' )
575
+ html_file = open (logfile + '.html' , 'wb' )
476
576
html_file .write (html_string )
477
577
html_file .close ()
0 commit comments