Skip to content

Commit 75fa5d4

Browse files
Run and Register functions doesn't work
In Gears v0.9 (latest), Run and Register functions doesn't work. It works fine now but it is not backward compatible from previous Gears versions.
1 parent 599d2a7 commit 75fa5d4

File tree

1 file changed

+29
-10
lines changed

1 file changed

+29
-10
lines changed

gearsclient/redisgears_builder.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,23 @@ def AddToGB(self, gb, globalsDict):
112112
gb.run(self.arg, self.convertToStr, self.collect)
113113

114114
class GearsRemoteRegisterStep():
115-
def __init__(self, arg):
116-
self.arg = arg
115+
def __init__(self, regex, mode, batch, duration,
116+
eventTypes, keyTypes, onRegistered, onFailedPolicy,
117+
onFailedRetryInterval):
118+
self.regex = regex
119+
self.mode = mode
120+
self.batch = batch
121+
self.duration = duration
122+
self.eventTypes = eventTypes
123+
self.keyTypes = keyTypes
124+
self.onRegistered = onRegistered
125+
self.onFailedPolicy = onFailedPolicy
126+
self.onFailedRetryInterval = onFailedRetryInterval
117127

118128
def AddToGB(self, gb, globalsDict):
119-
gb.register(self.arg) if self.arg else gb.register()
120-
129+
gb.register(self.regex, self.mode, self.batch, self.duration,
130+
self.eventTypes, self.keyTypes, self.onRegistered, self.onFailedPolicy,
131+
self.onFailedRetryInterval)
121132

122133
class GearsPipe():
123134
def __init__(self, reader='KeysReader', defaultArg='*'):
@@ -176,8 +187,12 @@ def limit(self, count, offset):
176187
def run(self, arg, convertToStr, collect):
177188
self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect))
178189

179-
def register(self, arg):
180-
self.steps.append(GearsRemoteRegisterStep(arg))
190+
def register(self, regex, mode, batch, duration,
191+
eventTypes, keyTypes, onRegistered, onFailedPolicy,
192+
onFailedRetryInterval):
193+
self.steps.append(GearsRemoteRegisterStep(regex, mode, batch, duration,
194+
eventTypes, keyTypes, onRegistered, onFailedPolicy,
195+
onFailedRetryInterval))
181196

182197

183198
class GearsRemoteBuilder():
@@ -220,7 +235,7 @@ def distinct(self):
220235
return self
221236

222237
def aggregate(self, zero, seqOp, combOp):
223-
self.pipe.countby(callback)
238+
self.pipe.aggregate(zero, seqOp, combOp)
224239
return self
225240

226241
def aggregateby(self, extractor, zero, seqOp, combOp):
@@ -240,11 +255,15 @@ def run(self, arg=None, convertToStr=False, collect=True):
240255
selfBytes = cloudpickle.dumps(self.pipe)
241256
results = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes)
242257
res, errs = results
243-
res = [cloudpickle.loads(record) for record in res]
244258
return res, errs
245259

246-
def register(self, arg=None):
247-
self.pipe.register(arg)
260+
def register(self, regex='*', mode='async', batch=1, duration=0,
261+
eventTypes=None, keyTypes=None, onRegistered=None, onFailedPolicy="continue",
262+
onFailedRetryInterval=1):
263+
self.pipe.register(regex, mode, batch, duration,
264+
eventTypes, keyTypes, onRegistered, onFailedPolicy,
265+
onFailedRetryInterval)
248266
selfBytes = cloudpickle.dumps(self.pipe)
249267
res = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes)
250268
return res
269+

0 commit comments

Comments
 (0)