@@ -112,12 +112,23 @@ def AddToGB(self, gb, globalsDict):
112112 gb .run (self .arg , self .convertToStr , self .collect )
113113
114114class GearsRemoteRegisterStep ():
115- def __init__ (self , arg ):
116- self .arg = arg
115+ def __init__ (self , regex , mode , batch , duration ,
116+ eventTypes , keyTypes , onRegistered , onFailedPolicy ,
117+ onFailedRetryInterval ):
118+ self .regex = regex
119+ self .mode = mode
120+ self .batch = batch
121+ self .duration = duration
122+ self .eventTypes = eventTypes
123+ self .keyTypes = keyTypes
124+ self .onRegistered = onRegistered
125+ self .onFailedPolicy = onFailedPolicy
126+ self .onFailedRetryInterval = onFailedRetryInterval
117127
118128 def AddToGB (self , gb , globalsDict ):
119- gb .register (self .arg ) if self .arg else gb .register ()
120-
129+ gb .register (self .regex , self .mode , self .batch , self .duration ,
130+ self .eventTypes , self .keyTypes , self .onRegistered , self .onFailedPolicy ,
131+ self .onFailedRetryInterval )
121132
122133class GearsPipe ():
123134 def __init__ (self , reader = 'KeysReader' , defaultArg = '*' ):
@@ -176,8 +187,12 @@ def limit(self, count, offset):
176187 def run (self , arg , convertToStr , collect ):
177188 self .steps .append (GearsRemoteRunStep (arg , convertToStr , collect ))
178189
179- def register (self , arg ):
180- self .steps .append (GearsRemoteRegisterStep (arg ))
190+ def register (self , regex , mode , batch , duration ,
191+ eventTypes , keyTypes , onRegistered , onFailedPolicy ,
192+ onFailedRetryInterval ):
193+ self .steps .append (GearsRemoteRegisterStep (regex , mode , batch , duration ,
194+ eventTypes , keyTypes , onRegistered , onFailedPolicy ,
195+ onFailedRetryInterval ))
181196
182197
183198class GearsRemoteBuilder ():
@@ -240,11 +255,15 @@ def run(self, arg=None, convertToStr=False, collect=True):
240255 selfBytes = cloudpickle .dumps (self .pipe )
241256 results = self .r .execute_command ('RG.PYEXECUTEREMOTE' , selfBytes )
242257 res , errs = results
243- res = [cloudpickle .loads (record ) for record in res ]
244258 return res , errs
245259
246- def register (self , arg = None ):
247- self .pipe .register (arg )
260+ def register (self , regex = '*' , mode = 'async' , batch = 1 , duration = 0 ,
261+ eventTypes = None , keyTypes = None , onRegistered = None , onFailedPolicy = "continue" ,
262+ onFailedRetryInterval = 1 ):
263+ self .pipe .register (regex , mode , batch , duration ,
264+ eventTypes , keyTypes , onRegistered , onFailedPolicy ,
265+ onFailedRetryInterval )
248266 selfBytes = cloudpickle .dumps (self .pipe )
249267 res = self .r .execute_command ('RG.PYEXECUTEREMOTE' , selfBytes )
250268 return res
269+
0 commit comments