1313
1414
1515class TestGalaxyInvocations (GalaxyTestBase .GalaxyTestBase ):
16+ workflow_id : str
17+ pause_workflow_id : str
18+
19+ @classmethod
20+ def setUpClass (cls ) -> None :
21+ super ().setUpClass ()
22+ path = test_util .get_abspath (os .path .join ("data" , "paste_columns.ga" ))
23+ cls .workflow_id = cls .gi .workflows .import_workflow_from_local_path (path )["id" ]
24+ path = test_util .get_abspath (os .path .join ("data" , "test_workflow_pause.ga" ))
25+ cls .pause_workflow_id = cls .gi .workflows .import_workflow_from_local_path (path )["id" ]
26+
1627 def setUp (self ):
1728 super ().setUp ()
18- path = test_util .get_abspath (os .path .join ("data" , "paste_columns.ga" ))
19- self .workflow_id = self .gi .workflows .import_workflow_from_local_path (path )["id" ]
2029 self .history_id = self .gi .histories .create_history (name = "TestGalaxyInvocations" )["id" ]
2130 self .dataset_id = self ._test_dataset (self .history_id )
22- path = test_util .get_abspath (os .path .join ("data" , "test_workflow_pause.ga" ))
23- self .pause_workflow_id = self .gi .workflows .import_workflow_from_local_path (path )["id" ]
2431
2532 def tearDown (self ):
2633 self .gi .histories .delete_history (self .history_id , purge = True )
@@ -42,16 +49,16 @@ def test_cancel_invocation(self):
4249 def test_get_invocations (self ):
4350 invoc1 = self ._invoke_workflow ()
4451
45- # Run the first workflow on another history
52+ # Run another workflow on the same history
53+ path = test_util .get_abspath (os .path .join ("data" , "paste_columns.ga" ))
54+ workflow2_id = self .gi .workflows .import_workflow_from_local_path (path )["id" ]
4655 dataset = {"src" : "hda" , "id" : self .dataset_id }
47- hist2_id = self .gi .histories .create_history ("hist2" )["id" ]
4856 invoc2 = self .gi .workflows .invoke_workflow (
49- self . workflow_id , history_id = hist2_id , inputs = {"Input 1" : dataset , "Input 2" : dataset }, inputs_by = "name"
57+ workflow2_id , history_id = self . history_id , inputs = {"Input 1" : dataset , "Input 2" : dataset }, inputs_by = "name"
5058 )
5159
52- # Run another workflow on the 2nd history
53- path = test_util .get_abspath (os .path .join ("data" , "paste_columns.ga" ))
54- workflow2_id = self .gi .workflows .import_workflow_from_local_path (path )["id" ]
60+ # Run the second workflow on another history
61+ hist2_id = self .gi .histories .create_history ("hist2" )["id" ]
5562 invoc3 = self .gi .workflows .invoke_workflow (
5663 workflow2_id , history_id = hist2_id , inputs = {"Input 1" : dataset , "Input 2" : dataset }, inputs_by = "name"
5764 )
@@ -60,14 +67,13 @@ def test_get_invocations(self):
6067 self .gi .invocations .wait_for_invocation (invoc ["id" ])
6168
6269 # Test filtering by workflow ID
63- for wf_id , expected_invoc_num in {self .workflow_id : 2 , workflow2_id : 1 }.items ():
64- invocs = self .gi .invocations .get_invocations (workflow_id = wf_id )
65- assert len (invocs ) == expected_invoc_num
66- for invoc in invocs :
67- assert invoc ["workflow_id" ] == wf_id
70+ invocs = self .gi .invocations .get_invocations (workflow_id = workflow2_id )
71+ assert len (invocs ) == 2
72+ for invoc in invocs :
73+ assert invoc ["workflow_id" ] == workflow2_id
6874
6975 # Test filtering by history ID
70- for hist_id , expected_invoc_num in {self .history_id : 1 , hist2_id : 2 }.items ():
76+ for hist_id , expected_invoc_num in {self .history_id : 2 , hist2_id : 1 }.items ():
7177 invocs = self .gi .invocations .get_invocations (history_id = hist_id )
7278 assert len (invocs ) == expected_invoc_num
7379 for invoc in invocs :
0 commit comments