@@ -393,25 +393,32 @@ def __init__(self, rdd, bsize=-1, dtype=np.ndarray, noblock=False):
393393 raise ValueError ("Only supported type for ArrayRDD is np.ndarray!" )
394394 super (ArrayRDD , self ).__init__ (rdd , bsize , dtype , noblock )
395395
396- def _on_axis (self , func , axis = None ):
397- rdd = self ._rdd . map ( lambda x : getattr ( x , func )( axis = axis ) )
396+ def __mul__ (self , other ):
397+ return self .multiply ( other )
398398
399- if axis is None :
400- return getattr (np .array (rdd .collect ()), func )()
401- elif axis == 0 :
402- return rdd .reduce (
403- lambda a , b : getattr (np .array ((a , b )), func )(axis = 0 ))
404- else :
405- return rdd .reduce (lambda a , b : np .concatenate ((a , b )))
399+ def __add__ (self , other ):
400+ return self .add (other )
401+
402+ def __sub__ (self , other ):
403+ return self .subtract (other )
404+
405+ def __div__ (self , other ):
406+ return self .divide (other )
407+
408+ __truediv__ = __div__
409+
410+ def __pow__ (self , other ):
411+ return self .power (other )
412+
413+ def __floordiv__ (self , other ):
414+ return self .floor_divide (other )
415+
416+ def __mod__ (self , other ):
417+ return self .mod (other )
406418
407419 def tosparse (self ):
408420 return SparseRDD (self ._rdd .map (lambda x : sp .csr_matrix (x )))
409421
410- def dot (self , other ):
411- # TODO naive dot implementation with another ArrayRDD
412- rdd = self ._rdd .map (lambda x : x .dot (other ))
413- return ArrayRDD (rdd , bsize = self .bsize , noblock = True )
414-
415422 def flatten (self ):
416423 return self .map (lambda x : x .flatten ())
417424
@@ -424,14 +431,42 @@ def max(self, axis=None):
424431 def prod (self , axis = None ):
425432 return self ._on_axis ('prod' , axis )
426433
434+ def dot (self , other ):
435+ return self ._on_other ('dot' , other )
427436
428- class SparseRDD (BlockRDD , ArrayLikeRDDMixin ):
437+ def add (self , other ):
438+ return self ._on_other ('add' , other )
429439
430- def __init__ (self , rdd , bsize = - 1 , dtype = sp .spmatrix , noblock = False ):
431- if dtype is not sp .spmatrix :
432- raise ValueError ("Only supported type for SparseRDD is"
433- " sp.spmatrix!" )
434- super (SparseRDD , self ).__init__ (rdd , bsize , dtype , noblock )
440+ def subtract (self , other ):
441+ return self ._on_other ('subtract' , other )
442+
443+ def multiply (self , other ):
444+ return self ._on_other ('multiply' , other )
445+
446+ def divide (self , other ):
447+ return self ._on_other ('divide' , other )
448+
449+ def power (self , other ):
450+ return self ._on_other ('power' , other )
451+
452+ def floor_divide (self , other ):
453+ return self ._on_other ('floor_divide' , other )
454+
455+ def true_divide (self , other ):
456+ return self ._on_other ('true_divide' , other )
457+
458+ def mod (self , other ):
459+ return self ._on_other ('mod' , other )
460+
461+ def fmod (self , other ):
462+ return self ._on_other ('fmod' , other )
463+
464+ def remainder (self , other ):
465+ return self ._on_other ('remainder' , other )
466+
467+ def _on_other (self , func , other ):
468+ rdd = self ._rdd .map (lambda x : getattr (np , func )(x , other ))
469+ return ArrayRDD (rdd , noblock = True )
435470
436471 def _on_axis (self , func , axis = None ):
437472 rdd = self ._rdd .map (lambda x : getattr (x , func )(axis = axis ))
@@ -440,9 +475,18 @@ def _on_axis(self, func, axis=None):
440475 return getattr (np .array (rdd .collect ()), func )()
441476 elif axis == 0 :
442477 return rdd .reduce (
443- lambda a , b : getattr (sp . vstack ((a , b )), func )(axis = 0 ))
478+ lambda a , b : getattr (np . array ((a , b )), func )(axis = 0 ))
444479 else :
445- return rdd .reduce (lambda a , b : sp .vstack ((a , b )))
480+ return rdd .reduce (lambda a , b : np .concatenate ((a , b )))
481+
482+
483+ class SparseRDD (BlockRDD , ArrayLikeRDDMixin ):
484+
485+ def __init__ (self , rdd , bsize = - 1 , dtype = sp .spmatrix , noblock = False ):
486+ if dtype is not sp .spmatrix :
487+ raise ValueError ("Only supported type for SparseRDD is"
488+ " sp.spmatrix!" )
489+ super (SparseRDD , self ).__init__ (rdd , bsize , dtype , noblock )
446490
447491 def toarray (self ):
448492 """Returns the data as numpy.array from each partition."""
@@ -466,6 +510,17 @@ def min(self, axis=None):
466510 def max (self , axis = None ):
467511 return self ._on_axis ('max' , axis )
468512
513+ def _on_axis (self , func , axis = None ):
514+ rdd = self ._rdd .map (lambda x : getattr (x , func )(axis = axis ))
515+
516+ if axis is None :
517+ return getattr (np .array (rdd .collect ()), func )()
518+ elif axis == 0 :
519+ return rdd .reduce (
520+ lambda a , b : getattr (sp .vstack ((a , b )), func )(axis = 0 ))
521+ else :
522+ return rdd .reduce (lambda a , b : sp .vstack ((a , b )))
523+
469524
470525class DictRDD (BlockRDD ):
471526
0 commit comments