Skip to content

Commit 3c1043c

Browse files
author
Meir Shpilraien (Spielrein)
authored
Merge pull request #7 from RedisGears/fixed_to_work_with_pyexecute
Fixed
2 parents 2556c4c + 2ea9eaf commit 3c1043c

File tree

3 files changed

+166
-61
lines changed

3 files changed

+166
-61
lines changed

example.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from gearsclient import GearsRemoteBuilder as GRB
2+
from gearsclient import log, hashtag, execute, atomic
3+
import redis
4+
5+
conn = redis.Redis(host='localhost', port=6379)
6+
7+
# count for each genre how many times it appears
8+
9+
# class test:
10+
# def __init__(self):
11+
# self.count = 1
12+
13+
# def func1(x):
14+
# return test()
15+
16+
# def func2(x):
17+
# x.count += 1
18+
# return x
19+
20+
def func(x):
21+
with atomic():
22+
execute('hset', 'h', 'foo', 'bar')
23+
execute('hset', 'h', 'foo1', 'bar1')
24+
return x
25+
26+
res = GRB('ShardsIDReader',r=conn).\
27+
map(func).\
28+
run()
29+
30+
print(res)
31+
# if len(res[1]) > 0:
32+
# print(res[1])
33+
34+
# for r in res[0]:
35+
# print('%s' % str(r.count))

gearsclient/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from .redisgears_builder import GearsRemoteBuilder
1+
from .redisgears_builder import GearsRemoteBuilder, log, gearsConfigGet, execute, atomic, hashtag

gearsclient/redisgears_builder.py

Lines changed: 130 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,66 +2,82 @@
22
import cloudpickle
33
import pickle
44

5+
class GearsRemoteLocalGroupByStep():
6+
def __init__(self, extractor, reducer):
7+
self.extractor = extractor
8+
self.reducer = reducer
9+
10+
def AddToGB(self, gb):
11+
gb.localgroupby(self.extractor, self.reducer)
12+
13+
class GearsRemoteAccumulateStep():
14+
def __init__(self, accumulator):
15+
self.accumulator = accumulator
16+
17+
def AddToGB(self, gb):
18+
gb.accumulate(self.accumulator)
19+
20+
class GearsRemoteRepartitionStep():
21+
def __init__(self, extractor):
22+
self.extractor = extractor
23+
24+
def AddToGB(self, gb):
25+
gb.repartition(self.extractor)
26+
527
class GearsRemoteMapStep():
628
def __init__(self, callback):
729
self.callback = callback
830

9-
def AddToGB(self, gb, globalsDict):
10-
self.callback.__globals__.update(globalsDict)
31+
def AddToGB(self, gb):
1132
gb.map(self.callback)
1233

1334
class GearsRemoteForeachStep():
1435
def __init__(self, callback):
1536
self.callback = callback
1637

17-
def AddToGB(self, gb, globalsDict):
18-
self.callback.__globals__.update(globalsDict)
38+
def AddToGB(self, gb):
1939
gb.foreach(self.callback)
2040

2141
class GearsRemoteFlatMapStep():
2242
def __init__(self, callback):
2343
self.callback = callback
2444

25-
def AddToGB(self, gb, globalsDict):
26-
self.callback.__globals__.update(globalsDict)
45+
def AddToGB(self, gb):
2746
gb.flatmap(self.callback)
2847

2948
class GearsRemoteFilterStep():
3049
def __init__(self, callback):
3150
self.callback = callback
3251

33-
def AddToGB(self, gb, globalsDict):
34-
self.callback.__globals__.update(globalsDict)
52+
def AddToGB(self, gb):
3553
gb.filter(self.callback)
3654

3755
class GearsRemoteCountByStep():
3856
def __init__(self, callback):
3957
self.callback = callback
4058

41-
def AddToGB(self, gb, globalsDict):
42-
self.callback.__globals__.update(globalsDict)
59+
def AddToGB(self, gb):
4360
gb.countby(self.callback)
4461

4562
class GearsRemoteAvgByStep():
4663
def __init__(self, callback):
4764
self.callback = callback
4865

49-
def AddToGB(self, gb, globalsDict):
50-
self.callback.__globals__.update(globalsDict)
66+
def AddToGB(self, gb):
5167
gb.avg(self.callback)
5268

5369
class GearsRemoteCountStep():
5470
def __init__(self):
5571
self.callback = callback
5672

57-
def AddToGB(self, gb, globalsDict):
73+
def AddToGB(self, gb):
5874
gb.count()
5975

