Skip to content

Commit 1cbfcdf

Browse files
committed
feat(inspect): add core metadata tables (refs, history, metadata_log_entries, files, all_manifests, all_files)
Implement 10 new metadata table types for the inspect module: - Group A (metadata-only): RefsTable, HistoryTable, MetadataLogEntriesTable - Group B (file-level): FilesTable, DataFilesTable, DeleteFilesTable - Group C (all-snapshot): AllManifestsTable, AllFilesTable, AllDataFilesTable, AllDeleteFilesTable Shared infrastructure in files.rs handles dynamic partition structs, content filtering, and manifest deduplication across snapshots. DataFusion integration updated with all new variants. Part of #823.
1 parent 6165cd9 commit 1cbfcdf

File tree

11 files changed

+3001
-1
lines changed

11 files changed

+3001
-1
lines changed

crates/iceberg/src/inspect/all_files.rs

Lines changed: 683 additions & 0 deletions
Large diffs are not rendered by default.

crates/iceberg/src/inspect/all_manifests.rs

Lines changed: 409 additions & 0 deletions
Large diffs are not rendered by default.

crates/iceberg/src/inspect/files.rs

Lines changed: 1253 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashSet;
19+
use std::sync::Arc;
20+
21+
use arrow_array::RecordBatch;
22+
use arrow_array::builder::{BooleanBuilder, PrimitiveBuilder};
23+
use arrow_array::types::{Int64Type, TimestampMicrosecondType};
24+
use futures::{StreamExt, stream};
25+
26+
use crate::Result;
27+
use crate::arrow::schema_to_arrow_schema;
28+
use crate::scan::ArrowRecordBatchStream;
29+
use crate::spec::{NestedField, PrimitiveType, Type};
30+
use crate::table::Table;
31+
32+
/// History metadata table.
33+
///
34+
/// Shows the table's snapshot history log with ancestry information.
35+
pub struct HistoryTable<'a> {
36+
table: &'a Table,
37+
}
38+
39+
impl<'a> HistoryTable<'a> {
40+
/// Create a new History table instance.
41+
pub fn new(table: &'a Table) -> Self {
42+
Self { table }
43+
}
44+
45+
/// Returns the iceberg schema of the history table.
46+
pub fn schema(&self) -> crate::spec::Schema {
47+
let fields = vec![
48+
NestedField::required(
49+
1,
50+
"made_current_at",
51+
Type::Primitive(PrimitiveType::Timestamptz),
52+
),
53+
NestedField::required(2, "snapshot_id", Type::Primitive(PrimitiveType::Long)),
54+
NestedField::optional(3, "parent_id", Type::Primitive(PrimitiveType::Long)),
55+
NestedField::required(
56+
4,
57+
"is_current_ancestor",
58+
Type::Primitive(PrimitiveType::Boolean),
59+
),
60+
];
61+
crate::spec::Schema::builder()
62+
.with_fields(fields.into_iter().map(|f| f.into()))
63+
.build()
64+
.unwrap()
65+
}
66+
67+
/// Scans the history table.
68+
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
69+
let schema = schema_to_arrow_schema(&self.schema())?;
70+
let metadata = self.table.metadata();
71+
72+
// Build set of ancestor snapshot IDs by walking back from current snapshot
73+
let mut ancestor_ids = HashSet::new();
74+
if let Some(current) = metadata.current_snapshot() {
75+
let mut snapshot_id = Some(current.snapshot_id());
76+
while let Some(id) = snapshot_id {
77+
ancestor_ids.insert(id);
78+
snapshot_id = metadata
79+
.snapshot_by_id(id)
80+
.and_then(|s| s.parent_snapshot_id());
81+
}
82+
}
83+
84+
let mut made_current_at =
85+
PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00");
86+
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
87+
let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
88+
let mut is_current_ancestor = BooleanBuilder::new();
89+
90+
for log_entry in metadata.history() {
91+
made_current_at.append_value(log_entry.timestamp_ms * 1000);
92+
snapshot_id.append_value(log_entry.snapshot_id);
93+
94+
let parent = metadata
95+
.snapshot_by_id(log_entry.snapshot_id)
96+
.and_then(|s| s.parent_snapshot_id());
97+
parent_id.append_option(parent);
98+
99+
is_current_ancestor.append_value(ancestor_ids.contains(&log_entry.snapshot_id));
100+
}
101+
102+
let batch = RecordBatch::try_new(Arc::new(schema), vec![
103+
Arc::new(made_current_at.finish()),
104+
Arc::new(snapshot_id.finish()),
105+
Arc::new(parent_id.finish()),
106+
Arc::new(is_current_ancestor.finish()),
107+
])?;
108+
109+
Ok(stream::iter(vec![Ok(batch)]).boxed())
110+
}
111+
}
112+
113+
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use futures::TryStreamExt;

    use crate::scan::tests::TableTestFixture;
    use crate::test_utils::check_record_batches;

    #[tokio::test]
    async fn test_history_table() {
        // Fixture table has two snapshots; the second is current and the
        // first is its parent, so both rows should be current ancestors.
        let table = TableTestFixture::new().table;

        let batch_stream = table.inspect().history().scan().await.unwrap();

        check_record_batches(
            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
            // Expected Arrow schema; field ids mirror the iceberg schema.
            expect![[r#"
                Field { "made_current_at": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} },
                Field { "snapshot_id": Int64, metadata: {"PARQUET:field_id": "2"} },
                Field { "parent_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} },
                Field { "is_current_ancestor": Boolean, metadata: {"PARQUET:field_id": "4"} }"#]],
            // Expected column data; the first snapshot has no parent (null).
            expect![[r#"
                made_current_at: PrimitiveArray<Timestamp(µs, "+00:00")>
                [
                  2018-01-04T21:22:35.770+00:00,
                  2019-04-12T20:29:15.770+00:00,
                ],
                snapshot_id: PrimitiveArray<Int64>
                [
                  3051729675574597004,
                  3055729675574597004,
                ],
                parent_id: PrimitiveArray<Int64>
                [
                  null,
                  3051729675574597004,
                ],
                is_current_ancestor: BooleanArray
                [
                  true,
                  true,
                ]"#]],
            // No columns skipped; sort rows by this column before comparing.
            &[],
            Some("made_current_at"),
        );
    }
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow_array::RecordBatch;
21+
use arrow_array::builder::{PrimitiveBuilder, StringBuilder};
22+
use arrow_array::types::{Int32Type, Int64Type, TimestampMicrosecondType};
23+
use futures::{StreamExt, stream};
24+
25+
use crate::Result;
26+
use crate::arrow::schema_to_arrow_schema;
27+
use crate::scan::ArrowRecordBatchStream;
28+
use crate::spec::{NestedField, PrimitiveType, Type};
29+
use crate::table::Table;
30+
31+
/// Metadata log entries table.
32+
///
33+
/// Shows the table's metadata log history.
34+
pub struct MetadataLogEntriesTable<'a> {
35+
table: &'a Table,
36+
}
37+
38+
impl<'a> MetadataLogEntriesTable<'a> {
39+
/// Create a new MetadataLogEntries table instance.
40+
pub fn new(table: &'a Table) -> Self {
41+
Self { table }
42+
}
43+
44+
/// Returns the iceberg schema of the metadata log entries table.
45+
pub fn schema(&self) -> crate::spec::Schema {
46+
let fields = vec![
47+
NestedField::required(1, "timestamp", Type::Primitive(PrimitiveType::Timestamptz)),
48+
NestedField::required(2, "file", Type::Primitive(PrimitiveType::String)),
49+
NestedField::optional(
50+
3,
51+
"latest_snapshot_id",
52+
Type::Primitive(PrimitiveType::Long),
53+
),
54+
NestedField::optional(4, "latest_schema_id", Type::Primitive(PrimitiveType::Int)),
55+
NestedField::optional(
56+
5,
57+
"latest_sequence_number",
58+
Type::Primitive(PrimitiveType::Long),
59+
),
60+
];
61+
crate::spec::Schema::builder()
62+
.with_fields(fields.into_iter().map(|f| f.into()))
63+
.build()
64+
.unwrap()
65+
}
66+
67+
/// Scans the metadata log entries table.
68+
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
69+
let schema = schema_to_arrow_schema(&self.schema())?;
70+
let metadata = self.table.metadata();
71+
72+
let mut timestamp =
73+
PrimitiveBuilder::<TimestampMicrosecondType>::new().with_timezone("+00:00");
74+
let mut file = StringBuilder::new();
75+
let mut latest_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
76+
let mut latest_schema_id = PrimitiveBuilder::<Int32Type>::new();
77+
let mut latest_sequence_number = PrimitiveBuilder::<Int64Type>::new();
78+
79+
// Historical entries from the metadata log
80+
for log_entry in metadata.metadata_log() {
81+
timestamp.append_value(log_entry.timestamp_ms * 1000);
82+
file.append_value(&log_entry.metadata_file);
83+
latest_snapshot_id.append_null();
84+
latest_schema_id.append_null();
85+
latest_sequence_number.append_null();
86+
}
87+
88+
// Current metadata entry
89+
if let Some(metadata_location) = self.table.metadata_location() {
90+
timestamp.append_value(metadata.last_updated_ms * 1000);
91+
file.append_value(metadata_location);
92+
latest_snapshot_id.append_option(metadata.current_snapshot_id);
93+
latest_schema_id.append_value(metadata.current_schema_id);
94+
latest_sequence_number.append_value(metadata.last_sequence_number);
95+
}
96+
97+
let batch = RecordBatch::try_new(Arc::new(schema), vec![
98+
Arc::new(timestamp.finish()),
99+
Arc::new(file.finish()),
100+
Arc::new(latest_snapshot_id.finish()),
101+
Arc::new(latest_schema_id.finish()),
102+
Arc::new(latest_sequence_number.finish()),
103+
])?;
104+
105+
Ok(stream::iter(vec![Ok(batch)]).boxed())
106+
}
107+
}
108+
109+
#[cfg(test)]
mod tests {
    use expect_test::expect;
    use futures::TryStreamExt;

    use crate::scan::tests::TableTestFixture;
    use crate::test_utils::check_record_batches;

    #[tokio::test]
    async fn test_metadata_log_entries_table() {
        // Fixture metadata has one historical log entry; the scan appends the
        // current metadata file as a second row with live latest_* values.
        let table = TableTestFixture::new().table;

        let batch_stream = table.inspect().metadata_log_entries().scan().await.unwrap();

        check_record_batches(
            batch_stream.try_collect::<Vec<_>>().await.unwrap(),
            // Expected Arrow schema; field ids mirror the iceberg schema.
            expect![[r#"
                Field { "timestamp": Timestamp(µs, "+00:00"), metadata: {"PARQUET:field_id": "1"} },
                Field { "file": Utf8, metadata: {"PARQUET:field_id": "2"} },
                Field { "latest_snapshot_id": nullable Int64, metadata: {"PARQUET:field_id": "3"} },
                Field { "latest_schema_id": nullable Int32, metadata: {"PARQUET:field_id": "4"} },
                Field { "latest_sequence_number": nullable Int64, metadata: {"PARQUET:field_id": "5"} }"#]],
            // Historical row has null latest_* columns; current row is populated.
            expect![[r#"
                timestamp: PrimitiveArray<Timestamp(µs, "+00:00")>
                [
                  1970-01-01T00:25:15.100+00:00,
                  2020-10-14T01:22:53.590+00:00,
                ],
                file: (skipped),
                latest_snapshot_id: PrimitiveArray<Int64>
                [
                  null,
                  3055729675574597004,
                ],
                latest_schema_id: PrimitiveArray<Int32>
                [
                  null,
                  1,
                ],
                latest_sequence_number: PrimitiveArray<Int64>
                [
                  null,
                  34,
                ]"#]],
            // "file" holds an environment-dependent path, so it is skipped;
            // rows are sorted by "timestamp" before comparison.
            &["file"],
            Some("timestamp"),
        );
    }
}

0 commit comments

Comments
 (0)