package za.co.absa.cobrix.spark.cobol.source

import org.apache.hadoop.fs.Path
- import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, SchemaRelationProvider}
+ import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
- import org.apache.spark.sql.{SQLContext, SparkSession}
+ import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, CobolParametersParser, Parameters}
import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
@@ -34,6 +34,7 @@ import za.co.absa.cobrix.spark.cobol.utils.{BuildProperties, SparkUtils}
class DefaultSource
  extends RelationProvider
    with SchemaRelationProvider
+   with CreatableRelationProvider
    with DataSourceRegister
    with ReaderFactory
    with Logging {
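
Reviewer note: CreatableRelationProvider is the standard Spark SQL trait for write support; mixing it in is what lets callers invoke df.write.format("cobol").save(...). For reference, its single abstract method, which the new override further down implements, is:

    trait CreatableRelationProvider {
      def createRelation(sqlContext: SQLContext,
                         mode: SaveMode,
                         parameters: Map[String, String],
                         data: DataFrame): BaseRelation
    }
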
@@ -44,6 +45,7 @@ class DefaultSource
    createRelation(sqlContext, parameters, null)
  }

+ /** Reader relation */
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
    CobolParametersValidator.validateOrThrow(parameters, sqlContext.sparkSession.sparkContext.hadoopConfiguration)

@@ -58,6 +60,36 @@ class DefaultSource
      cobolParameters.debugIgnoreFileSize)(sqlContext)
  }

+ /** Writer relation */
+ override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
+   val path = parameters.getOrElse("path",
+     throw new IllegalArgumentException("Path is required for this data source."))
+
+   mode match {
+     case SaveMode.Overwrite =>
+       // Delete any existing output so saveAsTextFile() can recreate the directory
+       val outputPath = new Path(path)
+       val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+       val fs = outputPath.getFileSystem(hadoopConf)
+       if (fs.exists(outputPath)) {
+         fs.delete(outputPath, true)
+       }
+     case SaveMode.Append => // no special handling: saveAsTextFile() fails if the path already exists
+     case _ =>
+   }
+
+   // Simply save each row as comma-separated values in a text file
+   data.rdd
+     .map(row => row.mkString(","))
+     .saveAsTextFile(path)
+
+   // Capture the SQLContext in a local val: referring to `sqlContext` inside
+   // the anonymous class would resolve to the member being overridden and
+   // recurse infinitely.
+   val outerSqlContext = sqlContext
+   new BaseRelation {
+     override def sqlContext: SQLContext = outerSqlContext
+
+     override def schema: StructType = data.schema
+   }
+ }
+
  // TODO fix with the correct implementation once the correct Reader hierarchy is put in place.
  override def buildReader(spark: SparkSession, parameters: Map[String, String]): FixedLenReader = null

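Reviewer note: a minimal usage sketch of the new write path, assuming the Cobrix jar is on the classpath (the short name "cobol" comes from this source's DataSourceRegister; the column names and the /tmp output path are illustrative only, and the output is plain comma-separated text, not EBCDIC):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder()
      .appName("cobrix-writer-sketch")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val df = Seq(("A", 1), ("B", 2)).toDF("name", "amount")

    // SaveMode.Overwrite exercises the fs.delete branch above;
    // each partition becomes one part-* text file under the path.
    df.write
      .format("cobol")
      .mode(SaveMode.Overwrite)
      .save("/tmp/cobrix-writer-output")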