6076
class GearsRemoteDistinctStep():
6177
def __init__(self):
6278
self.callback = callback
6379

64-
def AddToGB(self, gb, globalsDict):
80+
def AddToGB(self, gb):
6581
gb.distinct()
6682

6783
class GearsRemoteAggregateStep():
@@ -70,9 +86,7 @@ def __init__(self, zero, seqOp, combOp):
7086
self.seqOp = seqOp
7187
self.combOp = combOp
7288

73-
def AddToGB(self, gb, globalsDict):
74-
self.seqOp.__globals__.update(globalsDict)
75-
self.combOp.__globals__.update(globalsDict)
89+
def AddToGB(self, gb):
7690
gb.aggregate(self.zero, self.seqOp, self.combOp)
7791

7892
class GearsRemoteAggregateByStep():
@@ -82,60 +96,63 @@ def __init__(self, extractor, zero, seqOp, combOp):
8296
self.seqOp = seqOp
8397
self.combOp = combOp
8498

85-
def AddToGB(self, gb, globalsDict):
86-
self.seqOp.__globals__.update(globalsDict)
87-
self.combOp.__globals__.update(globalsDict)
99+
def AddToGB(self, gb):
88100
gb.aggregate(self.extractor, self.zero, self.seqOp, self.combOp)
89101

90102
class GearsRemoteSortStep():
91103
def __init__(self, reverse):
92104
self.reverse = reverse
93105

94-
def AddToGB(self, gb, globalsDict):
106+
def AddToGB(self, gb):
95107
gb.sort(self.reverse)
96108

97109
class GearsRemoteLimitStep():
98110
def __init__(self, count, offset):
99111
self.count = count
100112
self.offset = offset
101113

102-
def AddToGB(self, gb, globalsDict):
114+
def AddToGB(self, gb):
103115
gb.limit(self.count, self.offset)
104116

105117
class GearsRemoteRunStep():
106-
def __init__(self, arg, convertToStr, collect):
118+
def __init__(self, arg, convertToStr, collect, kargs):
107119
self.arg = arg
108120
self.convertToStr = convertToStr
109121
self.collect = collect
122+
self.kargs = kargs
110123

111-
def AddToGB(self, gb, globalsDict):
112-
gb.run(self.arg, self.convertToStr, self.collect)
124+
def AddToGB(self, gb):
125+
gb.run(self.arg, self.convertToStr, self.collect, **self.kargs)
113126

114127
class GearsRemoteRegisterStep():
115-
def __init__(self, regex, mode, batch, duration,
116-
eventTypes, keyTypes, onRegistered, onFailedPolicy,
117-
onFailedRetryInterval):
118-
self.regex = regex
119-
self.mode = mode
120-
self.batch = batch
121-
self.duration = duration
122-
self.eventTypes = eventTypes
123-
self.keyTypes = keyTypes
124-
self.onRegistered = onRegistered
125-
self.onFailedPolicy = onFailedPolicy
126-
self.onFailedRetryInterval = onFailedRetryInterval
127-
128-
def AddToGB(self, gb, globalsDict):
129-
gb.register(self.regex, self.mode, self.batch, self.duration,
130-
self.eventTypes, self.keyTypes, self.onRegistered, self.onFailedPolicy,
131-
self.onFailedRetryInterval)
128+
def __init__(self, prefix, convertToStr, collect, kargs):
129+
self.prefix = prefix
130+
self.convertToStr = convertToStr
131+
self.collect = collect
132+
self.kargs = kargs
133+
134+
def AddToGB(self, gb):
135+
gb.register(self.prefix, self.convertToStr, self.collect, **self.kargs)
136+
132137

133138
class GearsPipe():
134139
def __init__(self, reader='KeysReader', defaultArg='*'):
135140
self.reader = reader
136141
self.defaultArg = defaultArg
137142
self.steps = []
138143

144+
def localgroupby(self, extractor, reducer):
145+
self.steps.append(GearsRemoteLocalGroupByStep(extractor, reducer))
146+
return self
147+
148+
def accumulate(self, accumulator):
149+
self.steps.append(GearsRemoteAccumulateStep(accumulator))
150+
return self
151+
152+
def repartition(self, extractor):
153+
self.steps.append(GearsRemoteRepartitionStep(extractor))
154+
return self
155+
139156
def map(self, callback):
140157
self.steps.append(GearsRemoteMapStep(callback))
141158
return self
@@ -184,15 +201,17 @@ def limit(self, count, offset):
184201
self.steps.append(GearsRemoteLimitStep(count, offset))
185202
return self
186203

