@@ -20,14 +20,19 @@ package io.github.mandar2812.dynaml.pipes
2020
2121import scalaxy .streams .optimize
2222
23+ trait DataPipeConvertible [- Source , + Destination ] {
24+ def toPipe : (Source ) => Destination
25+ }
26+
27+
2328/**
2429 * Top level trait representing an
2530 * abstract pipe that defines a transformation
2631 * between two data types, i.e. [[Source ]] and [[Destination ]]
2732 * @author mandar2812 on 18/11/15.
2833 *
2934 * */
30- trait DataPipe [- Source , + Destination ] extends Serializable {
35+ trait DataPipe [- Source , + Destination ] extends DataPipeConvertible [ Source , Destination ] with Serializable {
3136
3237 self =>
3338
@@ -47,11 +52,11 @@ trait DataPipe[-Source, +Destination] extends Serializable {
4752 * [[Source ]] -> [[Further ]]
4853 *
4954 * */
50- def > [Further ](that : DataPipe [Destination , Further ]):
51- DataPipe [ Source , Further ] = {
52- val runFunc = ( d : Source ) => that.run(self.run(d))
53- DataPipe (runFunc )
54- }
55+ def > [Further ](that : DataPipeConvertible [Destination , Further ]) =
56+ DataPipe (( d : Source ) => that.toPipe(self.run(d)))
57+
58+ /* def >[Result1, Result2](that: BifurcationPipe[Destination, Result1, Result2] )
59+ : BifurcationPipe[Source, Result1, Result2] = DataPipe((x: Source) => that.run(self.run(x))) */
5560
5661 /**
5762 * Represents the composition of two
@@ -72,6 +77,8 @@ trait DataPipe[-Source, +Destination] extends Serializable {
7277 def >-< [OtherSource , OtherDestination ](that : DataPipe [OtherSource , OtherDestination ])
7378 : DataPipe2 [Source , OtherSource , (Destination , OtherDestination )] =
7479 DataPipe2 ((d1 : Source , d2 : OtherSource ) => (self(d1), that(d2)))
80+
81+ override def toPipe : Source => Destination = self.run _
7582}
7683
7784object DataPipe {
@@ -87,8 +94,7 @@ object DataPipe {
8794 }
8895 }
8996
90- def apply [S1 , D1 , S2 , D2 ](pipe1 : DataPipe [S1 , D1 ],
91- pipe2 : DataPipe [S2 , D2 ]): ParallelPipe [S1 , D1 , S2 , D2 ] =
97+ def apply [S1 , D1 , S2 , D2 ](pipe1 : DataPipe [S1 , D1 ], pipe2 : DataPipe [S2 , D2 ]): ParallelPipe [S1 , D1 , S2 , D2 ] =
9298 ParallelPipe (pipe1.run, pipe2.run)
9399
94100 def apply [S , D1 , D2 ](func : (S ) => (D1 , D2 )):
@@ -106,8 +112,6 @@ object DataPipe {
106112 }
107113}
108114
109-
110-
111115trait ParallelPipe [- Source1 , + Result1 , - Source2 , + Result2 ]
112116 extends DataPipe [(Source1 , Source2 ), (Result1 , Result2 )] {
113117
@@ -133,6 +137,14 @@ object ParallelPipe {
133137trait BifurcationPipe [- Source , + Result1 , + Result2 ]
134138 extends DataPipe [Source , (Result1 , Result2 )] {
135139
140+ self =>
141+
142+ def > [FinalResult ](other : DataPipe2 [Result1 , Result2 , FinalResult ]): DataPipe [Source , FinalResult ] =
143+ DataPipe ((input : Source ) => {
144+ val (x, y) = self.run(input)
145+ other.run(x, y)
146+ })
147+
136148}
137149
138150trait SideEffectPipe [I ] extends DataPipe [I , Unit ] {
@@ -141,12 +153,12 @@ trait SideEffectPipe[I] extends DataPipe[I, Unit] {
141153
142154object BifurcationPipe {
143155
144- def apply [Source ,
145- Destination1 ,
146- Destination2 ](pipe1 : DataPipe [Source , Destination1 ],
147- pipe2 : DataPipe [Source , Destination2 ]):
148- BifurcationPipe [Source , Destination1 , Destination2 ] = {
156+ def apply [Source , Destination1 , Destination2 ](f : (Source ) => (Destination1 , Destination2 )) = DataPipe (f)
149157
158+ def apply [Source , Destination1 , Destination2 ](
159+ pipe1 : DataPipe [Source , Destination1 ],
160+ pipe2 : DataPipe [Source , Destination2 ]):
161+ BifurcationPipe [Source , Destination1 , Destination2 ] = {
150162 DataPipe ((x : Source ) => (pipe1.run(x), pipe2.run(x)))
151163 }
152164}
0 commit comments