Skip to content

Commit f61a326

Browse files
shengquan-ni authored and MA77HEW820 committed
Fix slow scan operators (#3215)
This PR fixes the slow workflow execution issue by calling desc.sourceSchema only once in the scan operator executor's constructor, instead of for every output tuple. This PR also includes some reformatting to remove unused imports.
1 parent d965715 commit f61a326

File tree

3 files changed

+5
-4
lines changed

3 files changed

+5
-4
lines changed

core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/CSVScanSourceOpExec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package edu.uci.ics.amber.operator.source.scan.csv
33
import com.univocity.parsers.csv.{CsvFormat, CsvParser, CsvParserSettings}
44
import edu.uci.ics.amber.core.executor.SourceOperatorExecutor
55
import edu.uci.ics.amber.core.storage.DocumentFactory
6-
import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, TupleLike}
6+
import edu.uci.ics.amber.core.tuple.{AttributeTypeUtils, Schema, TupleLike}
77
import edu.uci.ics.amber.util.JSONUtils.objectMapper
88

99
import java.io.InputStreamReader
@@ -16,6 +16,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
1616
var parser: CsvParser = _
1717
var nextRow: Array[String] = _
1818
var numRowGenerated = 0
19+
private val schema: Schema = desc.sourceSchema()
1920

2021
override def produceTuple(): Iterator[TupleLike] = {
2122

@@ -42,7 +43,7 @@ class CSVScanSourceOpExec private[csv] (descString: String) extends SourceOperat
4243
try {
4344
TupleLike(
4445
ArraySeq.unsafeWrapArray(
45-
AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], desc.sourceSchema())
46+
AttributeTypeUtils.parseFields(row.asInstanceOf[Array[Any]], schema)
4647
): _*
4748
)
4849
} catch {

core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/csv/ParallelCSVScanSourceOpExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class ParallelCSVScanSourceOpExec private[csv] (
2020
val desc: ParallelCSVScanSourceOpDesc =
2121
objectMapper.readValue(descString, classOf[ParallelCSVScanSourceOpDesc])
2222
private var reader: BufferedBlockReader = _
23+
private val schema = desc.sourceSchema()
2324

2425
override def produceTuple(): Iterator[TupleLike] =
2526
new Iterator[TupleLike]() {
@@ -42,7 +43,6 @@ class ParallelCSVScanSourceOpExec private[csv] (
4243
return null
4344
}
4445

45-
val schema = desc.sourceSchema()
4646
// however the null values won't present if omitted in the end, we need to match nulls.
4747
if (fields.length != schema.getAttributes.size)
4848
fields = Stream

core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/source/scan/json/JSONLScanSourceOpExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ class JSONLScanSourceOpExec private[json] (
2121
objectMapper.readValue(descString, classOf[JSONLScanSourceOpDesc])
2222
private var rows: Iterator[String] = _
2323
private var reader: BufferedReader = _
24+
private val schema = desc.sourceSchema()
2425

2526
override def produceTuple(): Iterator[TupleLike] = {
2627
rows.flatMap { line =>
2728
Try {
28-
val schema = desc.sourceSchema()
2929
val data = JSONToMap(objectMapper.readTree(line), desc.flatten).withDefaultValue(null)
3030
val fields = schema.getAttributeNames.map { fieldName =>
3131
parseField(data(fieldName), schema.getAttribute(fieldName).getType)

0 commit comments

Comments
 (0)