Skip to content

Commit 6a14011

Browse files
authored
Merge pull request #10536 from youngsofun/trans
refactor(stage): unify list files and read parquet metas
2 parents d6c0ca6 + cb42922 commit 6a14011

File tree

28 files changed

+445
-482
lines changed

28 files changed

+445
-482
lines changed

Cargo.lock

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ common-meta-app = { path = "../../meta/app" }
2020
anyhow = { workspace = true }
2121
async-trait = "0.1"
2222
bytes = "1"
23+
chrono = { workspace = true }
2324
futures = "0.3"
2425
opendal = { workspace = true }
2526
regex = "1.6.0"

src/common/storage/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,12 @@ pub use column_node::ColumnNode;
5050
pub use column_node::ColumnNodes;
5151

5252
mod parquet;
53+
pub use parquet::read_parquet_metas_in_parallel;
5354
pub use parquet::read_parquet_schema_async;
5455

5556
mod stage;
5657
pub use stage::init_stage_operator;
58+
pub use stage::FileWithMeta;
59+
pub use stage::StageFileInfo;
60+
pub use stage::StageFileStatus;
5761
pub use stage::StageFilesInfo;

src/common/storage/src/parquet.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
use common_arrow::arrow::datatypes::Schema as ArrowSchema;
1616
use common_arrow::arrow::io::parquet::read as pread;
17+
use common_arrow::parquet::metadata::FileMetaData;
18+
use common_base::runtime::execute_futures_in_parallel;
1719
use common_exception::ErrorCode;
1820
use common_exception::Result;
1921
use opendal::Operator;
@@ -27,3 +29,51 @@ pub async fn read_parquet_schema_async(operator: &Operator, path: &str) -> Resul
2729
let arrow_schema = pread::infer_schema(&meta)?;
2830
Ok(arrow_schema)
2931
}
32+
33+
async fn read_parquet_metas_batch(
34+
file_infos: Vec<(String, u64)>,
35+
op: Operator,
36+
) -> Result<Vec<FileMetaData>> {
37+
// todo(youngsofun): we should use size in StageFileInfo, but parquet2 do not have the interface for now
38+
let mut metas = vec![];
39+
for (path, _size) in file_infos {
40+
let mut reader = op.reader(&path).await?;
41+
metas.push(pread::read_metadata_async(&mut reader).await?)
42+
}
43+
Ok(metas)
44+
}
45+
46+
pub async fn read_parquet_metas_in_parallel(
47+
op: Operator,
48+
file_infos: Vec<(String, u64)>,
49+
thread_nums: usize,
50+
permit_nums: usize,
51+
) -> Result<Vec<FileMetaData>> {
52+
let batch_size = 1000;
53+
if file_infos.len() <= batch_size {
54+
read_parquet_metas_batch(file_infos, op.clone()).await
55+
} else {
56+
let mut chunks = file_infos.chunks(batch_size);
57+
58+
let tasks = std::iter::from_fn(move || {
59+
chunks
60+
.next()
61+
.map(|location| read_parquet_metas_batch(location.to_vec(), op.clone()))
62+
});
63+
64+
let result = execute_futures_in_parallel(
65+
tasks,
66+
thread_nums,
67+
permit_nums,
68+
"read-parquet-metas-worker".to_owned(),
69+
)
70+
.await?
71+
.into_iter()
72+
.collect::<Result<Vec<Vec<_>>>>()?
73+
.into_iter()
74+
.flatten()
75+
.collect();
76+
77+
Ok(result)
78+
}
79+
}

src/common/storage/src/stage.rs

Lines changed: 113 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414

1515
use std::path::Path;
1616

17+
use chrono::DateTime;
18+
use chrono::TimeZone;
19+
use chrono::Utc;
1720
use common_exception::ErrorCode;
1821
use common_exception::Result;
1922
use common_meta_app::principal::StageInfo;
2023
use common_meta_app::principal::StageType;
24+
use common_meta_app::principal::UserIdentity;
2125
use futures::TryStreamExt;
26+
use opendal::Entry;
2227
use opendal::EntryMode;
28+
use opendal::Metadata;
2329
use opendal::Metakey;
2430
use opendal::Operator;
2531
use regex::Regex;
@@ -29,16 +35,57 @@ use crate::DataOperator;
2935

3036
pub struct FileWithMeta {
3137
pub path: String,
38+
pub metadata: Metadata,
3239
}
3340

