33import base64
44import importlib
55import json
6+ import uuid
7+
8+ from eb_sqs import settings
69
710try :
8- import cPickle as pickle
9- except :
10- import pickle
11+ import cPickle as pickle
12+ except Exception :
13+ import pickle
1114
1215
1316class WorkerTask (object ):
@@ -74,15 +77,15 @@ def copy(self, use_serialization):
7477
7578 @staticmethod
7679 def _pickle_args (args ):
77- # type: (dict ) -> unicode
80+ # type: (Any ) -> unicode
7881 return base64 .b64encode (pickle .dumps (args , pickle .HIGHEST_PROTOCOL )).decode ('utf-8' )
7982
8083 @staticmethod
8184 def deserialize (msg ):
8285 # type: (unicode) -> WorkerTask
8386 task = json .loads (msg )
8487
85- id = task [ 'id' ]
88+ id = task . get ( 'id' , str ( uuid . uuid4 ()))
8689 group_id = task .get ('groupId' )
8790
8891 abs_func_name = task ['func' ]
@@ -93,11 +96,15 @@ def deserialize(msg):
9396 func = getattr (func_module , func_name )
9497
9598 use_pickle = task .get ('pickle' , False )
96- queue = task ['queue' ]
97- args = WorkerTask ._unpickle_args (task ['args' ]) if use_pickle else task ['args' ]
99+ queue = task .get ('queue' , settings .DEFAULT_QUEUE )
100+
101+ task_args = task .get ('args' , [])
102+ args = WorkerTask ._unpickle_args (task_args ) if use_pickle else task_args
103+
98104 kwargs = WorkerTask ._unpickle_args (task ['kwargs' ]) if use_pickle else task ['kwargs' ]
99- max_retries = task ['maxRetries' ]
100- retry = task ['retry' ]
105+
106+ max_retries = task .get ('maxRetries' , settings .DEFAULT_MAX_RETRIES )
107+ retry = task .get ('retry' , 0 )
101108 retry_id = task .get ('retryId' )
102109
103110 return WorkerTask (id , group_id , queue , func , args , kwargs , max_retries , retry , retry_id , use_pickle )
0 commit comments