@@ -60,3 +60,36 @@ def test_callback_exception(tmpdir, plugin, stop_on_first_crash):
60
60
61
61
sleep (0.5 ) # Wait for callback to be called (python 2.7)
62
62
assert so .statuses == [("f_node" , "start" ), ("f_node" , "exception" )]
63
+
64
+
65
+ @pytest .mark .parametrize ("plugin" , ["Linear" , "MultiProc" , "LegacyMultiProc" ])
66
+ def test_callback_gantt (tmpdir , plugin ):
67
+ import logging
68
+ import logging .handlers
69
+
70
+ from os import path
71
+
72
+ from nipype .utils .profiler import log_nodes_cb
73
+ from nipype .utils .draw_gantt_chart import generate_gantt_chart
74
+
75
+ log_filename = 'callback.log'
76
+ logger = logging .getLogger ('callback' )
77
+ logger .setLevel (logging .DEBUG )
78
+ handler = logging .FileHandler (log_filename )
79
+ logger .addHandler (handler )
80
+
81
+ #create workflow
82
+ wf = pe .Workflow (name = "test" , base_dir = tmpdir .strpath )
83
+ f_node = pe .Node (
84
+ niu .Function (function = func , input_names = [], output_names = []), name = "f_node"
85
+ )
86
+ wf .add_nodes ([f_node ])
87
+ wf .config ["execution" ] = {"crashdump_dir" : wf .base_dir , "poll_sleep_duration" : 2 }
88
+
89
+ plugin_args = {"status_callback" : log_nodes_cb }
90
+ if plugin != "Linear" :
91
+ plugin_args ['n_procs' ] = 8
92
+ wf .run (plugin = plugin , plugin_args = plugin_args )
93
+
94
+ generate_gantt_chart ('callback.log' , 1 if plugin == "Linear" else 8 )
95
+ assert path .exists ('callback.log.html' )
0 commit comments