Skip to content

Commit 83b74d4

Browse files
committed
read ThriftFileMetaData instead of FileMetaData
1 parent 7b22a92 commit 83b74d4

File tree

6 files changed

+132
-17
lines changed

6 files changed

+132
-17
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ enable-histogram-metrics = [
4545
common-arrow = { path = "../../common/arrow" }
4646
common-ast = { path = "../ast" }
4747
common-base = { path = "../../common/base" }
48+
common-cache = { path = "../../common/cache" }
4849
common-catalog = { path = "../catalog" }
49-
common-cache = {path = "../../common/cache" }
5050
common-compress = { path = "../../common/compress" }
5151
common-config = { path = "../config" }
5252
common-exception = { path = "../../common/exception" }
@@ -161,13 +161,13 @@ num = "0.4.0"
161161
p256 = "0.13"
162162
pretty_assertions = "1.3.0"
163163
reqwest = { workspace = true }
164+
sysinfo = "0.28.3"
164165
temp-env = "0.3.0"
165166
tempfile = "3.4.0"
166167
toml = { version = "0.7.3", default-features = false }
167168
url = "2.3.1"
168169
walkdir = "2.3.2"
169170
wiremock = "0.5.14"
170-
sysinfo = "0.28.3"
171171

172172
[build-dependencies]
173173
common-building = { path = "../../common/building" }

src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ async fn test_index_meta_cache_size_bloom_meta() -> common_exception::Result<()>
8484

8585
let cache_number = 300_000;
8686

87-
let meta: FileMetaData = FileMetaData::try_from_thrift(thrift_file_meta)?;
88-
let bloom_index_meta = BloomIndexMeta::try_from(meta)?;
87+
let bloom_index_meta = BloomIndexMeta::try_from(thrift_file_meta)?;
8988

9089
let sys = System::new_all();
9190
let pid = get_current_pid().unwrap();

src/query/storages/common/index/src/bloom_index.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::collections::HashMap;
1616
use std::ops::Deref;
1717
use std::sync::Arc;
1818

19-
use common_arrow::parquet::metadata::FileMetaData;
19+
use common_arrow::parquet::metadata::ThriftFileMetaData;
2020
use common_exception::ErrorCode;
2121
use common_exception::Result;
2222
use common_exception::Span;
@@ -60,12 +60,10 @@ pub struct BloomIndexMeta {
6060
pub columns: Vec<(String, SingleColumnMeta)>,
6161
}
6262

63-
impl TryFrom<FileMetaData> for BloomIndexMeta {
63+
impl TryFrom<ThriftFileMetaData> for BloomIndexMeta {
6464
type Error = common_exception::ErrorCode;
6565

66-
fn try_from(meta: FileMetaData) -> std::result::Result<Self, Self::Error> {
67-
// TODO from FileMeta directly or pass in Thrift
68-
let mut meta = meta.into_thrift();
66+
fn try_from(mut meta: ThriftFileMetaData) -> std::result::Result<Self, Self::Error> {
6967
let rg = meta.row_groups.remove(0);
7068
let mut col_metas = Vec::with_capacity(rg.columns.len());
7169
for x in &rg.columns {
@@ -92,7 +90,9 @@ impl TryFrom<FileMetaData> for BloomIndexMeta {
9290
col_metas.push((column_name, res));
9391
}
9492
None => {
95-
panic!("")
93+
panic!(
94+
"expecting chunk meta data while converting ThriftFileMetaData to BloomIndexMeta"
95+
)
9696
}
9797
}
9898
}

src/query/storages/fuse/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ futures = "0.3.24"
4646
futures-util = "0.3.24"
4747
metrics = "0.20.1"
4848
opendal = { workspace = true }
49+
parquet-format-safe = "0.2"
4950
serde = { workspace = true }
5051
serde_json = { workspace = true }
5152
sha2 = "0.10.6"

src/query/storages/fuse/src/io/read/meta/meta_readers.rs

Lines changed: 121 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use common_arrow::parquet::read::read_metadata_async;
15+
use std::io::SeekFrom;
16+
17+
use common_arrow::parquet::metadata::ThriftFileMetaData;
1618
use common_exception::ErrorCode;
1719
use common_exception::Result;
1820
use common_expression::TableSchemaRef;
21+
use futures::AsyncRead;
22+
use futures::AsyncSeek;
23+
use futures_util::AsyncReadExt;
24+
use futures_util::AsyncSeekExt;
1925
use opendal::Operator;
2026
use opendal::Reader;
2127
use storages_common_cache::InMemoryItemCacheReader;
@@ -31,6 +37,7 @@ use storages_common_table_meta::meta::TableSnapshotStatistics;
3137
use storages_common_table_meta::meta::TableSnapshotStatisticsVersion;
3238

3339
use super::versioned_reader::VersionedReader;
40+
use crate::io::read::meta::meta_readers::thrift_file_meta_read::read_thrift_file_metadata;
3441

3542
pub type TableSnapshotStatisticsReader =
3643
InMemoryItemCacheReader<TableSnapshotStatistics, LoaderWrapper<Operator>>;
@@ -107,12 +114,15 @@ impl Loader<SegmentInfo> for LoaderWrapper<(Operator, TableSchemaRef)> {
107114
impl Loader<BloomIndexMeta> for LoaderWrapper<Operator> {
108115
async fn load(&self, params: &LoadParams) -> Result<BloomIndexMeta> {
109116
let mut reader = bytes_reader(&self.0, params.location.as_str(), params.len_hint).await?;
110-
let meta = read_metadata_async(&mut reader).await.map_err(|err| {
111-
ErrorCode::Internal(format!(
112-
"read file meta failed, {}, {:?}",
113-
params.location, err
114-
))
115-
})?;
117+
// read the ThriftFileMetaData, omit unnecessary conversions
118+
let meta = read_thrift_file_metadata(&mut reader)
119+
.await
120+
.map_err(|err| {
121+
ErrorCode::StorageOther(format!(
122+
"read file meta failed, {}, {:?}",
123+
params.location, err
124+
))
125+
})?;
116126

117127
BloomIndexMeta::try_from(meta)
118128
}
@@ -126,3 +136,107 @@ async fn bytes_reader(op: &Operator, path: &str, len_hint: Option<u64>) -> Resul
126136
};
127137
Ok(reader)
128138
}
139+
140+
mod thrift_file_meta_read {
141+
use common_arrow::parquet::error::Error;
142+
use parquet_format_safe::thrift::protocol::TCompactInputProtocol;
143+
144+
use super::*;
145+
146+
// the following code is copied from crate `parquet2`, with slight modification:
147+
// return a ThriftFileMetaData instead of FileMetaData while reader parquet metadata,
148+
// to avoid unnecessary conversions.
149+
//
150+
// It takes about 18s, to convert one million ThriftFileMetaData objects to parquet::metadata::FileMetaData,
151+
// and the memory usage of FileMetaData is also larger.
152+
153+
const HEADER_SIZE: u64 = PARQUET_MAGIC.len() as u64;
154+
const FOOTER_SIZE: u64 = 8;
155+
const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
156+
157+
/// The number of bytes read at the end of the parquet file on first read
158+
const DEFAULT_FOOTER_READ_SIZE: u64 = 64 * 1024;
159+
160+
async fn stream_len(
161+
seek: &mut (impl AsyncSeek + std::marker::Unpin),
162+
) -> std::result::Result<u64, std::io::Error> {
163+
let old_pos = seek.seek(SeekFrom::Current(0)).await?;
164+
let len = seek.seek(SeekFrom::End(0)).await?;
165+
166+
// Avoid seeking a third time when we were already at the end of the
167+
// stream. The branch is usually way cheaper than a seek operation.
168+
if old_pos != len {
169+
seek.seek(SeekFrom::Start(old_pos)).await?;
170+
}
171+
172+
Ok(len)
173+
}
174+
175+
fn metadata_len(buffer: &[u8], len: usize) -> i32 {
176+
i32::from_le_bytes(buffer[len - 8..len - 4].try_into().unwrap())
177+
}
178+
179+
pub async fn read_thrift_file_metadata<R: AsyncRead + AsyncSeek + Send + std::marker::Unpin>(
180+
reader: &mut R,
181+
) -> common_arrow::parquet::error::Result<ThriftFileMetaData> {
182+
let file_size = stream_len(reader).await?;
183+
184+
if file_size < HEADER_SIZE + FOOTER_SIZE {
185+
return Err(Error::OutOfSpec(
186+
"A parquet file must containt a header and footer with at least 12 bytes".into(),
187+
));
188+
}
189+
190+
// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
191+
let default_end_len = std::cmp::min(DEFAULT_FOOTER_READ_SIZE, file_size) as usize;
192+
reader
193+
.seek(SeekFrom::End(-(default_end_len as i64)))
194+
.await?;
195+
196+
let mut buffer = vec![];
197+
buffer.try_reserve(default_end_len)?;
198+
reader
199+
.take(default_end_len as u64)
200+
.read_to_end(&mut buffer)
201+
.await?;
202+
203+
// check this is indeed a parquet file
204+
if buffer[default_end_len - 4..] != PARQUET_MAGIC {
205+
return Err(Error::OutOfSpec(
206+
"Invalid Parquet file. Corrupt footer".into(),
207+
));
208+
}
209+
210+
let metadata_len = metadata_len(&buffer, default_end_len);
211+
let metadata_len: u64 = metadata_len.try_into()?;
212+
213+
let footer_len = FOOTER_SIZE + metadata_len;
214+
if footer_len > file_size {
215+
return Err(Error::OutOfSpec(
216+
"The footer size must be smaller or equal to the file's size".into(),
217+
));
218+
}
219+
220+
let reader = if (footer_len as usize) < buffer.len() {
221+
// the whole metadata is in the bytes we already read
222+
let remaining = buffer.len() - footer_len as usize;
223+
&buffer[remaining..]
224+
} else {
225+
// the end of file read by default is not long enough, read again including the metadata.
226+
reader.seek(SeekFrom::End(-(footer_len as i64))).await?;
227+
228+
buffer.clear();
229+
buffer.try_reserve(footer_len as usize)?;
230+
reader.take(footer_len).read_to_end(&mut buffer).await?;
231+
232+
&buffer
233+
};
234+
235+
// a highly nested but sparse struct could result in many allocations
236+
let max_size = reader.len() * 2 + 1024;
237+
238+
let mut prot = TCompactInputProtocol::new(reader, max_size);
239+
let meta = ThriftFileMetaData::read_from_in_protocol(&mut prot)?;
240+
Ok(meta)
241+
}
242+
}

0 commit comments

Comments
 (0)