Skip to content

Commit 27c4d54

Browse files
committed
[WIP] opendal writes
1 parent c70d3c7 commit 27c4d54

File tree

7 files changed: +55 −55 lines changed

native/Cargo.lock

Lines changed: 27 additions & 41 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ impl ParquetWriterExec {
             .unwrap_or(output_file_path);

         // Create output directory
-        std::fs::create_dir_all(&local_path).map_err(|e| {
+        std::fs::create_dir_all(local_path).map_err(|e| {
             DataFusionError::Execution(format!(
                 "Failed to create output directory '{}': {}",
                 local_path, e

native/core/src/lib.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,17 @@ use log4rs::{
 };
 use once_cell::sync::OnceCell;

-#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
+#[cfg(all(
+    not(target_env = "msvc"),
+    feature = "jemalloc",
+    not(feature = "mimalloc")
+))]
 use tikv_jemallocator::Jemalloc;

-#[cfg(feature = "mimalloc")]
+#[cfg(all(
+    feature = "mimalloc",
+    not(all(not(target_env = "msvc"), feature = "jemalloc"))
+))]
 use mimalloc::MiMalloc;

 use errors::{try_unwrap_or_throw, CometError, CometResult};
@@ -55,11 +62,18 @@ pub mod execution;
 mod jvm_bridge;
 pub mod parquet;

-#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
+#[cfg(all(
+    not(target_env = "msvc"),
+    feature = "jemalloc",
+    not(feature = "mimalloc")
+))]
 #[global_allocator]
 static GLOBAL: Jemalloc = Jemalloc;

-#[cfg(feature = "mimalloc")]
+#[cfg(all(
+    feature = "mimalloc",
+    not(all(not(target_env = "msvc"), feature = "jemalloc"))
+))]
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;

native/core/src/parquet/parquet_support.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) ->
 }

 // Creates an HDFS object store from a URL using the native HDFS implementation
-#[cfg(feature = "hdfs")]
+#[cfg(all(feature = "hdfs", not(feature = "hdfs-opendal")))]
 fn create_hdfs_object_store(
     url: &Url,
 ) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {

native/fs-hdfs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ publish = false
 readme = "README.md"

 [lib]
-name = "hdfs"
+name = "fs_hdfs"
 path = "src/lib.rs"

 [features]

native/hdfs/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ edition = { workspace = true }

 [features]
 default = ["hdfs", "try_spawn_blocking"]
-hdfs = ["fs-hdfs"]
+hdfs = ["fs_hdfs"]
 hdfs3 = ["fs-hdfs3"]
 # Used for trying to spawn a blocking thread for implementing each object store interface when running in a tokio runtime
 try_spawn_blocking = []
@@ -42,7 +42,7 @@ try_spawn_blocking = []
 async-trait = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
-fs-hdfs = { version = "^0.1.12", optional = true }
+fs_hdfs = { package = "datafusion-comet-fs-hdfs3", path = "../fs-hdfs", optional = true }
 fs-hdfs3 = { version = "^0.1.12", optional = true }
 futures = { workspace = true }
 object_store = { workspace = true }

native/hdfs/src/object_store/hdfs.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
+use fs_hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
+use fs_hdfs::walkdir::HdfsWalkDir;
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
-use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
-use hdfs::walkdir::HdfsWalkDir;
 use object_store::{
     path::{self, Path},
     Error, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
@@ -422,7 +422,7 @@ impl ObjectStore for HadoopFileSystem {
             hdfs.delete(&to, false).map_err(to_error)?;
         }

-        hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
+        fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
             .map_err(to_error)?;

         Ok(())
@@ -437,7 +437,7 @@ impl ObjectStore for HadoopFileSystem {
         let to = HadoopFileSystem::path_to_filesystem(to);

         maybe_spawn_blocking(move || {
-            hdfs.rename(&from, &to).map_err(to_error)?;
+            hdfs.rename(&from, &to, true).map_err(to_error)?;

            Ok(())
        })
@@ -459,7 +459,7 @@ impl ObjectStore for HadoopFileSystem {
            });
        }

-        hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
+        fs_hdfs::util::HdfsUtil::copy(hdfs.as_ref(), &from, hdfs.as_ref(), &to)
            .map_err(to_error)?;

        Ok(())

0 commit comments

Comments (0)