@@ -57,58 +57,16 @@ object IoHelpers extends LazyLogging {
/**
  * Loads the dataset described by `pathInfo` into a DataFrame.
  *
  * A reader is created for `pathInfo.format`; each entry of `pathInfo.options` is converted to a
  * key/value map and applied to that reader before `pathInfo.path` is loaded.
  *
  * @param pathInfo resource description providing `path`, `format` and `options`
  * @param session  implicit Spark session used to build the reader
  * @return the DataFrame produced by the configured reader
  * @throws Exception any exception raised while loading is logged and rethrown unchanged
  */
def loadFileToDF(pathInfo: IOResourceConfig)(implicit session: SparkSession): DataFrame = {
  logger.info(s"load dataset ${pathInfo.path} with ${pathInfo.toString}")

  try
    pathInfo.options
      .foldLeft(session.read.format(pathInfo.format)) { case (reader, options) =>
        // Each option entry exposes `k` and `v`; hand them to the reader as a plain Map.
        reader.options(options.map(c => c.k -> c.v).toMap)
      }
      .load(pathInfo.path)
  catch {
    case e: Exception =>
      // Pass the exception to the logger so the stack trace is recorded with the message.
      logger.error(s"Error loading file ${pathInfo.path} with ${pathInfo.toString}", e)
      throw e
  }
}
0 commit comments