Skip to content

Commit 28dda55

Browse files
feat: Add unstable LazyFrame.sink_iceberg (#26799)
1 parent 8ddb5e1 commit 28dda55

File tree

17 files changed

+933
-134
lines changed

17 files changed

+933
-134
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/polars-core/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ pub use datatypes::SchemaExtPl;
4040
pub use hashing::IdBuildHasher;
4141
use rayon::{ThreadPool, ThreadPoolBuilder};
4242

43-
// A secret ID used to limit deserialization of raw pointers to those
44-
// generated by this instance of Polars.
43+
/// A secret ID used to limit deserialization of raw pointers to those
44+
/// generated by this instance of Polars.
4545
pub static PROCESS_ID: LazyLock<u128> = LazyLock::new(|| {
4646
let mut bytes = [0u8; 16];
4747
getrandom::fill(&mut bytes).unwrap();

crates/polars-plan/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ polars-utils = { workspace = true }
2828

2929
arrow = { workspace = true }
3030
bitflags = { workspace = true }
31-
blake3 = { workspace = true, optional = true }
31+
blake3 = { workspace = true }
3232
bytemuck = { workspace = true }
3333
bytes = { workspace = true, features = ["serde"] }
3434
chrono = { workspace = true, optional = true }
@@ -196,7 +196,7 @@ meta = []
196196
pivot = ["polars-core/rows", "polars-ops/pivot"]
197197
top_k = ["polars-ops/top_k"]
198198
semi_anti_join = ["polars-ops/semi_anti_join"]
199-
cse = ["blake3"]
199+
cse = []
200200
propagate_nans = ["polars-ops/propagate_nans"]
201201
coalesce = []
202202
fused = ["polars-ops/fused"]

crates/polars-plan/dsl-schema-hashes.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
"ExtraColumnsPolicy": "eb81efadce58eb148e658db4f2b5c1f38155d617431b81121043e9f9c21acd30",
6161
"Field": "dd95c2b6d7aa44004b900ef31fcf18e70f862d97488ef46c67b7c64c226b50d8",
6262
"Field2": "5a81d8772b4c18be0a0de8fc79d433d4b1d54b4008e211f2ca9217c15cf5611c",
63-
"FileProviderType": "330497d01ca3e4ab79e838481eeab485603958a74534e214a9962e396c5b1770",
63+
"FileProviderType": "1ddca3724d728cceed106b44479a92d317d8d612ef28e3e77797bef02d466090",
6464
"FileScanDsl": "ed77d7dc8af8845915011232bab3c31a90a1755c8e3897cd1473c435eb75accd",
6565
"FileSinkOptions": "edebcf5e3965add5e4fd1be14ca6bdddc55fa22e6e829dca04beb321de0c992c",
6666
"FileWriteFormat": "1a685aba7dd5d6c0aefc99a9060d1b57f166ea44ef57ad0d0d0c565dbabda811",
@@ -77,6 +77,7 @@
7777
"IcebergColumn": "171ff56c222358389754a7ff774eec6fc958478df2317720c63b4addc8f9a4c5",
7878
"IcebergColumnType": "e612983b0dfce78d172af2e4bb4726e3303ede09ea3c1de8ec40e12ee7922dac",
7979
"IcebergIdentityTransformedPartitionFields": "a9ea26367a6a3a97560aa9010f711a211cabfbffb6a318cb834ceccc672d3ae1",
80+
"IcebergPathProvider": "20732e7c4d3e6386d7e2a9675973ea4527a4e474a194c9a837a73839acbef715",
8081
"IcebergSchema": "2341b76e5aca7780e28fcee6bd7a2650ce7a9df61e043b839dd3e74bd95efb3b",
8182
"IntDataTypeExpr": "cd66dcd9c44cdddd8864c0fe642e5fcef5263f6f142cce906011a0180e0fd161",
8283
"IntegerType": "2e73fb811a2830b8b114dfe914512bfa6031325da9ea5513875a6e49b6ab1a58",

crates/polars-plan/src/dsl/options/file_provider.rs

Lines changed: 91 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::hash::Hash;
22
use std::sync::Arc;
33

44
use polars_core::frame::DataFrame;
5-
use polars_core::prelude::Column;
5+
use polars_core::prelude::{Column, DataType};
66
use polars_error::PolarsResult;
77
use polars_io::hive::HivePathFormatter;
88
use polars_io::utils::file::Writeable;
@@ -29,6 +29,7 @@ pub type FileProviderFunction = PlanCallback<FileProviderArgs, FileProviderRetur
2929
#[derive(Clone, Debug, Hash, PartialEq)]
3030
pub enum FileProviderType {
3131
Hive(HivePathProvider),
32+
Iceberg(IcebergPathProvider),
3233
Function(FileProviderFunction),
3334
}
3435

@@ -40,10 +41,28 @@ pub struct HivePathProvider {
4041
}
4142

4243
impl FileProviderType {
44+
/// Get a mutable reference to the file part prefix for this file provider.
45+
///
46+
/// File part prefixes are inserted after the partition prefix, before the file part number.
47+
///
48+
/// # Returns
49+
/// Returns `None` if this file provider does not support attaching file part prefixes.
50+
pub fn file_part_prefix_mut(&mut self) -> Option<&mut String> {
51+
use FileProviderType::*;
52+
53+
match self {
54+
Iceberg(p) => Some(p.file_part_prefix_mut()),
55+
Hive(_) | Function(_) => None,
56+
}
57+
}
58+
4359
pub fn get_path_or_file(&self, args: FileProviderArgs) -> PolarsResult<FileProviderReturn> {
60+
use FileProviderType::*;
61+
4462
match self {
45-
Self::Hive(v) => v.get_path(args).map(FileProviderReturn::Path),
46-
Self::Function(v) => v.get_path_or_file(args),
63+
Hive(p) => p.get_path(args).map(FileProviderReturn::Path),
64+
Iceberg(p) => p.get_path(args).map(FileProviderReturn::Path),
65+
Function(p) => p.get_path_or_file(args),
4766
}
4867
}
4968
}
@@ -59,22 +78,82 @@ impl HivePathProvider {
5978
partition_keys,
6079
} = args;
6180

62-
let mut partition_parts = String::new();
81+
let mut path = String::new();
6382

6483
let partition_keys: &[Column] = partition_keys.columns();
6584

66-
write!(
67-
&mut partition_parts,
68-
"{}",
69-
HivePathFormatter::new(partition_keys)
70-
)
71-
.unwrap();
85+
write!(&mut path, "{}", HivePathFormatter::new(partition_keys)).unwrap();
7286

7387
assert!(index_in_partition <= 0xffff_ffff);
7488

75-
write!(&mut partition_parts, "{index_in_partition:08x}.{extension}").unwrap();
89+
write!(&mut path, "{index_in_partition:08x}.{extension}").unwrap();
90+
91+
Ok(path)
92+
}
93+
}
94+
95+
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
96+
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
97+
#[derive(Clone, Debug, Hash, PartialEq)]
98+
pub struct IcebergPathProvider {
99+
pub extension: PlSmallStr,
100+
pub file_part_prefix: String,
101+
}
102+
103+
impl IcebergPathProvider {
104+
pub fn file_part_prefix_mut(&mut self) -> &mut String {
105+
&mut self.file_part_prefix
106+
}
107+
108+
/// # Panics
109+
/// Panics if `self.file_part_prefix` is empty.
110+
pub fn get_path(&self, args: FileProviderArgs) -> PolarsResult<String> {
111+
use std::fmt::Write;
112+
113+
let IcebergPathProvider {
114+
extension,
115+
file_part_prefix,
116+
} = self;
117+
118+
assert!(!file_part_prefix.is_empty());
119+
120+
let FileProviderArgs {
121+
index_in_partition,
122+
partition_keys,
123+
} = args;
124+
125+
let mut partition_keys_hash = None;
126+
127+
if partition_keys.width() != 0 {
128+
let mut hasher = blake3::Hasher::new();
129+
130+
for column in partition_keys.columns() {
131+
let column = column.cast(&DataType::String).unwrap();
132+
133+
let value = column.str().unwrap().get(0);
134+
135+
hasher.update(&[value.is_some() as u8]);
136+
hasher.update(value.unwrap_or_default().as_bytes());
137+
}
138+
139+
partition_keys_hash = Some(hasher.finalize().to_hex());
140+
}
141+
142+
let partition_key_prefix: &str = partition_keys_hash.as_ref().map_or("", |x| &x[..32]);
143+
144+
let mut path = String::with_capacity(
145+
partition_key_prefix.len() + file_part_prefix.len() + 8 + 1 + extension.len(),
146+
);
147+
148+
assert!(index_in_partition <= 0xffff_ffff);
149+
150+
write!(
151+
&mut path,
152+
"{partition_key_prefix}{file_part_prefix}{index_in_partition:08x}.{extension}"
153+
)
154+
.unwrap();
76155

77-
Ok(partition_parts)
156+
Ok(path)
78157
}
79158
}
80159

crates/polars-python/src/io/sink_output.rs

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
use polars::prelude::file_provider::{FileProviderFunction, FileProviderType};
2-
use polars::prelude::{PartitionStrategy, PlRefPath, SinkDestination, SpecialEq};
1+
use polars::prelude::file_provider::{FileProviderFunction, FileProviderType, IcebergPathProvider};
2+
use polars::prelude::{PartitionStrategy, PlRefPath, PlSmallStr, SinkDestination, SpecialEq};
33
use polars_utils::IdxSize;
44
use polars_utils::python_function::PythonObject;
5+
use pyo3::exceptions::PyValueError;
56
use pyo3::intern;
67
use pyo3::prelude::*;
8+
use pyo3::pybacked::PyBackedStr;
79

810
use crate::PyExpr;
911
use crate::prelude::Wrap;
@@ -68,13 +70,40 @@ impl PyFileSinkDestination<'_> {
6870
PartitionStrategy::FileSize
6971
};
7072

