Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ case class StorageConfig(store: String, container: String, fileName: String, acc

// Payload of an on-demand report job request; snake_case field names mirror the external API JSON.
case class OnDemandJobRequest(request_id: String, request_data : String,download_urls :List[String], status: String)


// Configuration for a report merge job. NOTE(review): reviewer suggested renaming to MergeConfig,
// since this is no longer a script (rename deferred here to avoid breaking existing callers).
// `type` is the store type ("local" or "azure"); container/postContainer name the source and
// destination blob containers; the *FileAccess flags pick private vs public storage credentials.
// rollup > 0 enables the rollup path; when enabled, rollupAge (ACADEMIC_YEAR/GEN_YEAR/MONTH/WEEK/DAY)
// and rollupRange are read unconditionally via .get in MergeUtil.rollupReport, so they must be
// supplied whenever rollup > 0. rollupCol defaults to "Date" when absent.
case class MergeScriptConfig(`type`: String, id: String, frequency: String, basePath: String, rollup: Integer, rollupAge: Option[String] = None, rollupCol: Option[String] = None, rollupRange: Option[Integer] = None,
merge: MergeFiles, container: String, postContainer: Option[String] = None, deltaFileAccess: Option[Boolean] = Option(true), reportFileAccess: Option[Boolean] = Option(true))
// File pairs to merge: each map carries "reportPath" and "deltaPath" keys (read by
// MergeUtil.mergeFile); dims lists the dimension column names.
case class MergeFiles(files: List[Map[String, String]], dims: List[String])

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this as MergeConfig, since it is no longer a script.

@scala.beans.BeanInfo
case class DruidOutput(t: Map[String, Any]) extends Map[String,Any] with Input with AlgoInput with AlgoOutput with Output {
private val internalMap = t
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package org.ekstep.analytics.framework.util

import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.{col, unix_timestamp, _}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.ekstep.analytics.framework.{FrameworkContext, MergeScriptConfig, StorageConfig}
import org.joda.time
import org.joda.time.format.DateTimeFormat
import org.ekstep.analytics.framework.util.DatasetUtil.extensions

class MergeUtil {

  /** Date pattern used by the report's rollup column. */
  private val defaultFormat = "dd-MM-yyyy"

  /**
   * Merges each configured delta file into its existing report, applies the configured
   * rollup window, and writes the merged result back (as both CSV and JSON) to the
   * configured store, overwriting the previous files.
   *
   * @param mergeConfig merge job configuration (store type, file pairs, rollup rules)
   * @param sc          active SparkContext
   * @param fc          framework context used to resolve cloud storage services
   * @throws Exception when the configured store type is neither "local" nor "azure"
   */
  def mergeFile(mergeConfig: MergeScriptConfig)(implicit sc: SparkContext, fc: FrameworkContext): Unit = {
    implicit val sqlContext = new SQLContext(sc)
    mergeConfig.merge.files.foreach { filePaths =>
      val path = new Path(filePaths("reportPath"))
      val (mergedDF, targetStorage): (DataFrame, StorageConfig) = mergeConfig.`type`.toLowerCase() match {
        case "local" =>
          val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv(filePaths("deltaPath"))
          val reportDF = sqlContext.read.options(Map("header" -> "true")).csv(filePaths("reportPath"))
          // container is unused for the local store, hence null — NOTE(review): confirm
          // saveToBlobStore tolerates a null container for "local".
          (mergeReport(deltaDF, reportDF, mergeConfig),
            StorageConfig(mergeConfig.`type`, null, FilenameUtils.getFullPathNoEndSeparator(filePaths("reportPath"))))
        case "azure" =>
          val deltaDF = downloadAzureFile(filePaths("deltaPath"),
            mergeConfig.deltaFileAccess.getOrElse(true), mergeConfig.container)
          val reportDF = downloadAzureFile(filePaths("reportPath"), mergeConfig.reportFileAccess.getOrElse(true),
            mergeConfig.postContainer.getOrElse("reports"))
          // BUGFIX: previously postContainer.get, which threw NoSuchElementException when
          // postContainer was null/absent; use the same "reports" fallback as the download above.
          (mergeReport(deltaDF, reportDF, mergeConfig),
            StorageConfig(mergeConfig.`type`, mergeConfig.postContainer.getOrElse("reports"), path.getParent.getName))
        case _ =>
          throw new Exception("Merge type unknown")
      }
      // Persist the merged report in both formats; "overwrite" replaces any existing files.
      val fileName = FilenameUtils.removeExtension(path.getName)
      val saveOptions = Option(Map("header" -> "true", "mode" -> "overwrite"))
      mergedDF.saveToBlobStore(targetStorage, "csv", fileName, saveOptions, None)
      mergedDF.saveToBlobStore(targetStorage, "json", fileName, saveOptions, None)
    }
  }

  /**
   * Merges delta rows into an existing report. When rollup is enabled (rollup > 0), report
   * rows whose rollup column matches a delta row are replaced by the delta rows, the
   * retention window from [[rollupReport]] is applied, and the result is ordered by date.
   * When rollup is disabled, the delta alone is returned.
   *
   * @param delta       newly produced rows; its rollup column may be in a different date format
   * @param reportDF    the existing report
   * @param mergeConfig merge configuration (rollup flag, rollup column, window)
   * @return the merged (and, when rollup is enabled, windowed and date-ordered) report
   */
  def mergeReport(delta: DataFrame, reportDF: DataFrame, mergeConfig: MergeScriptConfig): DataFrame = {
    if (mergeConfig.rollup > 0) {
      val reportDfColumns = reportDF.columns
      val rollupCol = mergeConfig.rollupCol.getOrElse("Date")
      // Normalise the delta's date representation to the report's dd-MM-yyyy format.
      val deltaDF = delta.withColumn(rollupCol, date_format(col(rollupCol), defaultFormat))
      // Report rows superseded by delta rows (same value in the rollup column).
      val supersededDf = reportDF.as("report").join(deltaDF.as("delta"),
        col("report." + rollupCol) === col("delta." + rollupCol), "inner")
        .select("report.*")
      // Keep the non-superseded report rows and append the (deduplicated) delta rows,
      // projected onto the report's exact column set and order.
      val finalDf = reportDF.except(supersededDf).union(deltaDF.dropDuplicates()
        .drop(deltaDF.columns.filter(p => !reportDfColumns.contains(p)): _*)
        .select(reportDfColumns.head, reportDfColumns.tail: _*))
      rollupReport(finalDf, mergeConfig).orderBy(unix_timestamp(col(rollupCol), defaultFormat))
    } else {
      delta
    }
  }

  /**
   * Filters the report down to the configured retention window, anchored on the most
   * recent value in the "Date" column.
   *
   * rollupAge selects the window unit — ACADEMIC_YEAR windows start on June 1st,
   * GEN_YEAR on January 1st, MONTH on the 1st of the month, WEEK on Monday, DAY is a
   * plain day offset; any other value keeps everything (window starts at the epoch).
   * rollupRange is the number of units to keep (1 == the current unit only).
   * NOTE: rollupAge and rollupRange are read via .get and must be present whenever
   * rollup is enabled.
   */
  def rollupReport(reportDF: DataFrame, mergeScriptConfig: MergeScriptConfig): DataFrame = {
    val rollupRange = mergeScriptConfig.rollupRange.get - 1
    // unix_timestamp yields seconds; Joda DateTime expects milliseconds.
    val maxDate = reportDF.agg(max(unix_timestamp(col("Date"), defaultFormat)) as "Max")
      .collect().apply(0).getAs[Long]("Max")
    val endDate = new time.DateTime(maxDate * 1000L)
    val endYear = endDate.year().get()
    val endMonth = endDate.monthOfYear().get()
    val startDate = mergeScriptConfig.rollupAge.get match {
      case "ACADEMIC_YEAR" =>
        // Academic years run June..May, so Jan-May dates belong to the previous start year.
        val startYear = (if (endMonth <= 5) endYear - 1 else endYear) - rollupRange
        new time.DateTime(startYear, 6, 1, 0, 0, 0)
      case "GEN_YEAR" =>
        new time.DateTime(endYear - rollupRange, 1, 1, 0, 0, 0)
      case "MONTH" =>
        // BUGFIX: use floored division/modulo so stepping back past January lands in
        // December of the PREVIOUS year. The old code truncated toward zero (and called
        // the deprecated no-op Int.floor), which kept e.g. January - 1 month in the
        // same year.
        val monthIndex = endMonth - rollupRange - 1 // zero-based month offset from January of endYear
        new time.DateTime(endYear + Math.floorDiv(monthIndex, 12), Math.floorMod(monthIndex, 12) + 1, 1, 0, 0, 0)
      case "WEEK" =>
        endDate.withDayOfWeek(1).minusWeeks(rollupRange)
      case "DAY" =>
        endDate.minusDays(rollupRange)
      case _ =>
        new time.DateTime(1970, 1, 1, 0, 0, 0)
    }
    reportDF.filter(p => DateTimeFormat.forPattern(defaultFormat)
      .parseDateTime(p.getAs[String]("Date"))
      .getMillis >= startDate.getMillis)
  }

  /**
   * Loads all CSV objects matching a path from an Azure blob container into one DataFrame.
   * Private containers are read with the analytics storage credentials, public report
   * containers with the report storage credentials.
   *
   * @param filePath  object path (prefix) to search for within the container
   * @param isPrivate whether to use the private (analytics) storage account
   * @param container the Azure blob container name
   */
  def downloadAzureFile(filePath: String, isPrivate: Boolean, container: String)(implicit sqlContext: SQLContext, fc: FrameworkContext): DataFrame = {
    val storageService =
      if (isPrivate)
        fc.getStorageService("azure", "azure_storage_key", "azure_storage_secret")
      else
        fc.getStorageService("azure", "report_storage_key", "report_storage_secret")
    val keys = storageService.searchObjects(container, filePath)
    sqlContext.read.options(Map("header" -> "true")).csv(storageService.getPaths(container, keys).toArray.mkString(","))
  }

}
2 changes: 2 additions & 0 deletions analytics-core/src/test/resources/delta.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
State,Producer,Number of Successful QR Scans,Date
ka,dev.sunbird.learning.platform,1007.0,2020-11-28
2 changes: 2 additions & 0 deletions analytics-core/src/test/resources/delta_rollup.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Date,State,Producer,Number of Successful QR Scans
2020-11-08,ka,local.sunbird.learning.platform,1007.0
6 changes: 6 additions & 0 deletions analytics-core/src/test/resources/report.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
State,Producer,Number of Successful QR Scans,Date
ka,dev.sunbird.learning.platform,1007.0,19-04-1999
ka,dev.sunbird.learning.platform,1007.0,28-11-2020
ka,dev.sunbird.learning.platform,1007.0,01-01-2021
ka,dev.sunbird.learning.platform,1007.0,28-02-2020
ka,dev.sunbird.learning.platform,1007.0,11-02-2020
1 change: 1 addition & 0 deletions analytics-core/src/test/resources/report.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"State":"ka","Producer":"dev.sunbird.learning.platform","Number of Successful QR Scans":"1007.0","Date":"28-11-2020"}
11 changes: 11 additions & 0 deletions analytics-core/src/test/resources/report_rollup.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
State,Producer,Number of Successful QR Scans,Date
ka,dev.sunbird.learning.platform,1007.0,19-04-1999
ka,dev.sunbird.learning.platform,1007.0,19-07-2019
ka,dev.sunbird.learning.platform,1007.0,31-10-2020
ka,dev.sunbird.learning.platform,1007.0,01-11-2020
ka,dev.sunbird.learning.platform,1007.0,02-11-2020
ka,dev.sunbird.learning.platform,1007.0,03-11-2020
ka,dev.sunbird.learning.platform,1007.0,04-11-2020
ka,dev.sunbird.learning.platform,1007.0,05-11-2020
ka,dev.sunbird.learning.platform,1007.0,06-11-2020
ka,dev.sunbird.learning.platform,1007.0,07-11-2020
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package org.ekstep.analytics.framework.util

import org.apache.hadoop.fs.azure.AzureException
import org.apache.spark.sql.SQLContext
import org.ekstep.analytics.framework._
import org.scalamock.scalatest.MockFactory
import org.scalatest.Matchers
import org.sunbird.cloud.storage.BaseStorageService

// Unit tests for MergeUtil covering the local merge path, the azure path (mocked storage),
// the unknown-store-type error path, and every rollupAge window variant.
class TestMergeUtil extends SparkSpec with Matchers with MockFactory {

// Happy path: merges delta.csv into report.csv from local test resources and writes the result.
"MergeUtil" should "test the merge function" in {

implicit val fc = new FrameworkContext
val mergeUtil = new MergeUtil()

val config =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"ACADEMIC_YEAR",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin


mergeUtil.mergeFile(JSONUtils.deserialize[MergeScriptConfig](config))

}


// Azure path: storage lookups are mocked to return local CSVs (the extra None/None/None/"yyyy-MM-dd"
// expectation arguments presumably match searchObjects' default parameters — verify against
// BaseStorageService). The merge itself proceeds, but the final save is expected to fail with
// an AzureException since no real Azure account is configured.
"MergeUtil" should "test the azure merge function" in {

implicit val mockFc = mock[FrameworkContext]
val mockStorageService = mock[BaseStorageService]
val mergeUtil = new MergeUtil()

val config =
"""{"type":"azure","id":"daily_metrics.csv","frequency":"DAY","basePath":"/mount/data/analytics/tmp","rollup":1,"rollupAge":"ACADEMIC_YEAR",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"apekx/daily_metrics.csv",
|"deltaPath":"druid-reports/ETB-Consumption-Daily-Reports/apekx/2020-11-03.csv"}],"dims":["Date"]},"container":"reports",
|"postContainer":"test-container","deltaFileAccess":true,"reportFileAccess":false}""".stripMargin
val jsonConfig = JSONUtils.deserialize[MergeScriptConfig](config)
// Delta file is private (deltaFileAccess=true) -> analytics storage credentials.
(mockFc.getStorageService(_:String, _:String, _:String):BaseStorageService).expects("azure", "azure_storage_key", "azure_storage_secret").returns(mockStorageService)
(mockStorageService.searchObjects _).expects(jsonConfig.container,"druid-reports/ETB-Consumption-Daily-Reports/apekx/2020-11-03.csv",None,None,None,"yyyy-MM-dd").returns(null)
(mockStorageService.getPaths _).expects(jsonConfig.container, null).returns(List("src/test/resources/delta.csv"))
// Report file is public (reportFileAccess=false) -> report storage credentials.
(mockFc.getStorageService(_:String, _:String, _:String):BaseStorageService).expects("azure", "report_storage_key", "report_storage_secret").returns(mockStorageService)
(mockStorageService.searchObjects _).expects(jsonConfig.postContainer.get,"apekx/daily_metrics.csv",None,None,None,"yyyy-MM-dd").returns(null)
(mockStorageService.getPaths _).expects(jsonConfig.postContainer.get, null).returns(List("src/test/resources/report.csv"))
a[AzureException] should be thrownBy {
mergeUtil.mergeFile(JSONUtils.deserialize[MergeScriptConfig](config))
}
}


// Unknown store type ("blob") must be rejected by mergeFile's match default case.
"MergeUtil" should "test the exception case" in {

implicit val mockFc = mock[FrameworkContext]
val mergeUtil = new MergeUtil()

val config =
"""{"type":"blob","id":"daily_metrics.csv","frequency":"DAY","basePath":"/mount/data/analytics/tmp","rollup":1,"rollupAge":"ACADEMIC_YEAR",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"apekx/daily_metrics.csv",
|"deltaPath":"druid-reports/ETB-Consumption-Daily-Reports/apekx/2020-11-03.csv"}],"dims":["Date"]},"container":"reports",
|"postContainer":"test-container","deltaFileAccess":true,"reportFileAccess":true}""".stripMargin

a[Exception] should be thrownBy {
mergeUtil.mergeFile(JSONUtils.deserialize[MergeScriptConfig](config))
}
}

// Exercises every rollupAge branch (ACADEMIC_YEAR, GEN_YEAR, MONTH, WEEK, DAY and the
// keep-everything default) against report_rollup.csv (11 rows, dates up to 07-11-2020)
// plus one delta row (08-11-2020), asserting the surviving row counts.
"MergeUtil" should "test all rollup conditions" in {

implicit val mockFc = mock[FrameworkContext]
val mergeUtil = new MergeUtil()

val config =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"ACADEMIC_YEAR",
|"rollupCol":"Date","rollupRange":2,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
implicit val sqlContext = new SQLContext(sc)
val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/delta_rollup.csv")
val reportDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/report_rollup.csv")
// Two academic years back from Nov 2020 -> window starts 01-06-2019.
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeScriptConfig](config)).count should be(10)
val config1 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"GEN_YEAR",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
// Current calendar year only -> window starts 01-01-2020.
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeScriptConfig](config1)).count should be(9)
val config2 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"MONTH",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
// Current month only -> window starts 01-11-2020.
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeScriptConfig](config2)).count should be(8)
val config3 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"WEEK",
|"rollupCol":"Date","rollupRange":1,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
// Current week only (week starts Monday).
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeScriptConfig](config3)).count should be(7)

