Skip to content

Commit cef3e05

Browse files
test: fix Spark 3.5 tests (apache#1482)
1 parent 9152dc2 commit cef3e05

File tree

10 files changed

+266
-132
lines changed

10 files changed

+266
-132
lines changed

.github/workflows/spark_sql_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ jobs:
4545
matrix:
4646
os: [ubuntu-24.04]
4747
java-version: [11]
48-
spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.1'}]
48+
spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.4'}]
4949
module:
5050
- {name: "catalyst", args1: "catalyst/test", args2: ""}
5151
- {name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest}

common/src/main/java/org/apache/comet/parquet/BatchReader.java

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
import java.io.Closeable;
2323
import java.io.IOException;
24-
import java.lang.reflect.InvocationTargetException;
25-
import java.lang.reflect.Method;
2624
import java.net.URI;
2725
import java.net.URISyntaxException;
2826
import java.util.Arrays;
@@ -35,8 +33,6 @@
3533
import java.util.concurrent.LinkedBlockingQueue;
3634

3735
import scala.Option;
38-
import scala.collection.Seq;
39-
import scala.collection.mutable.Buffer;
4036

4137
import org.slf4j.Logger;
4238
import org.slf4j.LoggerFactory;
@@ -61,9 +57,9 @@
6157
import org.apache.parquet.schema.Type;
6258
import org.apache.spark.TaskContext;
6359
import org.apache.spark.TaskContext$;
64-
import org.apache.spark.executor.TaskMetrics;
6560
import org.apache.spark.sql.catalyst.InternalRow;
6661
import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
62+
import org.apache.spark.sql.comet.shims.ShimTaskMetrics;
6763
import org.apache.spark.sql.execution.datasources.PartitionedFile;
6864
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
6965
import org.apache.spark.sql.execution.metric.SQLMetric;
@@ -350,7 +346,8 @@ public void init() throws URISyntaxException, IOException {
350346
// Note that this tries to get thread local TaskContext object, if this is called at other
351347
// thread, it won't update the accumulator.
352348
if (taskContext != null) {
353-
Option<AccumulatorV2<?, ?>> accu = getTaskAccumulator(taskContext.taskMetrics());
349+
Option<AccumulatorV2<?, ?>> accu =
350+
ShimTaskMetrics.getTaskAccumulator(taskContext.taskMetrics());
354351
if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
355352
@SuppressWarnings("unchecked")
356353
AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
@@ -637,27 +634,4 @@ public Option<Throwable> call() throws Exception {
637634
}
638635
}
639636
}
640-
641-
// Signature of externalAccums changed from returning a Buffer to returning a Seq. If comet is
642-
// expecting a Buffer but the Spark version returns a Seq or vice versa, we get a
643-
// method not found exception.
644-
@SuppressWarnings("unchecked")
645-
private Option<AccumulatorV2<?, ?>> getTaskAccumulator(TaskMetrics taskMetrics) {
646-
Method externalAccumsMethod;
647-
try {
648-
externalAccumsMethod = TaskMetrics.class.getDeclaredMethod("externalAccums");
649-
externalAccumsMethod.setAccessible(true);
650-
String returnType = externalAccumsMethod.getReturnType().getName();
651-
if (returnType.equals("scala.collection.mutable.Buffer")) {
652-
return ((Buffer<AccumulatorV2<?, ?>>) externalAccumsMethod.invoke(taskMetrics))
653-
.lastOption();
654-
} else if (returnType.equals("scala.collection.Seq")) {
655-
return ((Seq<AccumulatorV2<?, ?>>) externalAccumsMethod.invoke(taskMetrics)).lastOption();
656-
} else {
657-
return Option.apply(null); // None
658-
}
659-
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
660-
return Option.apply(null); // None
661-
}
662-
}
663637
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import org.apache.spark.executor.TaskMetrics
23+
import org.apache.spark.util.AccumulatorV2
24+
25+
object ShimTaskMetrics {
26+
27+
def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
28+
taskMetrics.externalAccums.lastOption
29+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import org.apache.spark.executor.TaskMetrics
23+
import org.apache.spark.util.AccumulatorV2
24+
25+
object ShimTaskMetrics {
26+
27+
def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
28+
taskMetrics.externalAccums.lastOption
29+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import scala.collection.mutable.ArrayBuffer
23+
24+
import org.apache.spark.executor.TaskMetrics
25+
import org.apache.spark.util.AccumulatorV2
26+
27+
object ShimTaskMetrics {
28+
29+
def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
30+
taskMetrics.withExternalAccums(identity[ArrayBuffer[AccumulatorV2[_, _]]](_)).lastOption
31+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark.sql.comet.shims
21+
22+
import org.apache.spark.executor.TaskMetrics
23+
import org.apache.spark.util.AccumulatorV2
24+
25+
object ShimTaskMetrics {
26+
27+
def getTaskAccumulator(taskMetrics: TaskMetrics): Option[AccumulatorV2[_, _]] =
28+
taskMetrics.externalAccums.lastOption
29+
}

0 commit comments

Comments (0)