73+
let file_path_provider = if let Some(file_path_provider) = file_path_provider {
74+
let py = self.0.py();
75+
76+
Some(
77+
match file_path_provider.getattr(py, intern!(py, "pl_path_provider_id")) {
78+
Ok(v) => match &*v.extract::<PyBackedStr>(py)? {
79+
"iceberg" => {
80+
let extension: PyBackedStr = file_path_provider
81+
.getattr(py, intern!(py, "extension"))?
82+
.extract(py)?;
83+
84+
FileProviderType::Iceberg(IcebergPathProvider {
85+
extension: PlSmallStr::from_str(&extension),
86+
file_part_prefix: String::new(),
87+
})
88+
},
89+
id => {
90+
return Err(PyValueError::new_err(format!(
91+
"unknown pl_path_provider_id: '{id}'"
92+
)));
93+
},
94+
},
95+
Err(_) => FileProviderType::Function(FileProviderFunction::Python(
96+
SpecialEq::new(PythonObject(file_path_provider).into()),
97+
)),
98+
},
99+
)
100+
} else {
101+
None
102+
};
103+
71104
Ok(SinkDestination::Partitioned {
72105
base_path: base_path.0,
73-
file_path_provider: file_path_provider.map(|x| {
74-
FileProviderType::Function(FileProviderFunction::Python(SpecialEq::new(
75-
PythonObject(x).into(),
76-
)))
77-
}),
106+
file_path_provider,
78107
partition_strategy,
79108
max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),
80109
approximate_bytes_per_file,

crates/polars-stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ serde = { workspace = true, optional = true }
3434
serde_json = { workspace = true, optional = true }
3535
slotmap = { workspace = true }
3636
tokio = { workspace = true, features = ["sync"] }
37+
uuid = { workspace = true }
3738

3839
arrow = { workspace = true }
3940
memchr = { workspace = true }

crates/polars-stream/src/nodes/io_sinks/components/file_provider.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ pub struct FileProvider {
2121
impl FileProvider {
2222
pub async fn open_file(&self, args: FileProviderArgs) -> PolarsResult<Writeable> {
2323
let provided_path: String = match &self.provider_type {
24-
FileProviderType::Hive(v) => v.get_path(args)?,
24+
FileProviderType::Hive(p) => p.get_path(args)?,
25+
FileProviderType::Iceberg(p) => p.get_path(args)?,
2526
FileProviderType::Function(f) => {
2627
let f = f.clone();
2728

crates/polars-stream/src/nodes/io_sinks/pipeline_initialization/partition_by.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub fn start_partition_sink_pipeline(
5555

5656
let PartitionedTarget {
5757
base_path,
58-
file_path_provider,
58+
mut file_path_provider,
5959
partitioner,
6060
hstack_keys,
6161
include_keys_in_file,
@@ -68,6 +68,13 @@ pub fn start_partition_sink_pipeline(
6868
let in_memory_exec_state = Arc::new(execution_state.in_memory_exec_state.clone());
6969
let io_metrics_is_some = io_metrics.is_some();
7070

71+
if let Some(file_part_prefix) = file_path_provider.file_part_prefix_mut() {
72+
use std::fmt::Write as _;
73+
let uuid = uuid::Uuid::new_v4();
74+
let uuid = uuid.as_simple();
75+
write!(file_part_prefix, "{uuid}").unwrap();
76+
}
77+
7178
let file_provider = Arc::new(FileProvider {
7279
base_path,
7380
cloud_options,
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
from polars.io.iceberg._dataset import IcebergCatalogConfig
12
from polars.io.iceberg.functions import scan_iceberg
23

3-
__all__ = ["scan_iceberg"]
4+
__all__ = [
5+
"IcebergCatalogConfig",
6+
"scan_iceberg",
7+
]

0 commit comments

Comments
 (0)