val config4 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"DAY",
|"rollupCol":"Date","rollupRange":4,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
// Last four days relative to the max date.
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeScriptConfig](config4)).count should be(4)
val config5 =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":1,"rollupAge":"None",
|"rollupCol":"Date","rollupRange":4,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
// Unknown rollupAge -> default case keeps every row (window starts at the epoch).
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeScriptConfig](config5)).count should be(11)
}

// rollup=0 bypasses the merge entirely: mergeReport returns the delta as-is (1 row).
"MergeUtil" should "test without rollup condition" in {

implicit val mockFc = mock[FrameworkContext]
val mergeUtil = new MergeUtil()

val config =
"""{"type":"local","id":"consumption_usage_metrics","frequency":"DAY","basePath":"","rollup":0,"rollupAge":"ACADEMIC_YEAR",
|"rollupCol":"Date","rollupRange":2,"merge":{"files":[{"reportPath":"src/test/resources/report.csv",
|"deltaPath":"src/test/resources/delta.csv"}],
|"dims":["Date"]},"container":"test-container","postContainer":null,"deltaFileAccess":true,"reportFileAccess":true}""".stripMargin
implicit val sqlContext = new SQLContext(sc)
val deltaDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/delta_rollup.csv")
val reportDF = sqlContext.read.options(Map("header" -> "true")).csv("src/test/resources/report_rollup.csv")
mergeUtil.mergeReport(deltaDF,reportDF,JSONUtils.deserialize[MergeScriptConfig](config)).count should be(1)

}

}