22import cloudpickle
33import pickle
44
5+ class GearsRemoteRepartitionStep ():
6+ def __init__ (self , extractor ):
7+ self .extractor = extractor
8+
9+ def AddToGB (self , gb ):
10+ gb .repartition (self .extractor )
11+
512class GearsRemoteMapStep ():
613 def __init__ (self , callback ):
714 self .callback = callback
815
9- def AddToGB (self , gb , globalsDict ):
10- self .callback .__globals__ .update (globalsDict )
16+ def AddToGB (self , gb ):
1117 gb .map (self .callback )
1218
1319class GearsRemoteForeachStep ():
1420 def __init__ (self , callback ):
1521 self .callback = callback
1622
17- def AddToGB (self , gb , globalsDict ):
18- self .callback .__globals__ .update (globalsDict )
23+ def AddToGB (self , gb ):
1924 gb .foreach (self .callback )
2025
2126class GearsRemoteFlatMapStep ():
2227 def __init__ (self , callback ):
2328 self .callback = callback
2429
25- def AddToGB (self , gb , globalsDict ):
26- self .callback .__globals__ .update (globalsDict )
30+ def AddToGB (self , gb ):
2731 gb .flatmap (self .callback )
2832
2933class GearsRemoteFilterStep ():
3034 def __init__ (self , callback ):
3135 self .callback = callback
3236
33- def AddToGB (self , gb , globalsDict ):
34- self .callback .__globals__ .update (globalsDict )
37+ def AddToGB (self , gb ):
3538 gb .filter (self .callback )
3639
3740class GearsRemoteCountByStep ():
3841 def __init__ (self , callback ):
3942 self .callback = callback
4043
41- def AddToGB (self , gb , globalsDict ):
42- self .callback .__globals__ .update (globalsDict )
44+ def AddToGB (self , gb ):
4345 gb .countby (self .callback )
4446
4547class GearsRemoteAvgByStep ():
4648 def __init__ (self , callback ):
4749 self .callback = callback
4850
49- def AddToGB (self , gb , globalsDict ):
50- self .callback .__globals__ .update (globalsDict )
51+ def AddToGB (self , gb ):
5152 gb .avg (self .callback )
5253
5354class GearsRemoteCountStep ():
5455 def __init__ (self ):
5556 self .callback = callback
5657
57- def AddToGB (self , gb , globalsDict ):
58+ def AddToGB (self , gb ):
5859 gb .count ()
5960
6061class GearsRemoteDistinctStep ():
6162 def __init__ (self ):
6263 self .callback = callback
6364
64- def AddToGB (self , gb , globalsDict ):
65+ def AddToGB (self , gb ):
6566 gb .distinct ()
6667
6768class GearsRemoteAggregateStep ():
@@ -70,9 +71,7 @@ def __init__(self, zero, seqOp, combOp):
7071 self .seqOp = seqOp
7172 self .combOp = combOp
7273
73- def AddToGB (self , gb , globalsDict ):
74- self .seqOp .__globals__ .update (globalsDict )
75- self .combOp .__globals__ .update (globalsDict )
74+ def AddToGB (self , gb ):
7675 gb .aggregate (self .zero , self .seqOp , self .combOp )
7776
7877class GearsRemoteAggregateByStep ():
@@ -82,60 +81,55 @@ def __init__(self, extractor, zero, seqOp, combOp):
8281 self .seqOp = seqOp
8382 self .combOp = combOp
8483
85- def AddToGB (self , gb , globalsDict ):
86- self .seqOp .__globals__ .update (globalsDict )
87- self .combOp .__globals__ .update (globalsDict )
84+ def AddToGB (self , gb ):
8885 gb .aggregate (self .extractor , self .zero , self .seqOp , self .combOp )
8986
9087class GearsRemoteSortStep ():
9188 def __init__ (self , reverse ):
9289 self .reverse = reverse
9390
94- def AddToGB (self , gb , globalsDict ):
91+ def AddToGB (self , gb ):
9592 gb .sort (self .reverse )
9693
9794class GearsRemoteLimitStep ():
9895 def __init__ (self , count , offset ):
9996 self .count = count
10097 self .offset = offset
10198
102- def AddToGB (self , gb , globalsDict ):
99+ def AddToGB (self , gb ):
103100 gb .limit (self .count , self .offset )
104101
105102class GearsRemoteRunStep ():
106- def __init__ (self , arg , convertToStr , collect ):
103+ def __init__ (self , arg , convertToStr , collect , kargs ):
107104 self .arg = arg
108105 self .convertToStr = convertToStr
109106 self .collect = collect
107+ self .kargs = kargs
110108
111- def AddToGB (self , gb , globalsDict ):
112- gb .run (self .arg , self .convertToStr , self .collect )
109+ def AddToGB (self , gb ):
110+ gb .run (self .arg , self .convertToStr , self .collect , ** self . kargs )
113111
114112class GearsRemoteRegisterStep ():
115- def __init__ (self , regex , mode , batch , duration ,
116- eventTypes , keyTypes , onRegistered , onFailedPolicy ,
117- onFailedRetryInterval ):
118- self .regex = regex
119- self .mode = mode
120- self .batch = batch
121- self .duration = duration
122- self .eventTypes = eventTypes
123- self .keyTypes = keyTypes
124- self .onRegistered = onRegistered
125- self .onFailedPolicy = onFailedPolicy
126- self .onFailedRetryInterval = onFailedRetryInterval
127-
128- def AddToGB (self , gb , globalsDict ):
129- gb .register (self .regex , self .mode , self .batch , self .duration ,
130- self .eventTypes , self .keyTypes , self .onRegistered , self .onFailedPolicy ,
131- self .onFailedRetryInterval )
113+ def __init__ (self , prefix , convertToStr , collect , kargs ):
114+ self .prefix = prefix
115+ self .convertToStr = convertToStr
116+ self .collect = collect
117+ self .kargs = kargs
118+
119+ def AddToGB (self , gb ):
120+ gb .register (self .prefix , self .convertToStr , self .collect , ** self .kargs )
121+
132122
133123class GearsPipe ():
134124 def __init__ (self , reader = 'KeysReader' , defaultArg = '*' ):
135125 self .reader = reader
136126 self .defaultArg = defaultArg
137127 self .steps = []
138128
129+ def repartition (self , extractor ):
130+ self .steps .append (GearsRemoteRepartitionStep (extractor ))
131+ return self
132+
139133 def map (self , callback ):
140134 self .steps .append (GearsRemoteMapStep (callback ))
141135 return self
@@ -184,15 +178,17 @@ def limit(self, count, offset):
184178 self .steps .append (GearsRemoteLimitStep (count , offset ))
185179 return self
186180
187- def run (self , arg , convertToStr , collect ):
188- self .steps .append (GearsRemoteRunStep (arg , convertToStr , collect ))
181+ def run (self , arg , convertToStr , collect , ** kargs ):
182+ self .steps .append (GearsRemoteRunStep (arg , convertToStr , collect , kargs ))
183+
184+ def register (self , prefix , convertToStr , collect , ** kargs ):
185+ self .steps .append (GearsRemoteRegisterStep (prefix , convertToStr , collect , kargs ))
186+
187+ def createAndRun (self , GB ):
188+ gb = GB (self .reader )
189+ for s in self .steps :
190+ s .AddToGB (gb )
189191
190- def register (self , regex , mode , batch , duration ,
191- eventTypes , keyTypes , onRegistered , onFailedPolicy ,
192- onFailedRetryInterval ):
193- self .steps .append (GearsRemoteRegisterStep (regex , mode , batch , duration ,
194- eventTypes , keyTypes , onRegistered , onFailedPolicy ,
195- onFailedRetryInterval ))
196192
197193
198194class GearsRemoteBuilder ():
@@ -202,6 +198,10 @@ def __init__(self, reader='KeysReader', defaultArg='*', r=None):
202198 self .r = r
203199 self .pipe = GearsPipe (reader , defaultArg )
204200
201+ def repartition (self , extractor ):
202+ self .pipe .repartition (extractor )
203+ return self
204+
205205 def map (self , callback ):
206206 self .pipe .map (callback )
207207 return self
@@ -250,20 +250,59 @@ def limit(self, count, offset=0):
250250 self .pipe .limit (count , offset )
251251 return self
252252
253- def run (self , arg = None , convertToStr = False , collect = True ):
254- self .pipe .run (arg , convertToStr , collect )
253+ def run (self , arg = None , collect = True , ** kargs ):
254+ self .map (lambda x : cloudpickle .dumps (x ))
255+ self .pipe .run (arg , False , collect )
255256 selfBytes = cloudpickle .dumps (self .pipe )
256- results = self .r .execute_command ('RG.PYEXECUTEREMOTE' , selfBytes )
257+ serverCode = '''
258+ import cloudpickle
259+ p = cloudpickle.loads(%s)
260+ p.createAndRun(GB)
261+ ''' % selfBytes
262+ results = self .r .execute_command ('RG.PYEXECUTE' , serverCode )
257263 res , errs = results
264+ res = [cloudpickle .loads (record ) for record in res ]
258265 return res , errs
259266
260- def register (self , regex = '*' , mode = 'async' , batch = 1 , duration = 0 ,
261- eventTypes = None , keyTypes = None , onRegistered = None , onFailedPolicy = "continue" ,
262- onFailedRetryInterval = 1 ):
263- self .pipe .register (regex , mode , batch , duration ,
264- eventTypes , keyTypes , onRegistered , onFailedPolicy ,
265- onFailedRetryInterval )
267+ def register (self , prefix = '*' , convertToStr = True , collect = True , ** kargs ):
268+ self .pipe .register (prefix , convertToStr , collect , ** kargs )
266269 selfBytes = cloudpickle .dumps (self .pipe )
267- res = self .r .execute_command ('RG.PYEXECUTEREMOTE' , selfBytes )
270+ serverCode = '''
271+ import cloudpickle
272+ p = cloudpickle.loads(%s)
273+ p.createAndRun(GB)
274+ ''' % selfBytes
275+ res = self .r .execute_command ('RG.PYEXECUTE' , serverCode )
268276 return res
269277
278+ def log (msg , level = 'notice' ):
279+ from redisgears import log as redisLog
280+ redisLog (msg , level = level )
281+
282+ def gearsConfigGet (key , default = None ):
283+ from redisgears import config_get as redisConfigGet
284+ val = redisConfigGet (key )
285+ return val if val is not None else default
286+
287+ def execute (* args ):
288+ from redisgears import executeCommand as redisExecute
289+ return redisExecute (* args )
290+
291+ def hashtag ():
292+ from redisgears import getMyHashTag as redisHashtag
293+ return redisHashtag ()
294+
295+ class atomic :
296+ def __init__ (self ):
297+ from redisgears import atomicCtx as redisAomic
298+ self .atomic = redisAomic ()
299+ pass
300+
301+ def __enter__ (self ):
302+ self .atomic .__enter__ ()
303+ return self
304+
305+ def __exit__ (self , type , value , traceback ):
306+ self .atomic .__exit__ ()
307+
308+
0 commit comments