Commit e79627c
feat: Iceberg Table supports equality delete of merge on read when reading (#18015)
* feat: Iceberg Table supports equality delete of merge on read when reading
* test: add iceberg driver for equality delete file
* test: add a test to insert new data after deletion
* chore: simplify code
1 parent a010d96 commit e79627c
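
For readers new to the feature: in Iceberg's merge-on-read model, an equality delete file stores tuples of values for the columns identified by its equality_ids, and at scan time any data row whose values match one of those tuples on exactly those columns is treated as deleted. A minimal, self-contained sketch of that matching rule (the Row model over field ids and string values is hypothetical, for illustration only; it is not Databend's reader code):

use std::collections::HashSet;

/// A row modeled as (Iceberg field id, value) pairs -- a hypothetical
/// stand-in for real column data, just to show the matching rule.
type Row = Vec<(i32, String)>;

/// Project a row onto the columns named by `equality_ids`.
fn key_on(row: &Row, equality_ids: &[i32]) -> Vec<String> {
    equality_ids
        .iter()
        .filter_map(|id| {
            row.iter()
                .find(|(field_id, _)| field_id == id)
                .map(|(_, value)| value.clone())
        })
        .collect()
}

/// A data row is deleted iff its projection matches any delete-file row.
fn apply_equality_deletes(data: Vec<Row>, deletes: &[Row], equality_ids: &[i32]) -> Vec<Row> {
    let deleted: HashSet<Vec<String>> =
        deletes.iter().map(|row| key_on(row, equality_ids)).collect();
    data.into_iter()
        .filter(|row| !deleted.contains(&key_on(row, equality_ids)))
        .collect()
}

fn main() {
    // Field id 1 = "id", field id 2 = "name".
    let data = vec![
        vec![(1, "1".into()), (2, "alice".into())],
        vec![(1, "2".into()), (2, "bob".into())],
    ];
    // The equality delete file says: delete every row whose id == "2".
    let deletes = vec![vec![(1, "2".into())]];
    let remaining = apply_equality_deletes(data, &deletes, &[1]);
    assert_eq!(remaining.len(), 1); // bob's row is gone
}

The diff below threads this idea through three layers: the Iceberg-to-part conversion (partition.rs), the serializable part metadata (parquet_part.rs and lib.rs), and the Parquet read policies that apply the resulting filter (no_prefetch.rs and friends).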

File tree: 21 files changed, +693 -97 lines changed (10 files shown below)

src/query/expression/src/function.rs

Lines changed: 1 addition & 1 deletion
@@ -154,7 +154,7 @@ pub enum FunctionEval {
     },
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct FunctionContext {
     pub tz: TimeZone,
     pub now: Zoned,
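
This one-line change looks unrelated to Iceberg, but it appears to be a prerequisite: ParquetPredicate (in predicate.rs below) now derives Debug, and it holds a func_ctx: FunctionContext field, so FunctionContext must implement Debug too.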

src/query/storages/iceberg/src/partition.rs

Lines changed: 15 additions & 1 deletion
@@ -13,9 +13,12 @@
 // limitations under the License.
 
 use databend_common_catalog::plan::PartInfo;
+use databend_common_storages_parquet::DeleteTask;
+use databend_common_storages_parquet::DeleteType;
 use databend_common_storages_parquet::ParquetFilePart;
 use databend_common_storages_parquet::ParquetPart;
 use databend_storages_common_stage::SingleFilePartition;
+use iceberg::spec::DataContentType;
 use iceberg::spec::DataFileFormat;
 
 pub(crate) fn convert_file_scan_task(task: iceberg::scan::FileScanTask) -> Box<dyn PartInfo> {
@@ -42,7 +45,18 @@ pub(crate) fn convert_file_scan_task(task: iceberg::scan::FileScanTask) -> Box<dyn PartInfo> {
         deletes: task
             .deletes
             .iter()
-            .map(|file| file.file_path.clone())
+            .filter_map(|file| {
+                let ty = match file.file_type {
+                    DataContentType::Data => return None,
+                    DataContentType::PositionDeletes => DeleteType::Position,
+                    DataContentType::EqualityDeletes => DeleteType::Equality,
+                };
+                Some(DeleteTask {
+                    path: file.file_path.clone(),
+                    ty,
+                    equality_ids: file.equality_ids.clone(),
+                })
+            })
             .collect(),
     })
 } else {
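
The converter now records what kind of delete each file is rather than only its path: position deletes and equality deletes become typed DeleteTasks carrying their equality_ids, while a DataContentType::Data entry in the deletes list is skipped defensively, since a data file is not a delete file.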

src/query/storages/parquet/src/copy_into_table/reader.rs

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ impl RowGroupReaderForCopy {
             InMemoryRowGroup::new(&part.location, op.clone(), &part.meta, None, *read_settings);
         let mut _sorter = None;
         self.row_group_reader_builder
-            .fetch_and_build(row_group, None, &mut _sorter, None, batch_size)
+            .fetch_and_build(row_group, None, &mut _sorter, None, batch_size, None)
             .await
     }
 

src/query/storages/parquet/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -44,6 +44,8 @@ mod meta;
 mod schema;
 
 pub use copy_into_table::ParquetTableForCopy;
+pub use parquet_part::DeleteTask;
+pub use parquet_part::DeleteType;
 pub use parquet_part::ParquetFilePart;
 pub use parquet_part::ParquetPart;
 pub use parquet_reader::InmMemoryFile;

src/query/storages/parquet/src/parquet_part.rs

Lines changed: 16 additions & 2 deletions
@@ -27,14 +27,28 @@ use databend_common_exception::Result;
 
 use crate::partition::ParquetRowGroupPart;
 
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug, Clone)]
+pub enum DeleteType {
+    Position,
+    Equality,
+}
+
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug, Clone)]
+pub struct DeleteTask {
+    pub path: String,
+    pub ty: DeleteType,
+    /// equality ids for equality deletes (empty for positional deletes)
+    pub equality_ids: Vec<i32>,
+}
+
 #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)]
 pub enum ParquetPart {
     File(ParquetFilePart),
     SmallFiles(Vec<ParquetFilePart>),
     RowGroup(ParquetRowGroupPart),
     FileWithDeletes {
         inner: ParquetFilePart,
-        deletes: Vec<String>,
+        deletes: Vec<DeleteTask>,
     },
 }
 
@@ -114,7 +128,7 @@ impl PartInfo for ParquetPart {
                 let mut paths = Vec::with_capacity(deletes.len() + 1);
                 paths.push(&inner.file);
                 for delete in deletes {
-                    paths.push(delete);
+                    paths.push(&delete.path);
                 }
                 paths
             }
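
Both new types derive the serde traits because ParquetPart implements PartInfo and parts may be serialized when plans are shipped between nodes. A standalone round-trip sketch of the new struct (serde_json is an assumption here, used purely for illustration; it need not match Databend's actual wire format):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub enum DeleteType {
    Position,
    Equality,
}

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct DeleteTask {
    pub path: String,
    pub ty: DeleteType,
    /// equality ids for equality deletes (empty for positional deletes)
    pub equality_ids: Vec<i32>,
}

fn main() -> serde_json::Result<()> {
    let task = DeleteTask {
        path: "s3://bucket/table/data/eq-delete-00001.parquet".to_string(),
        ty: DeleteType::Equality,
        equality_ids: vec![1, 2],
    };
    // Serialize and deserialize, then check the part survives the round trip.
    let encoded = serde_json::to_string(&task)?;
    let decoded: DeleteTask = serde_json::from_str(&encoded)?;
    assert_eq!(task, decoded);
    Ok(())
}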

src/query/storages/parquet/src/parquet_reader/predicate.rs

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ use super::utils::transform_record_batch;
 use super::utils::FieldPaths;
 use crate::parquet_reader::utils::compute_output_field_paths;
 
+#[derive(Debug)]
 pub struct ParquetPredicate {
     func_ctx: FunctionContext,

src/query/storages/parquet/src/parquet_reader/read_policy/no_prefetch.rs

Lines changed: 40 additions & 1 deletion
@@ -28,7 +28,10 @@ use parquet::schema::types::SchemaDescriptor;
 use super::policy::ReadPolicy;
 use super::policy::ReadPolicyBuilder;
 use super::policy::ReadPolicyImpl;
+use crate::parquet_reader::predicate::ParquetPredicate;
+use crate::parquet_reader::read_policy::utils::read_all;
 use crate::parquet_reader::row_group::InMemoryRowGroup;
+use crate::parquet_reader::utils::bitmap_to_boolean_array;
 use crate::parquet_reader::utils::transform_record_batch;
 use crate::parquet_reader::utils::FieldPaths;
 use crate::transformer::RecordBatchTransformer;
@@ -45,11 +48,47 @@ impl ReadPolicyBuilder for NoPretchPolicyBuilder {
     async fn fetch_and_build(
         &self,
         mut row_group: InMemoryRowGroup<'_>,
-        row_selection: Option<RowSelection>,
+        mut row_selection: Option<RowSelection>,
         _sorter: &mut Option<TopKSorter>,
         transformer: Option<RecordBatchTransformer>,
         batch_size: usize,
+        filter: Option<Arc<ParquetPredicate>>,
     ) -> Result<Option<ReadPolicyImpl>> {
+        if let Some(predicate) = filter {
+            row_group
+                .fetch(predicate.projection(), row_selection.as_ref())
+                .await?;
+
+            let num_rows = row_selection
+                .as_ref()
+                .map(|x| x.row_count())
+                .unwrap_or(row_group.row_count());
+
+            let block = read_all(
+                &DataSchema::from(predicate.schema()),
+                &row_group,
+                predicate.field_levels(),
+                row_selection.clone(),
+                predicate.field_paths(),
+                num_rows,
+            )?;
+            let filter = predicate.evaluate_block(&block)?;
+            if filter.null_count() == num_rows {
+                // All rows in current row group are filtered out.
+                return Ok(None);
+            }
+            let filter = bitmap_to_boolean_array(filter);
+            let sel = RowSelection::from_filters(&[filter]);
+            match row_selection.as_mut() {
+                Some(selection) => {
+                    *selection = selection.and_then(&sel);
+                }
+                None => {
+                    row_selection = Some(sel);
+                }
+            }
+        }
+
         row_group.fetch(&self.projection, None).await?;
         let reader = ParquetRecordBatchReader::try_new_with_row_groups(
             &self.field_levels,
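
The core of this hunk is how an evaluated predicate feeds back into the reader: the boolean filter produced over the fetched rows becomes a RowSelection, which is then intersected with any pre-existing selection via and_then. A standalone sketch of just that composition step, using the parquet and arrow-array crates directly (the fetch/evaluate calls around it are Databend-specific and omitted):

use arrow_array::BooleanArray;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Suppose an earlier step (e.g. positional deletes) already selected
    // rows 0..4 out of an 8-row row group.
    let existing = RowSelection::from(vec![
        RowSelector::select(4),
        RowSelector::skip(4),
    ]);

    // The predicate was evaluated over those 4 selected rows only,
    // so the filter has one entry per *selected* row.
    let filter = BooleanArray::from(vec![true, false, true, false]);
    let predicate_selection = RowSelection::from_filters(&[filter]);

    // and_then composes the two, as in fetch_and_build above: the second
    // selection indexes into the rows kept by the first.
    let combined = existing.and_then(&predicate_selection);
    assert_eq!(combined.row_count(), 2); // rows 0 and 2 of the row group
}

Note the ordering subtlety that and_then encodes: the second selection is expressed in the coordinates of the rows kept by the first, which is exactly why fetch_and_build evaluates the predicate only over the already-selected rows.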

src/query/storages/parquet/src/parquet_reader/read_policy/policy.rs

Lines changed: 4 additions & 0 deletions
@@ -12,11 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TopKSorter;
 use parquet::arrow::arrow_reader::RowSelection;
 
+use crate::parquet_reader::predicate::ParquetPredicate;
 use crate::parquet_reader::row_group::InMemoryRowGroup;
 use crate::transformer::RecordBatchTransformer;
 
@@ -66,6 +69,7 @@ pub trait ReadPolicyBuilder: Send + Sync {
         _sorter: &mut Option<TopKSorter>,
         _transformer: Option<RecordBatchTransformer>,
         _batch_size: usize,
+        _filter: Option<Arc<ParquetPredicate>>,
     ) -> Result<Option<ReadPolicyImpl>> {
         unreachable!()
     }

src/query/storages/parquet/src/parquet_reader/read_policy/predicate_and_topk.rs

Lines changed: 1 addition & 0 deletions
@@ -161,6 +161,7 @@ impl ReadPolicyBuilder for PredicateAndTopkPolicyBuilder {
         sorter: &mut Option<TopKSorter>,
         transformer: Option<RecordBatchTransformer>,
         batch_size: usize,
+        _filter: Option<Arc<ParquetPredicate>>,
     ) -> Result<Option<ReadPolicyImpl>> {
         let mut num_rows = selection
             .as_ref()

src/query/storages/parquet/src/parquet_reader/read_policy/topk_only.rs

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,7 @@ use super::policy::ReadPolicyBuilder;
 use super::policy::ReadPolicyImpl;
 use super::utils::evaluate_topk;
 use super::utils::read_all;
+use crate::parquet_reader::predicate::ParquetPredicate;
 use crate::parquet_reader::row_group::InMemoryRowGroup;
 use crate::parquet_reader::topk::BuiltTopK;
 use crate::parquet_reader::topk::ParquetTopK;
@@ -136,6 +137,7 @@ impl ReadPolicyBuilder for TopkOnlyPolicyBuilder {
         sorter: &mut Option<TopKSorter>,
         transformer: Option<RecordBatchTransformer>,
         batch_size: usize,
+        _filter: Option<Arc<ParquetPredicate>>,
    ) -> Result<Option<ReadPolicyImpl>> {
         debug_assert!(sorter.is_some());
         let sorter = sorter.as_mut().unwrap();
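
A pattern worth noting across the read policies: only NoPretchPolicyBuilder actually consumes the new filter argument, while PredicateAndTopkPolicyBuilder and TopkOnlyPolicyBuilder bind it as _filter and ignore it, presumably because those policies already evaluate their own predicate while prefetching. The late filter exists for the plain no-prefetch path, which previously had no way to apply a predicate such as one derived from equality delete files.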