3441
impl FileWithMeta {
35-
fn new(path: &str) -> Self {
42+
fn new(path: &str, meta: Metadata) -> Self {
3643
Self {
3744
path: path.to_string(),
45+
metadata: meta,
3846
}
3947
}
4048
}
4149

50+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
51+
pub enum StageFileStatus {
52+
NeedCopy,
53+
AlreadyCopied,
54+
}
55+
56+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
57+
pub struct StageFileInfo {
58+
pub path: String,
59+
pub size: u64,
60+
pub md5: Option<String>,
61+
pub last_modified: DateTime<Utc>,
62+
pub etag: Option<String>,
63+
pub status: StageFileStatus,
64+
pub creator: Option<UserIdentity>,
65+
}
66+
67+
impl StageFileInfo {
68+
pub fn new(path: String, meta: &Metadata) -> StageFileInfo {
69+
StageFileInfo {
70+
path,
71+
size: meta.content_length(),
72+
md5: meta.content_md5().map(str::to_string),
73+
last_modified: meta
74+
.last_modified()
75+
.map_or(Utc::now(), |t| Utc.timestamp(t.unix_timestamp(), 0)),
76+
etag: meta.etag().map(str::to_string),
77+
status: StageFileStatus::NeedCopy,
78+
creator: None,
79+
}
80+
}
81+
}
82+
83+
impl From<FileWithMeta> for StageFileInfo {
84+
fn from(value: FileWithMeta) -> Self {
85+
StageFileInfo::new(value.path, &value.metadata)
86+
}
87+
}
88+
4289
pub fn init_stage_operator(stage_info: &StageInfo) -> Result<Operator> {
4390
if stage_info.stage_type == StageType::External {
4491
Ok(init_operator(&stage_info.stage_params.storage)?)
@@ -84,7 +131,7 @@ impl StageFilesInfo {
84131
.to_string();
85132
let meta = operator.stat(&full_path).await?;
86133
if meta.mode().is_file() {
87-
res.push(FileWithMeta::new(&full_path))
134+
res.push(FileWithMeta::new(&full_path, meta))
88135
} else {
89136
return Err(ErrorCode::BadArguments(format!(
90137
"{full_path} is not a file"
@@ -97,7 +144,7 @@ impl StageFilesInfo {
97144
Ok(res)
98145
} else {
99146
let pattern = self.get_pattern()?;
100-
list_files_with_pattern(operator, &self.path, pattern, first_only).await
147+
StageFilesInfo::list_files_with_pattern(operator, &self.path, pattern, first_only).await
101148
}
102149
}
103150

@@ -131,7 +178,7 @@ impl StageFilesInfo {
131178
.to_string();
132179
let meta = operator.blocking().stat(&full_path)?;
133180
if meta.mode().is_file() {
134-
res.push(FileWithMeta::new(&full_path))
181+
res.push(FileWithMeta::new(&full_path, meta))
135182
} else {
136183
return Err(ErrorCode::BadArguments(format!(
137184
"{full_path} is not a file"
@@ -147,43 +194,46 @@ impl StageFilesInfo {
147194
blocking_list_files_with_pattern(operator, &self.path, pattern, first_only)
148195
}
149196
}
150-
}
151197

152-
async fn list_files_with_pattern(
153-
operator: &Operator,
154-
path: &str,
155-
pattern: Option<Regex>,
156-
first_only: bool,
157-
) -> Result<Vec<FileWithMeta>> {
158-
let root_meta = operator.stat(path).await;
159-
match root_meta {
160-
Ok(meta) => match meta.mode() {
161-
EntryMode::FILE => return Ok(vec![FileWithMeta::new(path)]),
162-
EntryMode::DIR => {}
163-
EntryMode::Unknown => return Err(ErrorCode::BadArguments("object mode is unknown")),
164-
},
165-
Err(e) => {
166-
if e.kind() == opendal::ErrorKind::NotFound {
167-
return Ok(vec![]);
168-
} else {
169-
return Err(e.into());
198+
pub async fn list_files_with_pattern(
199+
operator: &Operator,
200+
path: &str,
201+
pattern: Option<Regex>,
202+
first_only: bool,
203+
) -> Result<Vec<FileWithMeta>> {
204+
let root_meta = operator.stat(path).await;
205+
match root_meta {
206+
Ok(meta) => match meta.mode() {
207+
EntryMode::FILE => return Ok(vec![FileWithMeta::new(path, meta)]),
208+
EntryMode::DIR => {}
209+
EntryMode::Unknown => {
210+
return Err(ErrorCode::BadArguments("object mode is unknown"));
211+
}
212+
},
213+
Err(e) => {
214+
if e.kind() == opendal::ErrorKind::NotFound {
215+
return Ok(vec![]);
216+
} else {
217+
return Err(e.into());
218+
}
170219
}
171-
}
172-
};
220+
};
173221

174-
// path is a dir
175-
let mut files = Vec::new();
176-
let mut list = operator.scan(path).await?;
177-
while let Some(obj) = list.try_next().await? {
178-
let meta = operator.metadata(&obj, Metakey::Mode).await?;
179-
if check_file(obj.path(), meta.mode(), &pattern) {
180-
files.push(FileWithMeta::new(obj.path()));
181-
if first_only {
182-
return Ok(files);
222+
// path is a dir
223+
let mut files = Vec::new();
224+
let mut list = operator.scan(path).await?;
225+
while let Some(obj) = list.try_next().await? {
226+
// todo(youngsofun): not always need Metakey::Complete
227+
let meta = operator.metadata(&obj, Metakey::Complete).await?;
228+
if check_file(obj.path(), meta.mode(), &pattern) {
229+
files.push(FileWithMeta::new(obj.path(), meta));
230+
if first_only {
231+
return Ok(files);
232+
}
183233
}
184234
}
235+
Ok(files)
185236
}
186-
Ok(files)
187237
}
188238

189239
fn check_file(path: &str, mode: EntryMode, pattern: &Option<Regex>) -> bool {
@@ -208,7 +258,7 @@ fn blocking_list_files_with_pattern(
208258
let root_meta = operator.stat(path);
209259
match root_meta {
210260
Ok(meta) => match meta.mode() {
211-
EntryMode::FILE => return Ok(vec![FileWithMeta::new(path)]),
261+
EntryMode::FILE => return Ok(vec![FileWithMeta::new(path, meta)]),
212262
EntryMode::DIR => {}
213263
EntryMode::Unknown => return Err(ErrorCode::BadArguments("object mode is unknown")),
214264
},
@@ -226,13 +276,38 @@ fn blocking_list_files_with_pattern(
226276
let list = operator.list(path)?;
227277
for obj in list {
228278
let obj = obj?;
229-
let meta = operator.metadata(&obj, Metakey::Mode)?;
279+
let meta = operator.metadata(&obj, Metakey::Complete)?;
230280
if check_file(obj.path(), meta.mode(), &pattern) {
231-
files.push(FileWithMeta::new(obj.path()));
281+
files.push(FileWithMeta::new(obj.path(), meta));
232282
if first_only {
233283
return Ok(files);
234284
}
235285
}
236286
}
237287
Ok(files)
238288
}
289+
290+
/// # Behavior
291+
///
292+
///
293+
/// - `Ok(Some(v))` if given object is a file and no error happened.
294+
/// - `Ok(None)` if given object is not a file.
295+
/// - `Err(err)` if there is an error happened.
296+
#[allow(unused)]
297+
pub async fn stat_file(op: Operator, de: Entry) -> Result<Option<StageFileInfo>> {
298+
let meta = op
299+
.metadata(&de, {
300+
Metakey::Mode
301+
| Metakey::ContentLength
302+
| Metakey::ContentMd5
303+
| Metakey::LastModified
304+
| Metakey::Etag
305+
})
306+
.await?;
307+
308+
if !meta.mode().is_file() {
309+
return Ok(None);
310+
}
311+
312+
Ok(Some(StageFileInfo::new(de.path().to_string(), &meta)))
313+
}

src/query/ast/src/ast/format/ast_format.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -705,9 +705,9 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
705705
children.push(self.children.pop().unwrap());
706706
self.visit_copy_unit(&copy.dst);
707707
children.push(self.children.pop().unwrap());
708-
if !copy.files.is_empty() {
709-
let mut files_children = Vec::with_capacity(copy.files.len());
710-
for file in copy.files.iter() {
708+
if let Some(files) = &copy.files {
709+
let mut files_children = Vec::with_capacity(files.len());
710+
for file in files.iter() {
711711
let file_name = format!("File {}", file);
712712
let file_format_ctx = AstFormatContext::new(file_name);
713713
let file_node = FormatTreeNode::new(file_format_ctx);
@@ -719,8 +719,8 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
719719
let files_node = FormatTreeNode::with_children(files_format_ctx, files_children);
720720
children.push(files_node);
721721
}
722-
if !copy.pattern.is_empty() {
723-
let pattern_name = format!("Pattern {}", copy.pattern);
722+
if let Some(pattern) = &copy.pattern {
723+
let pattern_name = format!("Pattern {}", pattern);
724724
let pattern_format_ctx = AstFormatContext::new(pattern_name);
725725
let pattern_node = FormatTreeNode::new(pattern_format_ctx);
726726
children.push(pattern_node);

0 commit comments

Comments
 (0)