@@ -10,6 +10,14 @@ def AddToGB(self, gb, globalsDict):
1010 self .callback .__globals__ .update (globalsDict )
1111 gb .map (self .callback )
1212
13+ class GearsRemoteForeachStep ():
14+ def __init__ (self , callback ):
15+ self .callback = callback
16+
17+ def AddToGB (self , gb , globalsDict ):
18+ self .callback .__globals__ .update (globalsDict )
19+ gb .foreach (self .callback )
20+
1321class GearsRemoteFlatMapStep ():
1422 def __init__ (self , callback ):
1523 self .callback = callback
@@ -103,6 +111,13 @@ def __init__(self, arg, convertToStr, collect):
103111 def AddToGB (self , gb , globalsDict ):
104112 gb .run (self .arg , self .convertToStr , self .collect )
105113
114+ class GearsRemoteRegisterStep ():
115+ def __init__ (self , arg ):
116+ self .arg = arg
117+
118+ def AddToGB (self , gb , globalsDict ):
119+ gb .register (self .arg ) if self .arg else gb .register ()
120+
106121
107122class GearsPipe ():
108123 def __init__ (self , reader = 'KeysReader' , defaultArg = '*' ):
@@ -114,6 +129,10 @@ def map(self, callback):
114129 self .steps .append (GearsRemoteMapStep (callback ))
115130 return self
116131
132+ def foreach (self , callback ):
133+ self .steps .append (GearsRemoteForeachStep (callback ))
134+ return self
135+
117136 def flatmap (self , callback ):
118137 self .steps .append (GearsRemoteFlatMapStep (callback ))
119138 return self
@@ -157,6 +176,9 @@ def limit(self, count, offset):
157176 def run (self , arg , convertToStr , collect ):
158177 self .steps .append (GearsRemoteRunStep (arg , convertToStr , collect ))
159178
179+ def register (self , arg ):
180+ self .steps .append (GearsRemoteRegisterStep (arg ))
181+
160182
161183class GearsRemoteBuilder ():
162184 def __init__ (self , reader = 'KeysReader' , defaultArg = '*' , r = None ):
@@ -169,6 +191,10 @@ def map(self, callback):
169191 self .pipe .map (callback )
170192 return self
171193
194+ def foreach (self , callback ):
195+ self .pipe .foreach (callback )
196+ return self
197+
172198 def flatmap (self , callback ):
173199 self .pipe .flatmap (callback )
174200 return self
@@ -216,3 +242,9 @@ def run(self, arg=None, convertToStr=False, collect=True):
216242 res , errs = results
217243 res = [cloudpickle .loads (record ) for record in res ]
218244 return res , errs
245+
246+ def register (self , arg = None ):
247+ self .pipe .register (arg )
248+ selfBytes = cloudpickle .dumps (self .pipe )
249+ res = self .r .execute_command ('RG.PYEXECUTEREMOTE' , selfBytes )
250+ return res
0 commit comments