
Commit 49fa287

feat: introduce hadoop mini cluster to test native scan on hdfs (apache#1556)
## Which issue does this PR close?

Closes apache#1515.

## Rationale for this change

Test native scan on HDFS.

## What changes are included in this PR?

Introduce a Hadoop mini cluster to test native scan on HDFS.

## How are these changes tested?

Successfully ran `CometReadHdfsBenchmark` locally (tip: build the native library with the hdfs feature enabled: `cd native && cargo build --features hdfs`).
1 parent 2b5b918 commit 49fa287

5 files changed: +210 −3 lines


native/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ datafusion-comet-proto = { workspace = true }
 object_store = { workspace = true }
 url = { workspace = true }
 parking_lot = "0.12.3"
-datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true}
+datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }
 
 [dev-dependencies]
 pprof = { version = "0.14.0", features = ["flamegraph"] }

pom.xml

Lines changed: 8 additions & 0 deletions
@@ -58,6 +58,7 @@ under the License.
     <protobuf.version>3.25.5</protobuf.version>
     <parquet.version>1.13.1</parquet.version>
     <parquet.maven.scope>provided</parquet.maven.scope>
+    <hadoop.version>3.3.4</hadoop.version>
     <arrow.version>16.0.0</arrow.version>
     <codehaus.jackson.version>1.9.13</codehaus.jackson.version>
     <spotless.version>2.43.0</spotless.version>
@@ -447,6 +448,13 @@ under the License.
         <version>5.1.0</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client-minicluster</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>test</scope>
+      </dependency>
+
     </dependencies>
 
   </dependencyManagement>

spark/pom.xml

Lines changed: 24 additions & 0 deletions
@@ -141,6 +141,30 @@ under the License.
       <artifactId>arrow-c-data</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <!-- hadoop clients are provided by spark -->
+        <exclusion>
+          <artifactId>hadoop-client-api</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-client-runtime</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>snappy-java</artifactId>
+          <groupId>org.xerial.snappy</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>junit</artifactId>
+          <groupId>junit</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
spark/src/test/scala/org/apache/comet/WithHdfsCluster.scala (new file)

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import java.io.{File, FileWriter}
+import java.net.InetAddress
+import java.nio.file.Files
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys
+import org.apache.spark.internal.Logging
+
+/**
+ * Trait for starting and stopping a MiniDFSCluster for testing.
+ *
+ * Most copy from:
+ * https://github.com/apache/kyuubi/blob/master/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
+ */
+trait WithHdfsCluster extends Logging {
+
+  private var hadoopConfDir: File = _
+  private var hdfsCluster: MiniDFSCluster = _
+  private var hdfsConf: Configuration = _
+  private var tmpRootDir: Path = _
+  private var fileSystem: FileSystem = _
+
+  def startHdfsCluster(): Unit = {
+    hdfsConf = new Configuration()
+    // before HADOOP-18206 (3.4.0), HDFS MetricsLogger strongly depends on
+    // commons-logging, we should disable it explicitly, otherwise, it throws
+    // ClassNotFound: org.apache.commons.logging.impl.Log4JLogger
+    hdfsConf.set("dfs.namenode.metrics.logger.period.seconds", "0")
+    hdfsConf.set("dfs.datanode.metrics.logger.period.seconds", "0")
+    // Set bind host to localhost to avoid java.net.BindException
+    hdfsConf.setIfUnset("dfs.namenode.rpc-bind-host", "localhost")
+
+    hdfsCluster = new MiniDFSCluster.Builder(hdfsConf)
+      .checkDataNodeAddrConfig(true)
+      .checkDataNodeHostConfig(true)
+      .build()
+    logInfo(
+      "NameNode address in configuration is " +
+        s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}")
+    hadoopConfDir =
+      Files.createTempDirectory(s"comet_hdfs_conf_${UUID.randomUUID().toString}").toFile
+    saveHadoopConf(hadoopConfDir)
+
+    fileSystem = hdfsCluster.getFileSystem
+    tmpRootDir = new Path("/tmp")
+    fileSystem.mkdirs(tmpRootDir)
+  }
+
+  def stopHdfsCluster(): Unit = {
+    if (hdfsCluster != null) hdfsCluster.shutdown(true)
+    if (hadoopConfDir != null) FileUtils.deleteDirectory(hadoopConfDir)
+  }
+
+  private def saveHadoopConf(hadoopConfDir: File): Unit = {
+    val configToWrite = new Configuration(false)
+    val hostName = InetAddress.getLocalHost.getHostName
+    hdfsConf.iterator().asScala.foreach { kv =>
+      val key = kv.getKey
+      val value = kv.getValue.replaceAll(hostName, "localhost")
+      configToWrite.set(key, value)
+    }
+    val file = new File(hadoopConfDir, "core-site.xml")
+    val writer = new FileWriter(file)
+    configToWrite.writeXml(writer)
+    writer.close()
+  }
+
+  def getHadoopConf: Configuration = hdfsConf
+  def getDFSPort: Int = hdfsCluster.getNameNodePort
+  def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath
+  def getHadoopConfFile: Path = new Path(hadoopConfDir.toURI.toURL.toString, "core-site.xml")
+  def getTmpRootDir: Path = tmpRootDir
+  def getFileSystem: FileSystem = fileSystem
+
+  def withTmpHdfsDir(tmpDir: Path => Unit): Unit = {
+    val tempPath = new Path(tmpRootDir, UUID.randomUUID().toString)
+    fileSystem.mkdirs(tempPath)
+    try tmpDir(tempPath)
+    finally fileSystem.delete(tempPath, true)
+  }
+
+}
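For context, here is a minimal sketch of how a test or tool might mix in `WithHdfsCluster`. The object name and the file it writes are purely illustrative and are not part of this commit.

```scala
// Hypothetical usage sketch of WithHdfsCluster (not part of this commit).
import org.apache.hadoop.fs.Path

object WithHdfsClusterExample extends WithHdfsCluster {
  def main(args: Array[String]): Unit = {
    startHdfsCluster()
    try {
      // withTmpHdfsDir creates a unique directory under /tmp on the mini
      // cluster and deletes it once the closure returns.
      withTmpHdfsDir { dir =>
        val file = new Path(dir, "hello.txt")
        val out = getFileSystem.create(file)
        out.writeBytes("hello hdfs")
        out.close()
        logInfo(s"wrote ${getFileSystem.getFileStatus(file).getLen} bytes to $file")
      }
    } finally {
      stopHdfsCluster()
    }
  }
}
```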

spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala

Lines changed: 68 additions & 2 deletions
@@ -24,13 +24,15 @@ import java.io.File
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.TestUtils
 import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
 
-import org.apache.comet.CometConf
+import org.apache.comet.{CometConf, WithHdfsCluster}
 import org.apache.comet.CometConf.{SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT}
 import org.apache.comet.parquet.BatchReader
 
@@ -40,7 +42,7 @@ import org.apache.comet.parquet.BatchReader
  * benchmark-org.apache.spark.sql.benchmark.CometReadBenchmark` Results will be written to
  * "spark/benchmarks/CometReadBenchmark-**results.txt".
  */
-object CometReadBenchmark extends CometBenchmarkBase {
+class CometReadBaseBenchmark extends CometBenchmarkBase {
 
   def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
     // Benchmarks running through spark sql.
@@ -71,6 +73,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
         spark.sql(s"select $query from parquetV1Table").noop()
       }
@@ -79,6 +82,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
         spark.sql(s"select $query from parquetV1Table").noop()
       }
@@ -118,6 +122,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
         spark.sql("select sum(id) from parquetV1Table").noop()
       }
@@ -126,6 +131,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
         spark.sql("select sum(id) from parquetV1Table").noop()
       }
@@ -244,6 +250,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
         spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
       }
@@ -252,6 +259,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
         spark.sql("select sum(c2) from parquetV1Table where c1 + 1 > 0").noop()
       }
@@ -300,6 +308,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     sqlBenchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
         spark.sql("select sum(length(id)) from parquetV1Table").noop()
       }
@@ -308,6 +317,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     sqlBenchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
         spark.sql("select sum(length(id)) from parquetV1Table").noop()
       }
@@ -352,6 +362,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
         spark
           .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
@@ -363,6 +374,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
         spark
           .sql("select sum(length(c2)) from parquetV1Table where c1 is " +
@@ -403,6 +415,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
         spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
       }
@@ -411,6 +424,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
         spark.sql(s"SELECT sum(c$middle) FROM parquetV1Table").noop()
       }
@@ -452,6 +466,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
         spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
       }
@@ -460,6 +475,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
         spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
       }
@@ -501,6 +517,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native DataFusion") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_DATAFUSION) {
         spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
       }
@@ -509,6 +526,7 @@ object CometReadBenchmark extends CometBenchmarkBase {
     benchmark.addCase("SQL Parquet - Comet Native Iceberg Compat") { _ =>
       withSQLConf(
         CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> SCAN_NATIVE_ICEBERG_COMPAT) {
         spark.sql("SELECT * FROM parquetV1Table WHERE c1 + 1 > 0").noop()
       }
@@ -587,3 +605,51 @@ object CometReadBenchmark extends CometBenchmarkBase {
     }
   }
 }
+
+object CometReadBenchmark extends CometReadBaseBenchmark {}
+
+object CometReadHdfsBenchmark extends CometReadBaseBenchmark with WithHdfsCluster {
+
+  override def getSparkSession: SparkSession = {
+    // start HDFS cluster and add hadoop conf
+    startHdfsCluster()
+    val sparkSession = super.getSparkSession
+    sparkSession.sparkContext.hadoopConfiguration.addResource(getHadoopConfFile)
+    sparkSession
+  }
+
+  override def runCometBenchmark(mainArgs: Array[String]): Unit = {
+    try {
+      super.runCometBenchmark(mainArgs)
+    } finally {
+      stopHdfsCluster()
+    }
+  }
+
+  override def readerBenchmark(values: Int, dataType: DataType): Unit = {
+    // ignore reader benchmark for HDFS
+  }
+
+  // mock local dir to hdfs
+  override protected def withTempPath(f: File => Unit): Unit = {
+    super.withTempPath { dir =>
+      val tempHdfsPath = new Path(getTmpRootDir, dir.getName)
+      getFileSystem.mkdirs(tempHdfsPath)
+      try f(dir)
+      finally getFileSystem.delete(tempHdfsPath, true)
+    }
+  }
+  override protected def prepareTable(
+      dir: File,
+      df: DataFrame,
+      partition: Option[String]): Unit = {
+    val testDf = if (partition.isDefined) {
+      df.write.partitionBy(partition.get)
+    } else {
+      df.write
+    }
+    val tempHdfsPath = getFileSystem.resolvePath(new Path(getTmpRootDir, dir.getName))
+    val parquetV1Path = new Path(tempHdfsPath, "parquetV1")
+    saveAsParquetV1Table(testDf, parquetV1Path.toString)
+  }
+}
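To make concrete what `CometReadHdfsBenchmark` exercises end to end, here is a rough, self-contained sketch of writing Parquet to the mini cluster and reading it back over `hdfs://` with Comet's native DataFusion scan. It is illustrative only: the object name, row count, and session wiring are assumptions (it presumes the Comet jar is on the classpath and registered via `spark.plugins`, and that the native library was built with `cargo build --features hdfs` as noted in the PR description).

```scala
// Illustrative sketch; not part of this commit.
import org.apache.spark.sql.SparkSession

import org.apache.comet.CometConf
import org.apache.comet.CometConf.SCAN_NATIVE_DATAFUSION

object NativeScanOnHdfsExample extends WithHdfsCluster {
  def main(args: Array[String]): Unit = {
    startHdfsCluster()
    try {
      val spark = SparkSession
        .builder()
        .master("local[1]")
        // Assumes the Comet plugin class from the Comet docs is on the classpath.
        .config("spark.plugins", "org.apache.spark.CometPlugin")
        .config(CometConf.COMET_ENABLED.key, "true")
        .config(CometConf.COMET_EXEC_ENABLED.key, "true")
        .config(CometConf.COMET_NATIVE_SCAN_IMPL.key, SCAN_NATIVE_DATAFUSION)
        .getOrCreate()
      // Point the session's Hadoop configuration at the mini cluster.
      spark.sparkContext.hadoopConfiguration.addResource(getHadoopConfFile)

      val path = s"hdfs://localhost:$getDFSPort/tmp/native_scan_example"
      spark.range(1000).write.parquet(path)
      assert(spark.read.parquet(path).count() == 1000)
      spark.stop()
    } finally {
      stopHdfsCluster()
    }
  }
}
```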
