Skip to content

Commit 94e3993

Browse files
added accumulate and localgroupby
1 parent 40aa7ab commit 94e3993

File tree

1 file changed

+31
-0
lines changed

1 file changed

+31
-0
lines changed

gearsclient/redisgears_builder.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,21 @@
22
import cloudpickle
33
import pickle
44

5+
class GearsRemoteLocalGroupByStep():
6+
def __init__(self, extractor, reducer):
7+
self.extractor = extractor
8+
self.reducer = reducer
9+
10+
def AddToGB(self, gb):
11+
gb.localgroupby(self.extractor, self.reducer)
12+
13+
class GearsRemoteAccumulateStep():
14+
def __init__(self, accumulator):
15+
self.accumulator = accumulator
16+
17+
def AddToGB(self, gb):
18+
gb.accumulate(self.accumulator)
19+
520
class GearsRemoteRepartitionStep():
621
def __init__(self, extractor):
722
self.extractor = extractor
@@ -126,6 +141,14 @@ def __init__(self, reader='KeysReader', defaultArg='*'):
126141
self.defaultArg = defaultArg
127142
self.steps = []
128143

144+
def localgroupby(self, extractor, reducer):
145+
self.steps.append(GearsRemoteLocalGroupByStep(extractor, reducer))
146+
return self
147+
148+
def accumulate(self, accumulator):
149+
self.steps.append(GearsRemoteAccumulateStep(accumulator))
150+
return self
151+
129152
def repartition(self, extractor):
130153
self.steps.append(GearsRemoteRepartitionStep(extractor))
131154
return self
@@ -198,6 +221,14 @@ def __init__(self, reader='KeysReader', defaultArg='*', r=None):
198221
self.r = r
199222
self.pipe = GearsPipe(reader, defaultArg)
200223

224+
def localgroupby(self, extractor, reducer):
225+
self.pipe.localgroupby(extractor, reducer)
226+
return self
227+
228+
def accumulate(self, accumulator):
229+
self.pipe.accumulate(accumulator)
230+
return self
231+
201232
def repartition(self, extractor):
202233
self.pipe.repartition(extractor)
203234
return self

0 commit comments

Comments
 (0)