Skip to content

Commit 6db1a5c

Browse files
committed
Update pipeline methods
1 parent f1c4f9a commit 6db1a5c

File tree

8 files changed

+4044
-15
lines changed

8 files changed

+4044
-15
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ scalaVersion := "2.11.8"
77

88
libraryDependencies ++= Seq(
99
"org.apache.spark" %% "spark-core" % "2.1.0",
10-
"org.elasticsearch" %% "elasticsearch-spark-20" % "5.4.0",
10+
"org.elasticsearch" %% "elasticsearch-spark-20" % "5.6.0",
1111
"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" artifacts(Artifact("stanford-corenlp", "models"), Artifact("stanford-corenlp")),
1212
"ch.qos.logback" % "logback-classic" % "1.2.3",
1313
"org.json4s" %% "json4s-native" % "3.5.0",

ipl-tweet.csv

Lines changed: 4002 additions & 0 deletions
Large diffs are not rendered by default.

src/main/scala/com/techmonad/pipeline/DataPipeline.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ object DataPipeline {
2323
val sourceRDD = applySource(workFlow.source)
2424
val validatedRDD = applyValidation(sourceRDD, workFlow.validations)
2525
val transformedRDD = applyTransformation(validatedRDD, workFlow.transformations)
26-
val schemaValidatedRDD = applySchemaValidation(transformedRDD, workFlow.schemaValidation)
26+
val schemaValidatedRDD = applySchemaValidation(transformedRDD, workFlow.schemaValidations)
2727
applySink(schemaValidatedRDD, workFlow.sink)
2828
} match {
2929
case Success(sink) =>
@@ -37,7 +37,7 @@ object DataPipeline {
3737
}
3838

3939
private def applySource(source: Source)(implicit sc: SparkContext) = {
40-
CSVReader.read(source.dir)
40+
CSVReader.read(source.path)
4141
}
4242

4343
private def applyValidation(rdd: RDD[Record], validations: List[String]): RDD[Record] =
@@ -67,5 +67,5 @@ object DataPipeline {
6767
sink.`type` match {
6868
case "ES" => new ESPersistenceRDD(rdd)
6969
}
70-
70+
7171
}

src/main/scala/com/techmonad/pipeline/RunDataPipeline.scala

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,38 @@ import com.techmonad.pipeline.workflow.WorkFlow
77
object RunDataPipeline extends App with SparkContextProvider with JsonHelper {
88

99
val workFlowJson =
10-
if (args.length < 1)
11-
throw new IllegalArgumentException("Data directory and workflow json are required")
12-
else
13-
args(0)
10+
"""
11+
|{
12+
| "source": {
13+
| "type": "CSV",
14+
| "path": "ipl-tweet.csv",
15+
| "meta":{"text_field":"text","date_field": "date","author_field":"author_name" }
16+
| },
17+
|
18+
| "validations": [ "COLUMN_VALIDATION", "FIELD_VALIDATION" ],
19+
|
20+
| "transformations": ["SENTIMENT_ANALYSIS" ],
21+
|
22+
| "schemaValidations": [ ],
23+
|
24+
| "sink": {
25+
| "type": "ES",
26+
| "meta":{ "index": "data_index","type": "twitter" }
27+
| }
28+
|}
29+
""".stripMargin
30+
31+
32+
/* if (args.length < 1)
33+
throw new IllegalArgumentException("Data directory and workflow json are required")
34+
else
35+
args(0)*/
1436

1537
val workFlow = parse(workFlowJson).extract[WorkFlow]
1638

1739
DataPipeline(workFlow).run
40+
41+
sc.stop()
42+
1843
}
1944

src/main/scala/com/techmonad/pipeline/transformation/sentiment/NLPSentimentAnalyzer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
1313
import scala.collection.convert.wrapAll._
1414

1515

16-
object SentimentAnalyzer extends Transformation {
16+
object SentimentAnalyzer extends Transformation with Serializable{
1717

1818
override def transform(record: Record): Record =
1919
if (record.status != Status.ERROR) {
2020
val sentiment: String = NLPSentimentAnalyzer.getSentiment(record.data("text").toString)
21+
println("Analyzing Sentiment........... " + sentiment)
2122
record.copy(data = record.data + ("sentiment" -> sentiment))
2223
} else {
2324
record

src/main/scala/com/techmonad/pipeline/util/SparkContextProvider.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ trait SparkContextProvider {
77

88
val conf = new SparkConf().setMaster("local[*]").setAppName("DataPipeline")
99
implicit val sc = new SparkContext(conf)
10+
sc.setLogLevel("WARN")
1011

1112

1213
}

src/main/scala/com/techmonad/pipeline/validation/source/MandatoryColumnValidation.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@ import com.techmonad.pipeline.Record
44
import com.techmonad.pipeline.util.Status
55
import com.techmonad.pipeline.validation.Validation
66

7-
object MandatoryColumnValidation extends Validation {
7+
object MandatoryColumnValidation extends Validation with Serializable{
88

99

1010
override def name: String = "COLUMN_VALIDATION"
1111

1212
override def validate(record: Record): Record =
1313
record.data.get("text") match {
1414
case Some(text: String) if (text.trim.nonEmpty) =>
15-
record.data.get("date") match {
15+
record.data.get("date") match {
1616
case Some(date: String) if (date.trim.nonEmpty) =>
1717
record
1818
case None =>

src/main/scala/com/techmonad/pipeline/workflow/WorkFlows.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ case class WorkFlow(
44
source: Source,
55
validations: List[String],
66
transformations: List[String],
7-
schemaValidation: List[String],
7+
schemaValidations: List[String],
88
sink: Sink
99
)
1010

11-
case class Source(`type`: String, dir: String, meta: Map[String, String])
11+
case class Source(`type`: String, path: String, meta: Map[String, String])
1212

1313
case class Sink(`type`: String, meta: Map[String, String])
1414

@@ -18,7 +18,7 @@ case class Sink(`type`: String, meta: Map[String, String])
1818
|{
1919
| "source": {
2020
| "type": "CSV",
21-
| "dir": "s3://data/bucket_name",
21+
| "path": "/home/satendra/decooda/testing-csv",
2222
| "meta":{"text_field":"text","date_field": "date","author_field":"author_name" }
2323
| },
2424
|
@@ -29,7 +29,7 @@ case class Sink(`type`: String, meta: Map[String, String])
2929
| "schemaValidation": [ "DATA_MODEL_VALIDATION" ],
3030
|
3131
| "sink": {
32-
| "storage_type": "ES",
32+
| "type": "ES",
3333
| "meta":{ "index": "data_index","type": "twitter" }
3434
| }
3535
|}

0 commit comments

Comments (0)