@@ -8,6 +8,8 @@ import com.techmonad.pipeline.workflow.{Sink, Source, WorkFlow}
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 
+import scala.util.{Failure, Success, Try}
+
 trait DataPipeline {
 
   def run(): Unit
@@ -17,13 +19,20 @@ trait DataPipeline {
 object DataPipeline {
 
   def apply(workFlow: WorkFlow)(implicit sc: SparkContext): DataPipeline = {
-    val sourceRDD = applySource(workFlow.source)
-    val validatedRDD = applyValidation(sourceRDD, workFlow.validations)
-    val transformedRDD = applyTransformation(validatedRDD, workFlow.transformations)
-    val schemaValidatedRDD = applySchemaValidation(transformedRDD, workFlow.schemaValidation)
-    new DataPipeline {
-      override def run (): Unit =
-        applySink(schemaValidatedRDD, workFlow.sink).run()
+    Try {
+      val sourceRDD = applySource(workFlow.source)
+      val validatedRDD = applyValidation(sourceRDD, workFlow.validations)
+      val transformedRDD = applyTransformation(validatedRDD, workFlow.transformations)
+      val schemaValidatedRDD = applySchemaValidation(transformedRDD, workFlow.schemaValidation)
+      applySink(schemaValidatedRDD, workFlow.sink)
+    } match {
+      case Success(sink) =>
+        new DataPipeline {
+          override def run(): Unit = sink.run()
+        }
+      case Failure(th) =>
+        println("Error: Invalid workflow " + workFlow) // TODO: use logger
+        throw th
     }
   }
 
@@ -54,10 +63,9 @@ object DataPipeline {
     applyValidation(rdd, validations)
   }
 
-  private def applySink(rdd: RDD[Record], sink: Sink) = {
+  private def applySink(rdd: RDD[Record], sink: Sink) =
     sink.`type` match {
       case "ES" => new ESPersistenceRDD(rdd)
     }
-
-  }
-}
+
+}
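
For context, here is a minimal, self-contained sketch of the Try/Success/Failure pattern this patch introduces in DataPipeline.apply: construction runs eagerly inside Try, the Success branch wraps the built sink in a pipeline whose run() is deferred, and the Failure branch logs and rethrows. All names below (Pipeline, TrySketch, build) are illustrative stand-ins, not part of the real codebase.

import scala.util.{Failure, Success, Try}

// Illustrative stand-in for the real DataPipeline trait.
trait Pipeline { def run(): Unit }

object TrySketch {

  // Mirrors the shape of the patched DataPipeline.apply: every construction
  // step runs eagerly inside Try; only run() on the result is deferred.
  def build(valid: Boolean): Pipeline =
    Try {
      if (!valid) throw new IllegalArgumentException("bad workflow")
      () => println("sink ran") // stand-in for applySink(...)
    } match {
      case Success(sink) =>
        new Pipeline { override def run(): Unit = sink() }
      case Failure(th) =>
        println("Error: invalid workflow") // same TODO as the patch: use a logger
        throw th
    }

  def main(args: Array[String]): Unit = {
    build(valid = true).run() // prints "sink ran"
  }
}

One consequence of this shape, visible in the sketch: a bad workflow fails fast at construction time rather than when run() is eventually called, since the applySource/applyValidation/applyTransformation chain now executes inside the Try block.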