1414# limitations under the License.
1515
1616import logging
17- import six
18-
19- from six .moves import queue
17+ import queue
2018
2119from orquesta import constants
2220from orquesta import events
@@ -119,7 +117,7 @@ def get_task_sequence(self, task_id, route):
119117 task_id , route = q .get ()
120118
121119 for i , t in enumerate (self .sequence ):
122- for k , v in six . iteritems ( t ["prev" ]):
120+ for k , v in t ["prev" ]. items ( ):
123121 p = self .sequence [v ]
124122 if p ["id" ] == task_id and p ["route" ] == route :
125123 seq .append ((i , t ))
@@ -590,7 +588,7 @@ def get_task(self, task_id, route):
590588 if getattr (task_spec , "delay" , None ):
591589 task_delay = task_spec .delay
592590
593- if isinstance (task_delay , six . string_types ):
591+ if isinstance (task_delay , str ):
594592 task_delay = expr_base .evaluate (task_delay , task_ctx )
595593
596594 if not isinstance (task_delay , int ):
@@ -790,7 +788,7 @@ def setup_retry_in_task_state(self, task_state_entry, in_ctx_idxs):
790788
791789 # Evaluate the retry delay value.
792790 if "delay" in task_state_entry ["retry" ] and isinstance (
793- task_state_entry ["retry" ]["delay" ], six . string_types
791+ task_state_entry ["retry" ]["delay" ], str
794792 ):
795793 delay_value = expr_base .evaluate (task_state_entry ["retry" ]["delay" ], in_ctx )
796794
@@ -801,7 +799,7 @@ def setup_retry_in_task_state(self, task_state_entry, in_ctx_idxs):
801799
802800 # Evaluate the retry count value.
803801 if "count" in task_state_entry ["retry" ] and isinstance (
804- task_state_entry ["retry" ]["count" ], six . string_types
802+ task_state_entry ["retry" ]["count" ], str
805803 ):
806804 count_value = expr_base .evaluate (task_state_entry ["retry" ]["count" ], in_ctx )
807805
@@ -1227,7 +1225,7 @@ def _collapse_task_rerun_requests(self, tasks=None):
12271225 # Only the index is required for further evaluation below.
12281226 result = {
12291227 k : [i [0 ] for i in self .workflow_state .get_task_sequence (t .task_id , t .route )]
1230- for k , t in six . iteritems ( tasks )
1228+ for k , t in tasks . items ( )
12311229 }
12321230
12331231 # If the list of task request is greater than one, then we have to check whether
@@ -1238,10 +1236,7 @@ def _collapse_task_rerun_requests(self, tasks=None):
12381236 # The for loops below identify task requests that have subsequent task sequences
12391237 # not in other task requests.
12401238 result = {
1241- k : i
1242- for k , i in six .iteritems (result )
1243- for j in result .values ()
1244- if len (set (i ) - set (j )) > 0
1239+ k : i for k , i in result .items () for j in result .values () if len (set (i ) - set (j )) > 0
12451240 }
12461241
12471242 return result
@@ -1256,9 +1251,7 @@ def request_workflow_rerun(self, task_requests=None):
12561251 tasks = {t .task_state_entry_id : t for t in task_requests or []}
12571252
12581253 # If the list of tasks is provided, verify if task exist and rerunnable.
1259- invalid_rerun_requests = [
1260- t for k , t in six .iteritems (tasks ) if k not in self .workflow_state .tasks
1261- ]
1254+ invalid_rerun_requests = [t for k , t in tasks .items () if k not in self .workflow_state .tasks ]
12621255
12631256 if invalid_rerun_requests :
12641257 raise exc .InvalidTaskRerunRequest (invalid_rerun_requests )
@@ -1277,7 +1270,7 @@ def request_workflow_rerun(self, task_requests=None):
12771270 self ._get_task_state_idx (t .task_id , t .route ),
12781271 self .workflow_state .get_task (t .task_id , t .route ),
12791272 )
1280- for k , t in six . iteritems ( tasks )
1273+ for k , t in tasks . items ( )
12811274 if k in self ._collapse_task_rerun_requests (tasks )
12821275 }
12831276
@@ -1299,11 +1292,11 @@ def request_workflow_rerun(self, task_requests=None):
12991292 continuable_candidates = {
13001293 constants .TASK_STATE_ROUTE_FORMAT % (t ["id" ], str (t ["route" ])): t
13011294 for i , t in self .workflow_state .get_terminal_tasks ()
1302- if len ([k for k , v in six . iteritems ( t ["next" ]) if v ]) > 0
1295+ if len ([k for k , v in t ["next" ]. items ( ) if v ]) > 0
13031296 }
13041297
13051298 # Automatically resume all continuable candidates.
1306- for _ , task in sorted (six . iteritems ( continuable_candidates ), key = lambda x : x [0 ]):
1299+ for _ , task in sorted (continuable_candidates . items ( ), key = lambda x : x [0 ]):
13071300 # Reset terminal status for the continuable candidate.
13081301 task .pop ("term" , None )
13091302
0 commit comments