187-
def run(self, arg, convertToStr, collect):
188-
self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect))
204+
def run(self, arg, convertToStr, collect, **kargs):
205+
self.steps.append(GearsRemoteRunStep(arg, convertToStr, collect, kargs))
206+
207+
def register(self, prefix, convertToStr, collect, **kargs):
208+
self.steps.append(GearsRemoteRegisterStep(prefix, convertToStr, collect, kargs))
209+
210+
def createAndRun(self, GB):
211+
gb = GB(self.reader)
212+
for s in self.steps:
213+
s.AddToGB(gb)
189214

190-
def register(self, regex, mode, batch, duration,
191-
eventTypes, keyTypes, onRegistered, onFailedPolicy,
192-
onFailedRetryInterval):
193-
self.steps.append(GearsRemoteRegisterStep(regex, mode, batch, duration,
194-
eventTypes, keyTypes, onRegistered, onFailedPolicy,
195-
onFailedRetryInterval))
196215

197216

198217
class GearsRemoteBuilder():
@@ -202,6 +221,18 @@ def __init__(self, reader='KeysReader', defaultArg='*', r=None):
202221
self.r = r
203222
self.pipe = GearsPipe(reader, defaultArg)
204223

224+
def localgroupby(self, extractor, reducer):
225+
self.pipe.localgroupby(extractor, reducer)
226+
return self
227+
228+
def accumulate(self, accumulator):
229+
self.pipe.accumulate(accumulator)
230+
return self
231+
232+
def repartition(self, extractor):
233+
self.pipe.repartition(extractor)
234+
return self
235+
205236
def map(self, callback):
206237
self.pipe.map(callback)
207238
return self
@@ -250,20 +281,59 @@ def limit(self, count, offset=0):
250281
self.pipe.limit(count, offset)
251282
return self
252283

253-
def run(self, arg=None, convertToStr=False, collect=True):
254-
self.pipe.run(arg, convertToStr, collect)
284+
def run(self, arg=None, collect=True, **kargs):
285+
self.map(lambda x: cloudpickle.dumps(x))
286+
self.pipe.run(arg, False, collect)
255287
selfBytes = cloudpickle.dumps(self.pipe)
256-
results = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes)
288+
serverCode = '''
289+
import cloudpickle
290+
p = cloudpickle.loads(%s)
291+
p.createAndRun(GB)
292+
''' % selfBytes
293+
results = self.r.execute_command('RG.PYEXECUTE', serverCode)
257294
res, errs = results
295+
res = [cloudpickle.loads(record) for record in res]
258296
return res, errs
259297

260-
def register(self, regex='*', mode='async', batch=1, duration=0,
261-
eventTypes=None, keyTypes=None, onRegistered=None, onFailedPolicy="continue",
262-
onFailedRetryInterval=1):
263-
self.pipe.register(regex, mode, batch, duration,
264-
eventTypes, keyTypes, onRegistered, onFailedPolicy,
265-
onFailedRetryInterval)
298+
def register(self, prefix='*', convertToStr=True, collect=True, **kargs):
299+
self.pipe.register(prefix, convertToStr, collect, **kargs)
266300
selfBytes = cloudpickle.dumps(self.pipe)
267-
res = self.r.execute_command('RG.PYEXECUTEREMOTE', selfBytes)
301+
serverCode = '''
302+
import cloudpickle
303+
p = cloudpickle.loads(%s)
304+
p.createAndRun(GB)
305+
''' % selfBytes
306+
res = self.r.execute_command('RG.PYEXECUTE', serverCode)
268307
return res
269308

309+
def log(msg, level='notice'):
310+
from redisgears import log as redisLog
311+
redisLog(msg, level=level)
312+
313+
def gearsConfigGet(key, default=None):
314+
from redisgears import config_get as redisConfigGet
315+
val = redisConfigGet(key)
316+
return val if val is not None else default
317+
318+
def execute(*args):
319+
from redisgears import executeCommand as redisExecute
320+
return redisExecute(*args)
321+
322+
def hashtag():
323+
from redisgears import getMyHashTag as redisHashtag
324+
return redisHashtag()
325+
326+
class atomic:
327+
def __init__(self):
328+
from redisgears import atomicCtx as redisAtomic
329+
self.atomic = redisAtomic()
330+
pass
331+
332+
def __enter__(self):
333+
self.atomic.__enter__()
334+
return self
335+
336+
def __exit__(self, type, value, traceback):
337+
self.atomic.__exit__()
338+
339+

0 commit comments

Comments
 (0)