4 changes: 4 additions & 0 deletions common/pom.xml
@@ -63,6 +63,10 @@ under the License.
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -30,7 +30,6 @@
import java.util.stream.Collectors;

import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;

@@ -81,6 +80,8 @@
import org.apache.comet.vector.CometVector;
import org.apache.comet.vector.NativeUtil;

import static scala.jdk.javaapi.CollectionConverters.*;

/**
* A vectorized Parquet reader that reads a Parquet file in a batched fashion.
*
@@ -255,7 +256,7 @@ public void init() throws Throwable {
ParquetReadOptions readOptions = builder.build();

Map<String, String> objectStoreOptions =
JavaConverters.mapAsJavaMap(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));
asJava(NativeConfig.extractObjectStoreOptions(conf, file.pathUri()));

// TODO: enable off-heap buffer when they are ready
ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
@@ -306,7 +307,7 @@ public void init() throws Throwable {
List<Type> fields = requestedSchema.getFields();
List<Type> fileFields = fileSchema.getFields();
ParquetColumn[] parquetFields =
JavaConverters.seqAsJavaList(parquetColumn.children()).toArray(new ParquetColumn[0]);
asJava(parquetColumn.children()).toArray(new ParquetColumn[0]);
int numColumns = fields.size();
if (partitionSchema != null) numColumns += partitionSchema.size();
columnReaders = new AbstractColumnReader[numColumns];
@@ -618,14 +619,14 @@ private DataType getSparkArrayTypeByFieldId(
}

private void checkParquetType(ParquetColumn column) throws IOException {
String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new String[0]);
String[] path = asJava(column.path()).toArray(new String[0]);
if (containsPath(fileSchema, path)) {
if (column.isPrimitive()) {
ColumnDescriptor desc = column.descriptor().get();
ColumnDescriptor fd = fileSchema.getColumnDescription(desc.getPath());
TypeUtil.checkParquetType(fd, column.sparkType());
} else {
for (ParquetColumn childColumn : JavaConverters.seqAsJavaList(column.children())) {
for (ParquetColumn childColumn : asJava(column.children())) {
checkColumn(childColumn);
}
}
@@ -645,7 +646,7 @@ private void checkParquetType(ParquetColumn column) throws IOException {
* file schema, or whether it conforms to the type of the file schema.
*/
private void checkColumn(ParquetColumn column) throws IOException {
String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new String[0]);
String[] path = asJava(column.path()).toArray(new String[0]);
if (containsPath(fileSchema, path)) {
if (column.isPrimitive()) {
ColumnDescriptor desc = column.descriptor().get();
@@ -654,7 +655,7 @@ private void checkColumn(ParquetColumn column) throws IOException {
throw new UnsupportedOperationException("Schema evolution not supported.");
}
} else {
for (ParquetColumn childColumn : JavaConverters.seqAsJavaList(column.children())) {
for (ParquetColumn childColumn : asJava(column.children())) {
checkColumn(childColumn);
}
}
@@ -805,7 +806,7 @@ public void close() throws IOException {
@SuppressWarnings("deprecation")
private int loadNextBatch() throws Throwable {

for (ParquetColumn childColumn : JavaConverters.seqAsJavaList(parquetColumn.children())) {
for (ParquetColumn childColumn : asJava(parquetColumn.children())) {
checkParquetType(childColumn);
}
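The Java-side pattern used throughout this reader is a static import of `scala.jdk.javaapi.CollectionConverters`, whose overloads replace the deprecated `JavaConverters.seqAsJavaList` / `mapAsJavaMap` helpers. A minimal sketch of that pattern (class and method names below are illustrative, not from the Comet sources; on Scala 2.12 the `scala.jdk` classes are assumed to come from the `scala-collection-compat` dependency added in this PR, while Scala 2.13 ships them in the standard library):

```java
import java.util.List;
import java.util.Map;

import scala.collection.Seq;
// On Scala 2.12 this package is assumed to be provided by scala-collection-compat.
import static scala.jdk.javaapi.CollectionConverters.asJava;

class ConverterSketch {
  // Before: JavaConverters.seqAsJavaList(seq)
  static <T> List<T> seqToList(Seq<T> seq) {
    return asJava(seq); // asJava(scala.collection.Seq<A>) returns a java.util.List<A> view
  }

  // Before: JavaConverters.mapAsJavaMap(map)
  static <K, V> Map<K, V> mapToJava(scala.collection.Map<K, V> map) {
    return asJava(map); // asJava(scala.collection.Map<K, V>) returns a java.util.Map<K, V> view
  }
}
```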

@@ -33,6 +33,8 @@
import org.apache.comet.CometSchemaImporter;
import org.apache.comet.vector.*;

import static scala.jdk.javaapi.CollectionConverters.*;

// TODO: extend ColumnReader instead of AbstractColumnReader to reduce code duplication
public class NativeColumnReader extends AbstractColumnReader {
protected static final Logger LOG = LoggerFactory.getLogger(NativeColumnReader.class);
@@ -145,9 +147,7 @@ public CometDecodedVector loadVector() {
ArrowSchema[] schemas = {schema};

CometDecodedVector cometVector =
(CometDecodedVector)
scala.collection.JavaConverters.seqAsJavaList(nativeUtil.importVector(arrays, schemas))
.get(0);
(CometDecodedVector) asJava(nativeUtil.importVector(arrays, schemas)).get(0);

// Update whether the current vector contains any null values. This is used in the following
// batch(s) to determine whether we can skip loading the native vector.
@@ -19,7 +19,7 @@

package org.apache.spark.sql.comet.execution.arrow

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.arrow.vector._
import org.apache.arrow.vector.complex._
@@ -21,7 +21,7 @@ package org.apache.spark.sql.comet.parquet

import java.util.{Locale, UUID}

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation
@@ -23,7 +23,7 @@ import java.io.{DataInputStream, DataOutputStream, File}
import java.nio.ByteBuffer
import java.nio.channels.Channels

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.arrow.c.CDataDictionaryProvider
import org.apache.arrow.vector.{BigIntVector, BitVector, DateDayVector, DecimalVector, FieldVector, FixedSizeBinaryVector, Float4Vector, Float8Vector, IntVector, SmallIntVector, TimeStampMicroTZVector, TimeStampMicroVector, TinyIntVector, ValueVector, VarBinaryVector, VarCharVector, VectorSchemaRoot}
@@ -26,8 +26,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;

import scala.collection.JavaConverters;

import org.junit.Test;

import org.apache.arrow.memory.BufferAllocator;
@@ -46,6 +44,7 @@

import static org.apache.spark.sql.types.DataTypes.*;
import static org.junit.Assert.*;
import static scala.jdk.javaapi.CollectionConverters.*;

@SuppressWarnings("unchecked")
public class TestColumnReader {
@@ -97,7 +96,7 @@ public void testConstantVectors() {
StructField field = StructField.apply("f", type, false, null);

List<Object> values = Collections.singletonList(VALUES.get(i));
InternalRow row = GenericInternalRow.apply(JavaConverters.asScalaBuffer(values).toSeq());
InternalRow row = GenericInternalRow.apply(asScala(values).toSeq());
ConstantColumnReader reader = new ConstantColumnReader(field, BATCH_SIZE, row, 0, true);
reader.readBatch(BATCH_SIZE);
CometVector vector = reader.currentBatch();
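The test above uses the reverse direction, `asScala`, to build the `Seq` that `GenericInternalRow.apply` expects. A sketch under the same assumption about where the converters come from (names are illustrative):

```java
import java.util.Arrays;
import java.util.List;

import scala.collection.Seq;
import scala.collection.mutable.Buffer;

import static scala.jdk.javaapi.CollectionConverters.asScala;

class ReverseConverterSketch {
  // Before: JavaConverters.asScalaBuffer(values).toSeq()
  static Seq<Object> toScalaSeq(List<Object> values) {
    Buffer<Object> buffer = asScala(values); // java.util.List -> scala Buffer view
    return buffer.toSeq();                   // convert the Buffer view to a Seq
  }

  public static void main(String[] args) {
    System.out.println(toScalaSeq(Arrays.<Object>asList(1, "a", 3.0)).length()); // 3
  }
}
```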
6 changes: 6 additions & 0 deletions dev/ensure-jars-have-correct-contents.sh
@@ -94,6 +94,12 @@ allowed_expr+="|^org/apache/spark/CometPlugin.class$"
allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
allowed_expr+="|^scala-collection-compat.properties$"
allowed_expr+="|^scala/$"
allowed_expr+="|^scala/annotation/"
allowed_expr+="|^scala/collection/"
allowed_expr+="|^scala/jdk/"
allowed_expr+="|^scala/util/"

allowed_expr+=")"
declare -i bad_artifacts=0
12 changes: 12 additions & 0 deletions fuzz-testing/pom.xml
@@ -50,11 +50,23 @@ under the License.
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.datafusion</groupId>
<artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.rogach</groupId>
5 changes: 5 additions & 0 deletions pom.xml
@@ -258,6 +258,11 @@ under the License.
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
1 change: 1 addition & 0 deletions spark/pom.xml
@@ -220,6 +220,7 @@ under the License.
<!-- Relocate Protobuf since Spark uses 2.5.0 while Comet uses 3.x -->
<include>com.google.protobuf:protobuf-java</include>
<include>com.google.guava:guava</include>
<include>org.scala-lang.modules:scala-collection-compat_${scala.binary.version}</include>
</includes>
</artifactSet>
<filters>
@@ -33,7 +33,6 @@

import scala.Option;
import scala.Product2;
import scala.collection.JavaConverters;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

@@ -81,6 +80,8 @@
import org.apache.comet.CometConf;
import org.apache.comet.Native;

import static scala.jdk.javaapi.CollectionConverters.*;

/**
* This is based on Spark {@link UnsafeShuffleWriter}, as a writer to write shuffling rows into
* Arrow format after sorting rows based on the partition ID.
@@ -201,7 +202,7 @@ public long getPeakMemoryUsedBytes() {
/** This convenience method should only be called in test code. */
@VisibleForTesting
public void write(Iterator<Product2<K, V>> records) throws IOException {
write(JavaConverters.asScalaIteratorConverter(records).asScala());
write(asScala(records));
}

@Override
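Here the old two-step `JavaConverters.asScalaIteratorConverter(records).asScala()` collapses into a single `asScala` call. A hedged sketch of that overload (names are illustrative, not from the Comet sources):

```java
import java.util.Iterator;

import static scala.jdk.javaapi.CollectionConverters.asScala;

class IteratorConverterSketch {
  // Before: JavaConverters.asScalaIteratorConverter(records).asScala()
  static <T> scala.collection.Iterator<T> toScalaIterator(Iterator<T> records) {
    return asScala(records); // lazy wrapper; elements are consumed on demand, not copied
  }
}
```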
@@ -19,7 +19,7 @@

package org.apache.comet.parquet

import scala.collection.JavaConverters
import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi
@@ -164,7 +164,7 @@ class CometParquetFileFormat(scanImpl: String)
datetimeRebaseSpec.mode == CORRECTED,
partitionSchema,
file.partitionValues,
JavaConverters.mapAsJavaMap(metrics))
metrics.asJava)
try {
batchReader.init()
} catch {
@@ -198,7 +198,7 @@ class CometParquetFileFormat(scanImpl: String)
datetimeRebaseSpec.mode == CORRECTED,
partitionSchema,
file.partitionValues,
JavaConverters.mapAsJavaMap(metrics))
metrics.asJava)
try {
batchReader.init()
} catch {
@@ -19,8 +19,8 @@

package org.apache.comet.parquet

import scala.collection.JavaConverters
import scala.collection.mutable
import scala.jdk.CollectionConverters._

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.hadoop.ParquetInputFormat
@@ -139,7 +139,7 @@ case class CometParquetPartitionReaderFactory(
datetimeRebaseSpec.mode == CORRECTED,
partitionSchema,
file.partitionValues,
JavaConverters.mapAsJavaMap(metrics))
metrics.asJava)
val taskContext = Option(TaskContext.get)
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => cometReader.close()))
return cometReader
@@ -19,7 +19,7 @@

package org.apache.comet.parquet

import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
@@ -26,8 +26,7 @@ import java.sql.{Date, Timestamp}
import java.time.{Duration, Instant, LocalDate, Period}
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.jdk.CollectionConverters._

import org.apache.parquet.column.statistics.{Statistics => ParquetStatistics}
import org.apache.parquet.filter2.predicate._
@@ -21,8 +21,9 @@ package org.apache.comet.rules

import java.net.URI

import scala.collection.{mutable, JavaConverters}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
@@ -443,7 +444,7 @@ object CometScanRule extends Logging {
// previously validated
case _ =>
try {
val objectStoreOptions = JavaConverters.mapAsJavaMap(objectStoreConfigMap)
val objectStoreOptions = objectStoreConfigMap.asJava
Native.validateObjectStoreConfig(filePath, objectStoreOptions)
} catch {
case e: CometNativeException =>
@@ -19,7 +19,7 @@

package org.apache.comet.serde

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.execution.ProjectExec

@@ -19,7 +19,7 @@

package org.apache.comet.serde

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.execution.SortExec

@@ -19,8 +19,8 @@

package org.apache.comet.serde

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
@@ -19,7 +19,7 @@

package org.apache.comet.serde

import scala.collection.JavaConverters.asJavaIterableConverter
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Count, Covariance, CovPopulation, CovSample, First, Last, Max, Min, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
@@ -19,7 +19,7 @@

package org.apache.comet.serde

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.{Attribute, CaseWhen, Coalesce, Expression, If, IsNotNull}

@@ -19,7 +19,7 @@

package org.apache.comet.serde

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, InSet, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or}
import org.apache.spark.sql.types.BooleanType
2 changes: 1 addition & 1 deletion spark/src/main/scala/org/apache/comet/serde/structs.scala
@@ -19,7 +19,7 @@

package org.apache.comet.serde

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, GetArrayStructFields, GetStructField, StructsToJson}
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}