10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -648,6 +648,16 @@ object CometConf extends ShimCometConf {
.longConf
.createWithDefault(3000L)

val COMET_LIBHDFS_SCHEMES_KEY = "fs.comet.libhdfs.schemes"

val COMET_LIBHDFS_SCHEMES: OptionalConfigEntry[String] =
conf(s"spark.hadoop.$COMET_LIBHDFS_SCHEMES_KEY")
.doc(
"Defines filesystem schemes (e.g., hdfs, webhdfs) that the native side accesses " +
"via libhdfs, separated by commas.")
.stringConf
.createOptional

/** Create a config to enable a specific operator */
private def createExecEnabledConfig(
exec: String,
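Since the new entry is read through the Hadoop configuration, it can be supplied like any other `spark.hadoop.*` setting. A minimal sketch, assuming a vanilla `SparkSession`; the scheme list shown here is illustrative, not a shipped default:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: route hdfs and webhdfs URIs through libhdfs on the native side.
val spark = SparkSession
  .builder()
  .appName("comet-libhdfs-schemes-example")
  .config("spark.hadoop.fs.comet.libhdfs.schemes", "hdfs,webhdfs")
  .getOrCreate()
```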
@@ -22,8 +22,11 @@ package org.apache.comet.objectstore
import java.net.URI
import java.util.Locale

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration

import org.apache.comet.CometConf.COMET_LIBHDFS_SCHEMES_KEY

object NativeConfig {

private val objectStoreConfigPrefixes = Map(
@@ -55,16 +58,22 @@ object NativeConfig {
def extractObjectStoreOptions(hadoopConf: Configuration, uri: URI): Map[String, String] = {
val scheme = uri.getScheme.toLowerCase(Locale.ROOT)

import scala.collection.JavaConverters._
val options = scala.collection.mutable.Map[String, String]()

// Schemes that the native side will access via libhdfs
val libhdfsSchemes = hadoopConf.get(COMET_LIBHDFS_SCHEMES_KEY)
if (StringUtils.isNotBlank(libhdfsSchemes)) {
options(COMET_LIBHDFS_SCHEMES_KEY) = libhdfsSchemes
}

// Get prefixes for this scheme, return early if none found
val prefixes = objectStoreConfigPrefixes.get(scheme)
if (prefixes.isEmpty) {
return Map.empty[String, String]
return options.toMap
}

import scala.collection.JavaConverters._

// Extract all configurations that match the object store prefixes
val options = scala.collection.mutable.Map[String, String]()
hadoopConf.iterator().asScala.foreach { entry =>
val key = entry.getKey
val value = entry.getValue
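A rough illustration of the resulting behavior, assuming the `extractObjectStoreOptions` signature shown above and hypothetical values: even for a scheme with no object-store prefixes, the returned map now carries the libhdfs scheme list instead of coming back empty.

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration

import org.apache.comet.objectstore.NativeConfig

// Hypothetical configuration, for illustration only.
val hadoopConf = new Configuration()
hadoopConf.set("fs.comet.libhdfs.schemes", "hdfs,webhdfs")

val options =
  NativeConfig.extractObjectStoreOptions(hadoopConf, new URI("hdfs://namenode:8020/data"))

// The scheme list is forwarded to the native side; previously this call would have
// returned Map.empty for schemes without object-store config prefixes.
assert(options("fs.comet.libhdfs.schemes") == "hdfs,webhdfs")
```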
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -91,4 +91,5 @@ Comet provides the following configuration settings.
| spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
| spark.comet.shuffle.sizeInBytesMultiplier | Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format and this can result in Spark choosing a different join strategy due to the estimated size of the exchange being smaller. Comet will multiply sizeInBytes by this amount to avoid regressions in join strategy. | 1.0 |
| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Arrow columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
| spark.hadoop.fs.comet.libhdfs.schemes | Comma-separated list of filesystem schemes (e.g., hdfs, webhdfs) that the native side accesses via libhdfs. | |
<!--END:CONFIG_TABLE-->
16 changes: 14 additions & 2 deletions native/core/src/parquet/parquet_support.rs
@@ -336,6 +336,17 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
}
}

fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
let scheme = url.scheme();
if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
use itertools::Itertools;
libhdfs_schemes.split(",").contains(scheme)
} else {
scheme == "hdfs"
}
}

// Mirrors object_store::parse::parse_url for the hdfs object store
#[cfg(feature = "hdfs")]
fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
@@ -369,8 +380,9 @@ pub(crate) fn prepare_object_store_with_configs(
) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
let mut url = Url::parse(url.as_str())
.map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
let is_hdfs_scheme = is_hdfs_scheme(&url, object_store_configs);
let mut scheme = url.scheme();
if scheme == "s3a" {
if !is_hdfs_scheme && scheme == "s3a" {
scheme = "s3";
url.set_scheme("s3").map_err(|_| {
ExecutionError::GeneralError("Could not convert scheme from s3a to s3".to_string())
@@ -382,7 +394,7 @@ pub(crate) fn prepare_object_store_with_configs(
&url[url::Position::BeforeHost..url::Position::AfterPort],
);

let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
parse_hdfs_url(&url)
} else if scheme == "s3" {
objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
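For clarity, the native matching rule introduced in `is_hdfs_scheme` is: if `fs.comet.libhdfs.schemes` is set, a URL goes through libhdfs exactly when its scheme appears in that comma-separated list; otherwise only plain `hdfs` URLs do. Below is a JVM-side sketch of the same rule, for illustration only; the authoritative check is the Rust function above.

```scala
object LibhdfsSchemeRule {
  // Mirrors the Rust is_hdfs_scheme logic: exact, comma-separated scheme matching,
  // falling back to "hdfs" when the config key is absent.
  def isHdfsScheme(scheme: String, objectStoreConfigs: Map[String, String]): Boolean =
    objectStoreConfigs.get("fs.comet.libhdfs.schemes") match {
      case Some(schemes) => schemes.split(",").contains(scheme)
      case None => scheme == "hdfs"
    }

  def main(args: Array[String]): Unit = {
    assert(isHdfsScheme("webhdfs", Map("fs.comet.libhdfs.schemes" -> "hdfs,webhdfs")))
    assert(isHdfsScheme("hdfs", Map.empty))
    assert(!isHdfsScheme("s3a", Map.empty))
  }
}
```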
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.hadoop.fs;

import java.net.URI;

import org.apache.hadoop.fs.RawLocalFileSystem;

public class FakeHDFSFileSystem extends RawLocalFileSystem {

public static final String PREFIX = "fake://fake-bucket";

public FakeHDFSFileSystem() {
// Avoid `URI scheme is not "file"` error on
// RawLocalFileSystem$DeprecatedRawLocalFileStatus.getOwner
RawLocalFileSystem.useStatIfAvailable();
}

@Override
public String getScheme() {
return "fake";
}

@Override
public URI getUri() {
return URI.create(PREFIX);
}
}
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.parquet

import java.io.File
import java.nio.file.Files
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode}
import org.apache.spark.sql.comet.CometNativeScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.{col, sum}

import org.apache.comet.CometConf
import org.apache.comet.hadoop.fs.FakeHDFSFileSystem

class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkPlanHelper {

private var fake_root_dir: File = _

override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem")
conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX)
conf.set(CometConf.COMET_LIBHDFS_SCHEMES.key, "fake,hdfs")
}

override def beforeAll(): Unit = {
// Initialize fake root dir
fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile
// Initialize Spark session
super.beforeAll()
}

protected override def afterAll(): Unit = {
if (fake_root_dir != null) FileUtils.deleteDirectory(fake_root_dir)
super.afterAll()
}

private def writeTestParquetFile(filePath: String): Unit = {
val df = spark.range(0, 1000)
df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
}

private def assertCometNativeScanOnFakeFs(df: DataFrame): Unit = {
val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec =>
p
}
assert(scans.size == 1)
assert(
scans.head.nativeOp.getNativeScan
.getFilePartitions(0)
.getPartitionedFile(0)
.getFilePath
.startsWith(FakeHDFSFileSystem.PREFIX))
}

test("test native_datafusion scan on fake fs") {
val testFilePath =
s"${FakeHDFSFileSystem.PREFIX}${fake_root_dir.getAbsolutePath}/data/test-file.parquet"
writeTestParquetFile(testFilePath)
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
assertCometNativeScanOnFakeFs(df)
assert(df.first().getLong(0) == 499500)
}
}
}