Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1963,6 +1963,7 @@ class LogicalPlanBuilder:
self,
root_dir: str,
write_mode: WriteMode,
write_success_file: bool,
file_format: FileFormat,
file_format_options: PyFormatSinkOption | None = None,
partition_cols: list[PyExpr] | None = None,
Expand Down
2 changes: 2 additions & 0 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ def write_parquet(
root_dir: str | pathlib.Path,
compression: str = "snappy",
write_mode: Literal["append", "overwrite", "overwrite-partitions"] = "append",
write_success_file: bool = False,
partition_cols: list[ColumnInputType] | None = None,
io_config: IOConfig | None = None,
) -> "DataFrame":
Expand Down Expand Up @@ -825,6 +826,7 @@ def write_parquet(
root_dir=root_dir,
partition_cols=cols,
write_mode=WriteMode.from_str(write_mode),
write_success_file=write_success_file,
file_format=FileFormat.Parquet,
compression=compression,
io_config=io_config,
Expand Down
2 changes: 2 additions & 0 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ def write_tabular(
write_mode: WriteMode,
file_format: FileFormat,
io_config: IOConfig,
write_success_file: bool = False,
file_format_option: PyFormatSinkOption | None = None,
partition_cols: list[Expression] | None = None,
compression: str | None = None,
Expand All @@ -342,6 +343,7 @@ def write_tabular(
builder = self._builder.table_write(
str(root_dir),
write_mode,
write_success_file,
file_format,
file_format_option,
part_cols_pyexprs,
Expand Down
20 changes: 19 additions & 1 deletion src/daft-local-execution/src/sinks/commit_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl BlockingSink for CommitWriteSink {
overwrite_files(
root_uri.to_string(),
written_paths,
get_io_client(true, file_info.io_config.unwrap_or_default().into())?,
get_io_client(true, file_info.io_config.clone().unwrap_or_default().into())?,
matches!(file_info.write_mode, WriteMode::OverwritePartitions),
)
.await?;
Expand All @@ -161,6 +161,24 @@ impl BlockingSink for CommitWriteSink {
written_file_path_record_batches.into(),
None,
);

// Create _SUCCESS file if write_success_file is true
if file_info.write_success_file {
let (_, root_uri) = parse_url(&file_info.root_dir)?;
let io_config_clone = file_info.io_config.clone();
let io_client = get_io_client(true, io_config_clone.unwrap_or_default().into())?;
let source = io_client.get_source(&root_uri).await?;
let success_file_path = format!("{}/_SUCCESS", root_uri.trim_end_matches('/'));

if let Err(e) = source.put(&success_file_path, tokio_util::bytes::Bytes::new(), None).await {
log::warn!(
"Failed to create _SUCCESS file at {}: {}.",
success_file_path,
e
);
}
}

Ok(BlockingSinkFinalizeOutput::Finished(vec![Arc::new(
written_file_paths_mp,
)]))
Expand Down
5 changes: 5 additions & 0 deletions src/daft-logical-plan/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ impl LogicalPlanBuilder {
&self,
root_dir: &str,
write_mode: WriteMode,
write_success_file: bool,
file_format: FileFormat,
format_option: Option<FormatSinkOption>,
partition_cols: Option<Vec<ExprRef>>,
Expand All @@ -725,6 +726,7 @@ impl LogicalPlanBuilder {
partition_cols,
compression,
io_config,
write_success_file,
));

let logical_plan: LogicalPlan =
Expand Down Expand Up @@ -1371,6 +1373,7 @@ impl PyLogicalPlanBuilder {
#[pyo3(signature = (
root_dir,
write_mode,
write_success_file,
file_format,
format_option=None,
partition_cols=None,
Expand All @@ -1381,6 +1384,7 @@ impl PyLogicalPlanBuilder {
&self,
root_dir: &str,
write_mode: WriteMode,
write_success_file: bool,
file_format: FileFormat,
format_option: Option<PyFormatSinkOption>,
partition_cols: Option<Vec<PyExpr>>,
Expand All @@ -1392,6 +1396,7 @@ impl PyLogicalPlanBuilder {
.table_write(
root_dir,
write_mode,
write_success_file,
file_format,
format_option.map(|p| p.inner),
partition_cols.map(pyexprs_to_exprs),
Expand Down
5 changes: 5 additions & 0 deletions src/daft-logical-plan/src/sink_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct OutputFileInfo<E = ExprRef> {
pub partition_cols: Option<Vec<E>>,
pub compression: Option<String>,
pub io_config: Option<IOConfig>,
pub write_success_file: bool,
}

#[cfg(feature = "python")]
Expand Down Expand Up @@ -185,6 +186,7 @@ impl<E> OutputFileInfo<E>
where
E: ToString,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
root_dir: String,
write_mode: WriteMode,
Expand All @@ -193,6 +195,7 @@ where
partition_cols: Option<Vec<E>>,
compression: Option<String>,
io_config: Option<IOConfig>,
write_success_file: bool,
) -> Self {
Self {
root_dir,
Expand All @@ -202,6 +205,7 @@ where
partition_cols,
compression,
io_config,
write_success_file,
}
}

Expand Down Expand Up @@ -256,6 +260,7 @@ impl OutputFileInfo {
.transpose()?,
compression: self.compression,
io_config: self.io_config,
write_success_file: self.write_success_file,
})
}
}
Expand Down
59 changes: 59 additions & 0 deletions tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,3 +468,62 @@ def test_write_and_read_empty_parquet(tmp_path_factory):
df.write_parquet(empty_parquet_files, write_mode="overwrite")

assert daft.read_parquet(empty_parquet_files).to_pydict() == {"a": []}


def test_write_parquet_success_file(tmp_path_factory):
import os

# Test with write_success_file=True
output_dir = str(tmp_path_factory.mktemp("parquet_success_file_true"))
df = daft.from_pydict({"a": [1, 2, 3]})
df.write_parquet(output_dir, write_mode="overwrite", write_success_file=True)

success_file_path = os.path.join(output_dir, "_SUCCESS")
assert os.path.exists(success_file_path), f"_SUCCESS file not found at {success_file_path}"
assert os.path.getsize(success_file_path) == 0, (
f"_SUCCESS file should be empty, but has size {os.path.getsize(success_file_path)}"
)

# Test with write_success_file=False (default)
output_dir = str(tmp_path_factory.mktemp("parquet_success_file_false"))
df = daft.from_pydict({"a": [1, 2, 3]})
df.write_parquet(output_dir, write_mode="overwrite", write_success_file=False)

success_file_path = os.path.join(output_dir, "_SUCCESS")
assert not os.path.exists(success_file_path), f"_SUCCESS file should not exist at {success_file_path}"

# Test with append mode
output_dir = str(tmp_path_factory.mktemp("parquet_success_file_append"))
df = daft.from_pydict({"a": [1, 2, 3]})
df.write_parquet(output_dir, write_mode="append", write_success_file=True)

success_file_path = os.path.join(output_dir, "_SUCCESS")
assert os.path.exists(success_file_path), f"_SUCCESS file not found at {success_file_path}"

# Test with overwrite-partitions mode
output_dir = str(tmp_path_factory.mktemp("parquet_success_file_overwrite_partitions"))
df = daft.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df.write_parquet(output_dir, write_mode="overwrite-partitions", write_success_file=True, partition_cols=["b"])

success_file_path = os.path.join(output_dir, "_SUCCESS")
assert os.path.exists(success_file_path), f"_SUCCESS file not found at {success_file_path}"
assert os.path.getsize(success_file_path) == 0, (
f"_SUCCESS file should be empty, but has size {os.path.getsize(success_file_path)}"
)

assert os.path.exists(os.path.join(output_dir, "b=x")), "Partition directory b=x not found"
assert os.path.exists(os.path.join(output_dir, "b=y")), "Partition directory b=y not found"

# Test with partition columns
output_dir = str(tmp_path_factory.mktemp("parquet_success_file_partitioned"))
df = daft.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df.write_parquet(output_dir, write_mode="overwrite", write_success_file=True, partition_cols=["b"])

success_file_path = os.path.join(output_dir, "_SUCCESS")
assert os.path.exists(success_file_path), f"_SUCCESS file not found at {success_file_path}"
assert os.path.getsize(success_file_path) == 0, (
f"_SUCCESS file should be empty, but has size {os.path.getsize(success_file_path)}"
)

assert os.path.exists(os.path.join(output_dir, "b=x")), "Partition directory b=x not found"
assert os.path.exists(os.path.join(output_dir, "b=y")), "Partition directory b=y not found"
Loading