@@ -17,9 +17,9 @@
 package za.co.absa.cobrix.spark.cobol.source
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, SchemaRelationProvider}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 import za.co.absa.cobrix.cobol.internal.Logging
 import za.co.absa.cobrix.cobol.reader.parameters.CobolParameters
 import za.co.absa.cobrix.spark.cobol.parameters.CobolParametersParser._
@@ -35,6 +35,7 @@ import za.co.absa.cobrix.spark.cobol.utils.{BuildProperties, SparkUtils}
 class DefaultSource
   extends RelationProvider
     with SchemaRelationProvider
+    with CreatableRelationProvider
     with DataSourceRegister
     with ReaderFactory
     with Logging {
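
For reference, CreatableRelationProvider is the Spark SQL trait being mixed in here (pulled in by the new wildcard import of org.apache.spark.sql.sources). Mixing it in is what lets df.write.format(...).save(...) dispatch to this source; it declares the single method that the new overload below implements:

trait CreatableRelationProvider {
  def createRelation(sqlContext: SQLContext,
                     mode: SaveMode,
                     parameters: Map[String, String],
                     data: DataFrame): BaseRelation
}
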
@@ -45,6 +46,7 @@ class DefaultSource
     createRelation(sqlContext, parameters, null)
   }
 
+  /** Reader relation */
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
     CobolParametersValidator.validateOrThrow(parameters, sqlContext.sparkSession.sparkContext.hadoopConfiguration)
 
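
For comparison, the reader relation above is what backs the usual Cobrix read path. A minimal sketch, assuming spark-cobol is on the classpath; the copybook and data paths are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cobrix-read-demo").getOrCreate()

// Hypothetical paths; "copybook" is the standard Cobrix read option
val df = spark.read
  .format("cobol")
  .option("copybook", "/copybooks/record.cob")
  .load("/data/ebcdic-records")
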
@@ -59,6 +61,39 @@ class DefaultSource
       cobolParameters.debugIgnoreFileSize)(sqlContext)
   }
 
+  /** Writer relation */
+  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
+    val path = parameters.getOrElse("path",
+      throw new IllegalArgumentException("Path is required for this data source."))
+
+    mode match {
+      case SaveMode.Overwrite =>
+        // Delete any existing output directory so the new write replaces it
+        val outputPath = new Path(path)
+        val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+        val fs = outputPath.getFileSystem(hadoopConf)
+        if (fs.exists(outputPath)) {
+          fs.delete(outputPath, true)
+        }
+      case SaveMode.Append =>
+      case _ =>
+    }
+
+    // Simply save each row as comma-separated values in a text file
+    data.rdd
+      .map(row => row.mkString(","))
+      .saveAsTextFile(path)
+
+    // Capture the enclosing SQLContext: inside the anonymous relation the name
+    // 'sqlContext' resolves to the member being defined, so returning it
+    // directly would be a trivially recursive definition.
+    val outerSqlContext = sqlContext
+    new BaseRelation {
+      override def sqlContext: SQLContext = outerSqlContext
+
+      override def schema: StructType = data.schema
+    }
+  }
+
 
   // TODO fix with the correct implementation once the correct Reader hierarchy is put in place.
   override def buildReader(spark: SparkSession, parameters: Map[String, String]): FixedLenReader = null
 
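
With CreatableRelationProvider in place, the same format name becomes usable on the write side. A minimal sketch of exercising the new writer relation, assuming spark-cobol is on the classpath; the output directory is hypothetical:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("cobrix-write-demo").getOrCreate()
val df = spark.range(3).selectExpr("id", "id * 2 AS doubled")

// SaveMode.Overwrite triggers the pre-delete branch of the mode match above;
// each row is then saved as one comma-separated line of text.
df.write
  .format("cobol")
  .mode(SaveMode.Overwrite)
  .save("/tmp/cobrix-write-demo") // hypothetical output directory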