Skip to content

Commit fd71e7a

Browse files
committed
feat(inspect): add core metadata tables (refs, history, metadata_log_entries, files, all_manifests, all_files)
Implement 10 new metadata table types for the inspect module: - Group A (metadata-only): RefsTable, HistoryTable, MetadataLogEntriesTable - Group B (file-level): FilesTable, DataFilesTable, DeleteFilesTable - Group C (all-snapshot): AllManifestsTable, AllFilesTable, AllDataFilesTable, AllDeleteFilesTable Shared infrastructure in files.rs handles dynamic partition structs, content filtering, and manifest deduplication across snapshots. DataFusion integration updated with all new variants. Part of #823.
1 parent 6165cd9 commit fd71e7a

File tree

10 files changed

+3032
-1
lines changed

10 files changed

+3032
-1
lines changed

crates/iceberg/src/inspect/all_files.rs

Lines changed: 684 additions & 0 deletions
Large diffs are not rendered by default.

crates/iceberg/src/inspect/all_manifests.rs

Lines changed: 412 additions & 0 deletions
Large diffs are not rendered by default.

crates/iceberg/src/inspect/files.rs

Lines changed: 1275 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashSet;
19+
use std::sync::Arc;
20+
21+
use arrow_array::RecordBatch;
22+
use arrow_array::builder::{BooleanBuilder, PrimitiveBuilder};
23+
use arrow_array::types::{Int64Type, TimestampMicrosecondType};
24+
use futures::{StreamExt, stream};
25+
26+
use crate::Result;
27+
use crate::arrow::schema_to_arrow_schema;
28+
use crate::scan::ArrowRecordBatchStream;
29+
use crate::spec::{NestedField, PrimitiveType, Type};
30+
use crate::table::Table;
31+
32+
/// History metadata table.
///
/// Shows the table's snapshot history log with ancestry information.
pub struct HistoryTable<'a> {
    /// The table whose history log is exposed; borrowed for the lifetime of this view.
    table: &'a Table,
}
38+
39+
impl<'a> HistoryTable<'a> {
40+
/// Create a new History table instance.
41+
pub fn new(table: &'a Table) -> Self {
42+
Self { table }
43+
}
44+
45+
/// Returns the iceberg schema of the history table.
46+
pub fn schema(&self) -> crate::spec::Schema {
47+
let fields = vec![
48+
NestedField::required(
49+
1,
50+
"made_current_at",
51+
Type::Primitive(PrimitiveType::Timestamptz),
52+
),
53+
NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)),
54+
NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)),
55+
NestedField::required(
56+
4,
57+
"is_current_ancestor",
58+
Type::Primitive(PrimitiveType::Boolean),
59+
),
60+
];
61+
crate::spec::Schema::builder()
62+
.with_fields(fields.into_iter().map(|f| f.into()))
63+
.build()
64+
.unwrap()
65+
}
66+
67+
/// Scans the history table.
68+
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
69+
let schema = schema_to_arrow_schema(&self.schema())?;
70+
let metadata = self.table.metadata();
71+
72+
// Build set of ancestor snapshot IDs by walking back from current snapshot
73+
let mut ancestor_ids = HashSet::new();
74+
if let Some(current) = metadata.current_snapshot() {
75+
let mut snapshot_id = Some(current.snapshot_id());
76+
while let Some(id) = snapshot_id {
77+
ancestor_ids.insert(id);
78+
snapshot_id = metadata
79+
.snapshot_by_id(id)
80+
.and_then(|s| s.parent_snapshot_id());
81+
}
82+
}
83+
84+
let mut made_current_at =
85+
PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00");
86+
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
87+
let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
88+
let mut is_current_ancestor = BooleanBuilder::new();
89+
90+
for log_entry in metadata.history() {
91+
made_current_at.append_value(log_entry.timestamp_ms * 1000);
92+
snapshot_id.append_value(log_entry.snapshot_id);
93+
94+
let parent = metadata
95+
.snapshot_by_id(log_entry.snapshot_id)
96+
.and_then(|s| s.parent_snapshot_id());
97+
parent_id.append_option(parent);
98+
99+
is_current_ancestor.append_value(ancestor_ids.contains(&log_entry.snapshot_id));
100+
}
101+
102+
let batch = RecordBatch::try_new(Arc::new(schema), vec![
103+
Arc::new(made_current_at.finish()),
104+
Arc::new(snapshot_id.finish()),
105+
Arc::new(parent_id.finish()),
106+
Arc::new(is_current_ancestor.finish()),
107+
])?;
108+
109+
Ok(stream::iter(vec![Ok(batch)]).boxed())
110+
}
111+
}
112+
113+
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use futures::TryStreamExt;

    use crate::scan::tests::TableTestFixture;
    use crate::test_utils::check_record_batches;

    // Snapshot test: scans the history table of the shared test fixture and
    // pins both the Arrow schema (field names, types, PARQUET field ids) and
    // the row contents via expect_test snapshots. The fixture has two
    // snapshots in a single lineage, so both rows are current ancestors and
    // the first row's parent_id is null.
    #[tokio::test]
    async fn test_history_table() {
        let table = TableTestFixture::new().table;

        let batch_stream = table.inspect().history().scan().await.unwrap();

        check_record_batches(
            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
            expect![[r#"
                Field { "made_current_at": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} },
                Field { "snapshot_id": Int64, metadata: {"PARQUET:field_id": "2"} },
                Field { "parent_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} },
                Field { "is_current_ancestor": Boolean, metadata: {"PARQUET:field_id": "4"} }"#]],
            expect![[r#"
                made_current_at: PrimitiveArray<Timestamp(µs, "+00:00")>
                [
                  2018-01-04T21:22:35.770+00:00,
                  2019-04-12T20:29:15.770+00:00,
                ],
                snapshot_id: PrimitiveArray<Int64>
                [
                  3051729675574597004,
                  3055729675574597004,
                ],
                parent_id: PrimitiveArray<Int64>
                [
                  null,
                  3051729675574597004,
                ],
                is_current_ancestor: BooleanArray
                [
                  true,
                  true,
                ]"#]],
            &[],
            Some("made_current_at"),
        );
    }
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow_array::RecordBatch;
21+
use arrow_array::builder::{PrimitiveBuilder, StringBuilder};
22+
use arrow_array::types::{Int32Type, Int64Type, TimestampMicrosecondType};
23+
use futures::{StreamExt, stream};
24+
25+
use crate::Result;
26+
use crate::arrow::schema_to_arrow_schema;
27+
use crate::scan::ArrowRecordBatchStream;
28+
use crate::spec::{NestedField, PrimitiveType, Type};
29+
use crate::table::Table;
30+
31+
/// Metadata log entries table.
///
/// Shows the table's metadata log history.
pub struct MetadataLogEntriesTable<'a> {
    /// The table whose metadata log is exposed; borrowed for the lifetime of this view.
    table: &'a Table,
}
37+
38+
impl<'a> MetadataLogEntriesTable<'a> {
39+
/// Create a new MetadataLogEntries table instance.
40+
pub fn new(table: &'a Table) -> Self {
41+
Self { table }
42+
}
43+
44+
/// Returns the iceberg schema of the metadata log entries table.
45+
pub fn schema(&self) -> crate::spec::Schema {
46+
let fields = vec![
47+
NestedField::required(
48+
1,
49+
"timestamp",
50+
Type::Primitive(PrimitiveType::Timestamptz),
51+
),
52+
NestedField::required(2, "file", Type::Primitive(PrimitiveType::String)),
53+
NestedField::optional(
54+
3,
55+
"latest_snapshot_id",
56+
Type::Primitive(PrimitiveType::Long),
57+
),
58+
NestedField::optional(
59+
4,
60+
"latest_schema_id",
61+
Type::Primitive(PrimitiveType::Int),
62+
),
63+
NestedField::optional(
64+
5,
65+
"latest_sequence_number",
66+
Type::Primitive(PrimitiveType::Long),
67+
),
68+
];
69+
crate::spec::Schema::builder()
70+
.with_fields(fields.into_iter().map(|f| f.into()))
71+
.build()
72+
.unwrap()
73+
}
74+
75+
/// Scans the metadata log entries table.
76+
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
77+
let schema = schema_to_arrow_schema(&self.schema())?;
78+
let metadata = self.table.metadata();
79+
80+
let mut timestamp =
81+
PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00");
82+
let mut file = StringBuilder::new();
83+
let mut latest_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
84+
let mut latest_schema_id = PrimitiveBuilder::<Int32Type>::new();
85+
let mut latest_sequence_number = PrimitiveBuilder::<Int64Type>::new();
86+
87+
// Historical entries from the metadata log
88+
for log_entry in metadata.metadata_log() {
89+
timestamp.append_value(log_entry.timestamp_ms * 1000);
90+
file.append_value(&log_entry.metadata_file);
91+
latest_snapshot_id.append_null();
92+
latest_schema_id.append_null();
93+
latest_sequence_number.append_null();
94+
}
95+
96+
// Current metadata entry
97+
if let Some(metadata_location) = self.table.metadata_location() {
98+
timestamp.append_value(metadata.last_updated_ms * 1000);
99+
file.append_value(metadata_location);
100+
latest_snapshot_id.append_option(metadata.current_snapshot_id);
101+
latest_schema_id.append_value(metadata.current_schema_id);
102+
latest_sequence_number.append_value(metadata.last_sequence_number);
103+
}
104+
105+
let batch = RecordBatch::try_new(Arc::new(schema), vec![
106+
Arc::new(timestamp.finish()),
107+
Arc::new(file.finish()),
108+
Arc::new(latest_snapshot_id.finish()),
109+
Arc::new(latest_schema_id.finish()),
110+
Arc::new(latest_sequence_number.finish()),
111+
])?;
112+
113+
Ok(stream::iter(vec![Ok(batch)]).boxed())
114+
}
115+
}
116+
117+
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use futures::TryStreamExt;

    use crate::scan::tests::TableTestFixture;
    use crate::test_utils::check_record_batches;

    // Snapshot test: scans the metadata log entries table of the shared test
    // fixture and pins the Arrow schema and row contents via expect_test
    // snapshots. The first row is a historical log entry (latest_* columns
    // null); the second is the current metadata entry with populated
    // latest_snapshot_id / latest_schema_id / latest_sequence_number. The
    // `file` column holds fixture-local paths, so it is skipped from the
    // content comparison via the `&["file"]` argument.
    #[tokio::test]
    async fn test_metadata_log_entries_table() {
        let table = TableTestFixture::new().table;

        let batch_stream = table
            .inspect()
            .metadata_log_entries()
            .scan()
            .await
            .unwrap();

        check_record_batches(
            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
            expect![[r#"
                Field { "timestamp": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} },
                Field { "file": Utf8, metadata: {"PARQUET:field_id": "2"} },
                Field { "latest_snapshot_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} },
                Field { "latest_schema_id": nullable Int32, metadata: {"PARQUET:field_id": "4"} },
                Field { "latest_sequence_number": nullable Int64, metadata: {"PARQUET:field_id": "5"} }"#]],
            expect![[r#"
                timestamp: PrimitiveArray<Timestamp(µs, "+00:00")>
                [
                  1970-01-01T00:25:15.100+00:00,
                  2020-10-14T01:22:53.590+00:00,
                ],
                file: (skipped),
                latest_snapshot_id: PrimitiveArray<Int64>
                [
                  null,
                  3055729675574597004,
                ],
                latest_schema_id: PrimitiveArray<Int32>
                [
                  null,
                  1,
                ],
                latest_sequence_number: PrimitiveArray<Int64>
                [
                  null,
                  34,
                ]"#]],
            &["file"],
            Some("timestamp"),
        );
    }
}

0 commit comments

Comments
 (0)