@@ -80,8 +80,8 @@ object SourceFilterSerde extends Logging {
     // refer to org.apache.spark.sql.catalyst.CatalystTypeConverters.CatalystTypeConverter#toScala
     dataType match {
       case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
-      case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
-      case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
+      case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
+      case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
       case _: IntegerType => exprBuilder.setIntVal(value.asInstanceOf[Int])
       case _: LongType => exprBuilder.setLongVal(value.asInstanceOf[Long])
       case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float])
4 changes: 2 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/literals.scala
@@ -72,8 +72,8 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging {
       exprBuilder.setIsNull(false)
       dataType match {
         case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
-        case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
-        case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
+        case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
+        case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
         case _: IntegerType | _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int])
         case _: LongType | _: TimestampType | _: TimestampNTZType =>
           exprBuilder.setLongVal(value.asInstanceOf[Long])
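Both serde hunks apply the same idiom: the generated Protobuf builder has no 8-bit or 16-bit setters, so a Byte or Short literal is widened to Int explicitly once implicit numeric widening is reported by the compiler. A minimal sketch of the idiom follows; LiteralBuilder and its Int-typed setters are hypothetical stand-ins, not the actual Comet or Protobuf API.

// Sketch: explicit widening instead of relying on the compiler's implicit Byte/Short -> Int.
final class LiteralBuilder {
  def setByteVal(v: Int): LiteralBuilder = { println(s"byteVal=$v"); this }
  def setShortVal(v: Int): LiteralBuilder = { println(s"shortVal=$v"); this }
}

object ExplicitWideningExample {
  def main(args: Array[String]): Unit = {
    val b = new LiteralBuilder
    val byteValue: Byte = 7
    val shortValue: Short = 300
    // Before: b.setByteVal(byteValue) compiled only via implicit numeric widening.
    // After: the conversion is spelled out, so a -Wnumeric-widen build stays quiet.
    b.setByteVal(byteValue.toInt)
    b.setShortVal(shortValue.toInt)
  }
}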
@@ -70,7 +70,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres

     override def next(): ColumnarBatch = {
       val batch = batches.next()
-      numOutputRows += batch.numRows()
+      numOutputRows += batch.numRows().toLong
       batch
     }
   }
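This hunk shows the pattern repeated throughout the operator files: the row-count metric accumulates a Long while ColumnarBatch.numRows() returns an Int, so the widening is written out. A self-contained sketch of the same shape, using a stand-in metric class rather than Spark's SQLMetric:

// Sketch: an Int count fed into a Long-based accumulator, widened explicitly at the call site.
final class LongMetric(var value: Long = 0L) {
  def +=(v: Long): Unit = value += v
}

object MetricWideningExample {
  def main(args: Array[String]): Unit = {
    val numOutputRows = new LongMetric
    val batchRowCount: Int = 8192          // stands in for batch.numRows()
    numOutputRows += batchRowCount.toLong  // explicit Int -> Long, no widening warning
    println(numOutputRows.value)
  }
}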
@@ -146,7 +146,7 @@ case class CometBroadcastExchangeExec(
     longMetric("numOutputRows") += numRows
     if (numRows >= maxBroadcastRows) {
       throw QueryExecutionErrors.cannotBroadcastTableOverMaxTableRowsError(
-        maxBroadcastRows,
+        maxBroadcastRows.toLong,
         numRows)
     }

@@ -88,7 +88,7 @@ case class CometCollectLimitExec(
         outputPartitioning,
         serializer,
         metrics)
-    metrics("numPartitions").set(dep.partitioner.numPartitions)
+    metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)

     new CometShuffledBatchRDD(dep, readMetrics)
   }
@@ -82,7 +82,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
       val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
       batches.flatMap { batch =>
         numInputBatches += 1
-        numOutputRows += batch.numRows()
+        numOutputRows += batch.numRows().toLong
         batch.rowIterator().asScala.map(toUnsafe)
       }
     }
@@ -120,7 +120,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
         .flatMap(CometUtils.decodeBatches(_, this.getClass.getSimpleName))
         .flatMap { batch =>
           numInputBatches += 1
-          numOutputRows += batch.numRows()
+          numOutputRows += batch.numRows().toLong
           batch.rowIterator().asScala.map(toUnsafe)
         }

@@ -242,7 +242,7 @@ case class CometScanExec(
       driverMetrics("staticFilesSize") = filesSize
     }
     if (relation.partitionSchema.nonEmpty) {
-      driverMetrics("numPartitions") = partitions.length
+      driverMetrics("numPartitions") = partitions.length.toLong
     }
   }

@@ -277,7 +277,7 @@ case class CometScanExec(

     override def next(): ColumnarBatch = {
       val batch = batches.next()
-      numOutputRows += batch.numRows()
+      numOutputRows += batch.numRows().toLong
       batch
     }
   }
@@ -83,7 +83,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
         val startNs = System.nanoTime()
         val batch = iter.next()
         conversionTime += System.nanoTime() - startNs
-        numInputRows += batch.numRows()
+        numInputRows += batch.numRows().toLong
         numOutputBatches += 1
         batch
       }
@@ -123,7 +123,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
           CometArrowConverters.rowToArrowBatchIter(
             sparkBatches,
             schema,
-            maxRecordsPerBatch,
+            maxRecordsPerBatch.toLong,
             timeZoneId,
             context)
         createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
@@ -96,7 +96,7 @@ case class CometTakeOrderedAndProjectExec(
           outputPartitioning,
           serializer,
           metrics)
-      metrics("numPartitions").set(dep.partitioner.numPartitions)
+      metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)

       new CometShuffledBatchRDD(dep, readMetrics)
     }
@@ -111,7 +111,7 @@ class CometBlockStoreShuffleReader[K, C](
     // Update the context task metrics for each record read.
     val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
       recordIter.map { record =>
-        readMetrics.incRecordsRead(record._2.numRows())
+        readMetrics.incRecordsRead(record._2.numRows().toLong)
         record
       },
       context.taskMetrics().mergeShuffleReadMetrics())
@@ -137,7 +137,7 @@ case class CometShuffleExchangeExec(
         outputPartitioning,
         serializer,
         metrics)
-      metrics("numPartitions").set(dep.partitioner.numPartitions)
+      metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
       val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
       SQLMetrics.postDriverMetricUpdates(
         sparkContext,
@@ -151,7 +151,7 @@
         outputPartitioning,
         serializer,
         metrics)
-      metrics("numPartitions").set(dep.partitioner.numPartitions)
+      metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
       val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
       SQLMetrics.postDriverMetricUpdates(
         sparkContext,
@@ -385,7 +385,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
       // end up being almost the same regardless of the index. substantially scrambling the
       // seed by hashing will help. Refer to SPARK-21782 for more details.
       val partitionId = TaskContext.get().partitionId()
-      var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
+      var position = new XORShiftRandom(partitionId.toLong).nextInt(numPartitions)
       (_: InternalRow) => {
         // The HashPartitioner will handle the `mod` by the number of partitions
         position += 1
@@ -432,7 +432,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
         row: InternalRow): UnsafeExternalRowSorter.PrefixComputer.Prefix = {
       // The hashcode generated from the binary form of a [[UnsafeRow]] should not be null.
       result.isNull = false
-      result.value = row.hashCode()
+      result.value = row.hashCode().toLong
       result
     }
   }
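The last two hunks in this file widen an Int argument into a value declared as Long: the random seed and the sort-prefix value. A small sketch of the same call-site widening; Rng below is a hypothetical stand-in for XORShiftRandom, assumed here to take a Long seed.

// Sketch: Int arguments passed into Long-typed positions, widened explicitly at the call site.
final class Rng(seed: Long) {                 // stand-in for a PRNG seeded with a Long
  private val state = seed ^ 0x5DEECE66DL
  def nextInt(bound: Int): Int = (((state % bound).toInt) + bound) % bound
}

object CallSiteWideningExample {
  def main(args: Array[String]): Unit = {
    val partitionId: Int = 3
    val position = new Rng(partitionId.toLong).nextInt(10)  // explicit widening of the seed

    val row = "some row"
    val prefixValue: Long = row.hashCode().toLong            // hashCode() returns an Int
    println(s"position=$position prefix=$prefixValue")
  }
}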
@@ -1202,7 +1202,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
     val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
     spark
-      .range(num)
+      .range(num.toLong)
       .map(_ % div)
       // Parquet doesn't allow column names with spaces, have to add an alias here.
       // Minus 500 here so that negative decimals are also tested.
@@ -1700,7 +1700,7 @@ class CometExecSuite extends CometTestBase {
     withTable("t1") {
       val numRows = 10
       spark
-        .range(numRows)
+        .range(numRows.toLong)
         .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
         .repartition(3) // Move data across multiple partitions
         .write
@@ -1737,7 +1737,7 @@ class CometExecSuite extends CometTestBase {
     withTable("t1") {
       val numRows = 10
       spark
-        .range(numRows)
+        .range(numRows.toLong)
         .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
         .repartition(3) // Force repartition to test data will come to single partition
         .write
@@ -1768,7 +1768,7 @@ class CometExecSuite extends CometTestBase {
     withTable("t1") {
       val numRows = 10
       spark
-        .range(numRows)
+        .range(numRows.toLong)
         .selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
         .repartition(3) // Force repartition to test data will come to single partition
         .write
@@ -23,6 +23,7 @@ import java.io.{File, FileFilter}
 import java.math.{BigDecimal, BigInteger}
 import java.time.{ZoneId, ZoneOffset}

+import scala.annotation.nowarn
 import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -52,6 +53,7 @@ import org.apache.comet.CometConf
 import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
 import org.apache.comet.rules.CometScanTypeChecker

+@nowarn("cat=w-flag-numeric-widen")
 abstract class ParquetReadSuite extends CometTestBase {
   import testImplicits._

@@ -1532,6 +1534,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   }

   test("test pre-fetching multiple files") {
+    @nowarn("msg=implicit numeric widening")
     def makeRawParquetFile(
         path: Path,
         dictionaryEnabled: Boolean,
@@ -1767,7 +1770,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   }

   private def withId(id: Int) =
-    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id.toLong).build()

   // Based on Spark ParquetIOSuite.test("vectorized reader: array of nested struct")
   test("array of nested struct with and without field id") {
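The test suites take a different route: rather than rewriting every widening site, the warning is silenced locally with scala.annotation.nowarn, either by category or by message. A minimal sketch of the two filter styles used above; the filter strings are the ones from the diff, and the behavior assumes Scala 2.13's -Wconf/-Wnumeric-widen machinery.

import scala.annotation.nowarn

// Sketch: suppressing numeric-widening warnings for a whole class or a single method.
@nowarn("cat=w-flag-numeric-widen")          // category filter, as used on ParquetReadSuite
class WideningHeavyFixture {
  val total: Long = 42                       // Int literal widened implicitly; warning suppressed

  @nowarn("msg=implicit numeric widening")   // message filter, as used on makeRawParquetFile
  def makeRows(n: Int): Seq[Long] = (0 until n).map(i => i: Long)
}

object NowarnExample {
  def main(args: Array[String]): Unit =
    println(new WideningHeavyFixture().makeRows(3))
}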
14 changes: 9 additions & 5 deletions spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql

 import java.util.concurrent.atomic.AtomicInteger

+import scala.annotation.nowarn
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -550,6 +551,7 @@ abstract class CometTestBase
       case None => f(spark.read.format("parquet").load(path))
     }

+  @nowarn("cat=deprecation")
   protected def createParquetWriter(
       schema: MessageType,
       path: Path,
@@ -558,8 +560,8 @@
       dictionaryPageSize: Int = 1024,
       pageRowCountLimit: Int = ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT,
       rowGroupSize: Long = 1024 * 1024L): ParquetWriter[Group] = {
-    val hadoopConf = spark.sessionState.newHadoopConf()

+    val hadoopConf = spark.sessionState.newHadoopConf()
     ExampleParquetWriter
       .builder(path)
       .withDictionaryEncoding(dictionaryEnabled)
@@ -647,6 +649,7 @@ abstract class CometTestBase
     }
   }

+  @nowarn("cat=w-flag-numeric-widen")
   def makeParquetFileAllPrimitiveTypes(
       path: Path,
       dictionaryEnabled: Boolean,
@@ -768,7 +771,7 @@ abstract class CometTestBase
       if (rand.nextBoolean()) {
         None
       } else {
-        Some(getValue(i, div))
+        Some(getValue(i.toLong, div.toLong))
       }
     }
     expected.foreach { opt =>
@@ -822,7 +825,7 @@ abstract class CometTestBase
       if (rand.nextBoolean()) {
         None
       } else {
-        Some(getValue(i, div))
+        Some(getValue(i.toLong, div.toLong))
       }
     }
     expected.foreach { opt =>
@@ -1000,7 +1003,7 @@ abstract class CometTestBase
     val div = if (dictionaryEnabled) 10 else n // maps value to a small range for dict to kick in

     val expected = (0 until n).map { i =>
-      Some(getValue(i, div))
+      Some(getValue(i.toLong, div.toLong))
     }
     expected.foreach { opt =>
       val timestampFormats = List(
@@ -1048,7 +1051,7 @@ abstract class CometTestBase
   def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
     val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
     spark
-      .range(num)
+      .range(num.toLong)
       .map(_ % div)
       // Parquet doesn't allow column names with spaces, have to add an alias here.
       // Minus 500 here so that negative decimals are also tested.
@@ -1175,6 +1178,7 @@ abstract class CometTestBase
     df.showString(_numRows, truncate, vertical)
   }

+  @nowarn("cat=w-flag-numeric-widen")
   def makeParquetFile(
       path: Path,
       total: Int,
6 changes: 4 additions & 2 deletions spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala
@@ -65,7 +65,9 @@ object GenTPCHData {
       // Install the data generators in all nodes
       // TODO: think a better way to install on each worker node
       // such as https://stackoverflow.com/a/40876671
-      spark.range(0, workers, 1, workers).foreach(worker => installDBGEN(baseDir)(worker))
+      spark
+        .range(0L, workers.toLong, 1L, workers)
+        .foreach(worker => installDBGEN(baseDir)(worker))
       s"${baseDir}/dbgen"
     } else {
       config.dbgenDir
@@ -91,7 +93,7 @@

     // Clean up
     if (defaultDbgenDir != null) {
-      spark.range(0, workers, 1, workers).foreach { _ =>
+      spark.range(0L, workers.toLong, 1L, workers).foreach { _ =>
        val _ = FileUtils.deleteQuietly(defaultDbgenDir)
      }
    }
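Dataset ranges are another instance of the same cleanup: SparkSession.range(start, end, step, numPartitions) declares the first three parameters as Long and the partition count as Int, so Int-typed values such as workers are widened explicitly or written as Long literals. A hedged sketch of the call shape, assuming a local SparkSession; it is not taken from the PR itself.

import org.apache.spark.sql.SparkSession

// Sketch: explicit Long arguments to SparkSession.range(start, end, step, numPartitions).
object RangeWideningExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("range-example").getOrCreate()
    val workers: Int = 4
    // start, end, and step are Long; numPartitions stays an Int.
    val ids = spark.range(0L, workers.toLong, 1L, workers)
    println(ids.count())
    spark.stop()
  }
}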
@@ -66,7 +66,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       new Benchmark(
         s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
           s"single aggregate ${aggregateFunction.toString}",
-        values,
+        values.toLong,
         output = output)

     withTempPath { dir =>
@@ -104,7 +104,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       new Benchmark(
         s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
           s"single aggregate ${aggregateFunction.toString} on decimal",
-        values,
+        values.toLong,
         output = output)

     val df = makeDecimalDataFrame(values, dataType, false);
@@ -145,7 +145,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       new Benchmark(
         s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " +
           s"single aggregate ${aggregateFunction.toString}",
-        values,
+        values.toLong,
         output = output)

     withTempPath { dir =>
@@ -186,7 +186,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
       new Benchmark(
         s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " +
           s"multiple aggregates ${aggregateFunction.toString}",
-        values,
+        values.toLong,
         output = output)

     withTempPath { dir =>
@@ -37,7 +37,7 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
     val dataType = IntegerType
     val benchmark = new Benchmark(
       s"Binary op ${dataType.sql}, dictionary = $useDictionary",
-      values,
+      values.toLong,
       output = output)

     withTempPath { dir =>
@@ -78,7 +78,7 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
       useDictionary: Boolean): Unit = {
     val benchmark = new Benchmark(
       s"Binary op ${dataType.sql}, dictionary = $useDictionary",
-      values,
+      values.toLong,
       output = output)
     val df = makeDecimalDataFrame(values, dataType, useDictionary)

@@ -81,7 +81,7 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
     withTempTable(tbl) {
       import spark.implicits._
       spark
-        .range(values)
+        .range(values.toLong)
         .map(_ => if (useDictionary) Random.nextLong % 5 else Random.nextLong)
         .createOrReplaceTempView(tbl)
       runBenchmark(benchmarkName)(f(values))
@@ -168,7 +168,7 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {

     val div = if (useDictionary) 5 else values
     spark
-      .range(values)
+      .range(values.toLong)
       .map(_ % div)
       .select((($"value" - 500) / 100.0) cast decimal as Symbol("dec"))
   }
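All of the edits above exist to keep the build clean once numeric-widening warnings are reported and, presumably, escalated to errors. The actual build-configuration change is not part of this diff; as an assumption, a Scala 2.13 sbt setting along the following lines would produce the warnings these commits address (the category name matches the @nowarn filters used in the test suites).

// Hypothetical sbt settings, not taken from this PR: warn on implicit numeric widening
// and escalate that category to an error.
scalacOptions ++= Seq(
  "-Wnumeric-widen",
  "-Wconf:cat=w-flag-numeric-widen:error"
)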