"org.apache.spark" %% s"spark-$name" % Version.spark excludeScalatest
+ val hadoop = (name: String) => "org.apache.hadoop" % s"hadoop-$name" % (Version.hadoop + ".+")
+
val spray = (name: String) => "io.spray" %% s"spray-$name" % Version.spray excludeAkkaActor
val akkaActor = akka("actor")
@@ -114,7 +116,9 @@ object Dependencies {
object Spark {
val components = Seq(
sparkCore,
- sparkMLLib)
+ sparkMLLib,
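+ // pulled in alongside Spark so the S3 FileSystem classes end up on the executor classpath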
+ hadoop("aws")
+ )
// val provided = components.map(_ % Provided)
val provided = components
val test = components.map(_ % s"$Test,it")
diff --git a/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/FileScheme.scala b/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/FileScheme.scala
index 8dd8ade00..4817ed1b7 100644
--- a/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/FileScheme.scala
+++ b/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/FileScheme.scala
@@ -30,11 +30,14 @@ object FileScheme {
case object HDFS extends FileScheme("hdfs")
case object File extends FileScheme("file")
case object Library extends FileScheme("library")
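+ // Hadoop's three S3 connectors: the legacy block-based s3://, the native s3n://, and its successor s3a://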
+ case object S3 extends FileScheme("s3")
+ case object S3A extends FileScheme("s3a")
+ case object S3N extends FileScheme("s3n")
// TODO Autoderive values. There is macro-library for extracting sealed case objects.
- val values = Seq(HTTP, HTTPS, FTP, HDFS, File, Library)
+ val values = Seq(HTTP, HTTPS, FTP, HDFS, File, Library, S3, S3A, S3N)
- val supportedByParquet = Seq(HDFS)
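+ // Parquet is handled by the cluster, so only schemes reachable from every executor are supported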
+ val supportedByParquet = Seq(HDFS, S3, S3A, S3N)
def fromPath(path: String): FileScheme = {
val matchingFileSchema = values.find(schema => path.startsWith(schema.pathPrefix))
diff --git a/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DataFrameFromFileReader.scala b/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DataFrameFromFileReader.scala
index 9aafd9b18..fcdcab556 100644
--- a/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DataFrameFromFileReader.scala
+++ b/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DataFrameFromFileReader.scala
@@ -39,16 +39,18 @@ object DataFrameFromFileReader {
private def readUsingProvidedFileScheme
(path: FilePath, fileFormat: InputFileFormatChoice)
- (implicit context: ExecutionContext): DataFrame =
+ (implicit context: ExecutionContext): DataFrame = {
+ import FileScheme._
path.fileScheme match {
- case FileScheme.Library =>
+ case Library =>
val filePath = FilePathFromLibraryPath(path)
readUsingProvidedFileScheme(filePath, fileFormat)
case FileScheme.File => DriverFiles.read(path.pathWithoutScheme, fileFormat)
- case FileScheme.HTTP | FileScheme.HTTPS | FileScheme.FTP =>
+ case HTTP | HTTPS | FTP =>
val downloadedPath = FileDownloader.downloadFile(path.fullPath)
readUsingProvidedFileScheme(downloadedPath, fileFormat)
- case FileScheme.HDFS => ClusterFiles.read(path, fileFormat)
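+ // the S3 variants are cluster-visible, so they take the same ClusterFiles path as HDFS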
+ case HDFS | S3 | S3A | S3N => ClusterFiles.read(path, fileFormat)
}
+ }
}
diff --git a/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DataFrameToFileWriter.scala b/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DataFrameToFileWriter.scala
index ad18c8ea1..0db3c792b 100644
--- a/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DataFrameToFileWriter.scala
+++ b/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedataframe/filestorage/DataFrameToFileWriter.scala
@@ -66,7 +66,7 @@ object DataFrameToFileWriter {
new java.io.File(libraryPath).getParentFile.mkdirs()
writeUsingProvidedFileScheme(fileChoice, dataFrame, filePath, saveMode)
case FileScheme.File => DriverFiles.write(dataFrame, path, fileChoice.getFileFormat(), saveMode)
- case HDFS => ClusterFiles.write(dataFrame, path, fileChoice.getFileFormat(), saveMode)
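+ // as with reading, S3 writes go through the cluster rather than the driver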
+ case HDFS | S3 | S3A | S3N => ClusterFiles.write(dataFrame, path, fileChoice.getFileFormat(), saveMode)
case HTTP | HTTPS | FTP => throw NotSupportedScheme(path.fileScheme)
}
}
diff --git a/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedatasource/ToDatasourceConverters.scala b/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedatasource/ToDatasourceConverters.scala
index 59c77082b..70c466608 100644
--- a/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedatasource/ToDatasourceConverters.scala
+++ b/seahorse-workflow-executor/deeplang/src/main/scala/ai/deepsense/deeplang/doperations/readwritedatasource/ToDatasourceConverters.scala
@@ -110,7 +110,10 @@ trait ToDatasourceConverters {
case FileScheme.File |
FileScheme.FTP |
FileScheme.HTTP |
- FileScheme.HTTPS =>
+ FileScheme.HTTPS |
+ FileScheme.S3 |
+ FileScheme.S3A |
+ FileScheme.S3N =>
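+ // S3-backed paths are surfaced as external-file datasources, like the other remote schemes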
params.setExternalFileParams(fileType)
params.setDatasourceType(DatasourceType.EXTERNALFILE)
case FileScheme.HDFS =>
@@ -147,7 +150,10 @@ trait ToDatasourceConverters {
case FileScheme.File |
FileScheme.FTP |
FileScheme.HTTP |
- FileScheme.HTTPS =>
+ FileScheme.HTTPS |
+ FileScheme.S3 |
+ FileScheme.S3A |
+ FileScheme.S3N =>
params.setExternalFileParams(fileType)
params.setDatasourceType(DatasourceType.EXTERNALFILE)
case FileScheme.HDFS =>
diff --git a/seahorse-workflow-executor/project/Dependencies.scala b/seahorse-workflow-executor/project/Dependencies.scala
index 31243e458..f892ff98d 100644
--- a/seahorse-workflow-executor/project/Dependencies.scala
+++ b/seahorse-workflow-executor/project/Dependencies.scala
@@ -25,7 +25,7 @@ object Version {
case "2.0.0" | "2.0.1" | "2.0.2" => ("2.11.8", "1.8", "2.7.1", "2.4.9", "3.3.+")
}
- val amazonS3 = "1.10.16"
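+ // downgraded: 1.7.4 is the aws-java-sdk release that hadoop-aws 2.7.x was built against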
+ val amazonSDK = "1.7.4"
val googleApi = "1.22.0"
val mockito = "1.10.19"
val nsscalaTime = "1.8.0"
@@ -53,7 +53,7 @@ object Library {
val akkaActor = akka("actor")
val akkaTestkit = akka("testkit")
- val amazonS3 = "com.amazonaws" % "aws-java-sdk-s3" % Version.amazonS3 excludeJackson
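+ // the monolithic aws-java-sdk artifact replaces aws-java-sdk-s3, which only exists for SDK 1.9+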
+ val amazonSDK = "com.amazonaws" % "aws-java-sdk" % Version.amazonSDK excludeJackson
val apacheCommonsLang3 = "org.apache.commons" % "commons-lang3" % Version.apacheCommons
val apacheCommonsCsv = "org.apache.commons" % "commons-csv" % "1.1" // Also used by spark-csv
val config = "com.typesafe" % "config" % "1.3.1"
@@ -171,7 +171,7 @@ object Dependencies {
akkaActor,
sprayClient,
apacheCommonsLang3,
- amazonS3,
+ amazonSDK,
nscalaTime,
scalaReflect,
apacheCommonsCsv,
diff --git a/sessionmanager/src/main/scala/ai/deepsense/sessionmanager/service/sessionspawner/sparklauncher/clusters/SeahorseSparkLauncher.scala b/sessionmanager/src/main/scala/ai/deepsense/sessionmanager/service/sessionspawner/sparklauncher/clusters/SeahorseSparkLauncher.scala
index 723b1c588..8d2bc6985 100644
--- a/sessionmanager/src/main/scala/ai/deepsense/sessionmanager/service/sessionspawner/sparklauncher/clusters/SeahorseSparkLauncher.scala
+++ b/sessionmanager/src/main/scala/ai/deepsense/sessionmanager/service/sessionspawner/sparklauncher/clusters/SeahorseSparkLauncher.scala
@@ -57,6 +57,7 @@ object SeahorseSparkLauncher {
val extraClassPath = s"${sparkLauncherConfig.weJarPath}:${jars.mkString(":")}"
jars.toList.foldLeft(sparkLauncher) { (acc, jar) => acc.addJar(jar) }
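+ // ship the AWS jars with the session so S3 paths resolve on the driver and the executors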
+ .addHadoopAwsJars()
.setConf("spark.driver.extraClassPath", extraClassPath)
.setConfOpt("spark.executor.memory", clusterConfig.executorMemory)
.setConfOpt("spark.driver.memory", clusterConfig.driverMemory)
@@ -74,6 +75,12 @@ object SeahorseSparkLauncher {
}
implicit class RichSparkLauncher(self: SparkLauncher) {
+
+ def addHadoopAwsJars(): SparkLauncher = {
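+ // NOTE: paths and versions are hard-coded to the docker image layout and must track the build's dependency versions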
+ self.addJar("/opt/docker/app/lib/com.amazonaws.aws-java-sdk-1.7.4.jar")
+ .addJar("/opt/docker/app/lib/org.apache.hadoop.hadoop-aws-2.7.6.jar")
+ }
+
def setConfOpt(key: String, valueOpt: Option[String]): SparkLauncher = {
valueOpt match {
case Some(value) => self.setConf(key, value)