@@ -28,7 +28,7 @@ import za.co.absa.spline.producer.rest.ProducerAPI
2828import java .io .File
2929import java .nio .file .{DirectoryStream , Files , Path }
3030import java .util .concurrent .ExecutorService
31- import scala .concurrent .Future
31+ import scala .concurrent .{ Future , blocking }
3232import scala .jdk .CollectionConverters ._
3333import scala .util .Success
3434
@@ -84,15 +84,23 @@ class LineageImporter(restClient: RESTClientApacheHttpImpl, failOnErrors: Boolea
8484 .filter(_.isFile)
8585
8686 val eventualCounts = Future .traverse(filesIterable) { file =>
87- log.debug(s " Processing file: ${file.getName}" )
88- val rawFileContent = Files .readString(file.toPath).trim
89- val jsonContent = contentPreprocessingFn(rawFileContent)
90- val eventualRes = doImport(jsonContent, endpoint)
91- .andThen({ case Success (_) => progressTracker.tap(Console .out) })
92- .map(_ => 1 )
93- withErrorHandling(eventualRes, 0 , file.getName)
87+ for {
88+ jsonContent <- Future {
89+ blocking {
90+ log.debug(s " Processing file: ${file.getName}" )
91+ val rawFileContent = Files .readString(file.toPath).trim
92+ contentPreprocessingFn(rawFileContent)
93+ }
94+ }
95+
96+ eventualRes = doImport(jsonContent, endpoint)
97+ .andThen({ case Success (_) => progressTracker.tap(Console .out) })
98+ .map(_ => 1 )
99+
100+ res <- withErrorHandling(eventualRes, 0 , file.getName)
101+
102+ } yield res
94103 }
95-
96104 eventualCounts.map(_.sum)
97105 }
98106
0 commit comments