@@ -19,18 +19,20 @@ package ai.deepsense.deeplang.doperations.readwritedataframe.filestorage
1919import java .io .{File , IOException , PrintWriter }
2020
2121import scala .io .Source
22+
2223import org .apache .spark .rdd .RDD
2324import org .apache .spark .sql .execution .datasources .csv .{DataframeToDriverCsvFileWriter , RawCsvRDDToDataframe }
24- import org .apache .spark .sql .{SaveMode , DataFrame => SparkDataFrame }
25+ import org .apache .spark .sql .{Dataset , Encoders , Row , SaveMode , DataFrame => SparkDataFrame }
2526import ai .deepsense .commons .resources .ManagedResource
2627import ai .deepsense .deeplang .ExecutionContext
2728import ai .deepsense .deeplang .doperables .dataframe .DataFrame
2829import ai .deepsense .deeplang .doperations .inout .{InputFileFormatChoice , OutputFileFormatChoice }
2930import ai .deepsense .deeplang .doperations .readwritedataframe .filestorage .csv .CsvOptions
3031import ai .deepsense .deeplang .doperations .readwritedataframe .{FilePath , FileScheme }
32+ import ai .deepsense .deeplang .readjsondataset .JsonReader
3133import ai .deepsense .sparkutils .SQL
3234
33- object DriverFiles {
35+ object DriverFiles extends JsonReader {
3436
3537 def read (driverPath : String , fileFormat : InputFileFormatChoice )
3638 (implicit context : ExecutionContext ): SparkDataFrame = fileFormat match {
@@ -59,13 +61,14 @@ object DriverFiles {
5961 val lines = Source .fromFile(driverPath).getLines().toStream
6062 val fileLinesRdd = context.sparkContext.parallelize(lines)
6163
62- RawCsvRDDToDataframe .parse(fileLinesRdd, context.sparkSQLSession, params)
64+ RawCsvRDDToDataframe .parse(fileLinesRdd, context.sparkSQLSession.sparkSession , params)
6365 }
6466
/**
 * Reads a JSON file located on the driver's local filesystem.
 *
 * The file is read line by line on the driver, the lines are distributed
 * as an RDD through the Spark context, and the RDD is handed to
 * `readJsonFromRdd` (presumably supplied by the `JsonReader` mixin -
 * confirm against that trait).
 *
 * @param driverPath path of the file on the driver's local filesystem
 * @param context    execution context providing the Spark context and SQL session
 * @return whatever `readJsonFromRdd` produces for the parsed lines
 */
private def readJson(driverPath: String)(implicit context: ExecutionContext) = {
  val source = Source.fromFile(driverPath)
  // Materialise the lines eagerly before closing: a lazy Stream would still be
  // backed by the open file, so it could neither be closed safely nor read later.
  val lines = try source.getLines().toVector finally source.close()
  val fileLinesRdd = context.sparkContext.parallelize(lines)
  readJsonFromRdd(fileLinesRdd, context.sparkSQLSession.sparkSession)
}
7073
7174 private def writeCsv
@@ -77,7 +80,8 @@ object DriverFiles {
7780 dataFrame.sparkDataFrame,
7881 params,
7982 dataFrame.schema.get,
80- path.pathWithoutScheme
83+ path.pathWithoutScheme,
84+ context.sparkSQLSession.sparkSession
8185 )
8286 }
8387
0 commit comments