Skip to content

Commit 49da701

Browse files
authored
Merge pull request #10609 from Xuanwo/refactor-not-store-opendal-metadata
refactor: Merge FileWithMetadata into StageFileInfo
2 parents d83360c + 0923ab8 commit 49da701

File tree

8 files changed

+29
-61
lines changed

8 files changed

+29
-61
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ anyhow = { workspace = true }
2121
async-trait = "0.1"
2222
bytes = "1"
2323
chrono = { workspace = true }
24+
flagset = "0.4"
2425
futures = "0.3"
2526
opendal = { workspace = true }
2627
regex = "1.6.0"

src/common/storage/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ pub use parquet::read_parquet_schema_async;
5555

5656
mod stage;
5757
pub use stage::init_stage_operator;
58-
pub use stage::FileWithMeta;
5958
pub use stage::StageFileInfo;
6059
pub use stage::StageFileStatus;
6160
pub use stage::StageFilesInfo;

src/common/storage/src/stage.rs

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,6 @@ use regex::Regex;
3333
use crate::init_operator;
3434
use crate::DataOperator;
3535

36-
pub struct FileWithMeta {
37-
pub path: String,
38-
pub metadata: Metadata,
39-
}
40-
41-
impl FileWithMeta {
42-
fn new(path: &str, meta: Metadata) -> Self {
43-
Self {
44-
path: path.to_string(),
45-
metadata: meta,
46-
}
47-
}
48-
}
49-
5036
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
5137
pub enum StageFileStatus {
5238
NeedCopy,
@@ -79,11 +65,10 @@ impl StageFileInfo {
7965
creator: None,
8066
}
8167
}
82-
}
8368

84-
impl From<FileWithMeta> for StageFileInfo {
85-
fn from(value: FileWithMeta) -> Self {
86-
StageFileInfo::new(value.path, &value.metadata)
69+
/// NOTE: update this query when add new meta
70+
pub fn meta_query() -> flagset::FlagSet<Metakey> {
71+
Metakey::ContentLength | Metakey::ContentMd5 | Metakey::LastModified | Metakey::Etag
8772
}
8873
}
8974

@@ -122,7 +107,7 @@ impl StageFilesInfo {
122107
}
123108
}
124109

125-
pub async fn list(&self, operator: &Operator, first_only: bool) -> Result<Vec<FileWithMeta>> {
110+
pub async fn list(&self, operator: &Operator, first_only: bool) -> Result<Vec<StageFileInfo>> {
126111
if let Some(files) = &self.files {
127112
let mut res = Vec::new();
128113
for file in files {
@@ -132,7 +117,7 @@ impl StageFilesInfo {
132117
.to_string();
133118
let meta = operator.stat(&full_path).await?;
134119
if meta.mode().is_file() {
135-
res.push(FileWithMeta::new(&full_path, meta))
120+
res.push(StageFileInfo::new(full_path, &meta))
136121
} else {
137122
return Err(ErrorCode::BadArguments(format!(
138123
"{full_path} is not a file"
@@ -149,15 +134,15 @@ impl StageFilesInfo {
149134
}
150135
}
151136

152-
pub async fn first_file(&self, operator: &Operator) -> Result<FileWithMeta> {
137+
pub async fn first_file(&self, operator: &Operator) -> Result<StageFileInfo> {
153138
let mut files = self.list(operator, true).await?;
154139
match files.pop() {
155140
None => Err(ErrorCode::BadArguments("no file found")),
156141
Some(f) => Ok(f),
157142
}
158143
}
159144

160-
pub fn blocking_first_file(&self, operator: &Operator) -> Result<FileWithMeta> {
145+
pub fn blocking_first_file(&self, operator: &Operator) -> Result<StageFileInfo> {
161146
let mut files = self.blocking_list(operator, true)?;
162147
match files.pop() {
163148
None => Err(ErrorCode::BadArguments("no file found")),
@@ -169,7 +154,7 @@ impl StageFilesInfo {
169154
&self,
170155
operator: &Operator,
171156
first_only: bool,
172-
) -> Result<Vec<FileWithMeta>> {
157+
) -> Result<Vec<StageFileInfo>> {
173158
if let Some(files) = &self.files {
174159
let mut res = Vec::new();
175160
for file in files {
@@ -179,7 +164,7 @@ impl StageFilesInfo {
179164
.to_string();
180165
let meta = operator.blocking().stat(&full_path)?;
181166
if meta.mode().is_file() {
182-
res.push(FileWithMeta::new(&full_path, meta))
167+
res.push(StageFileInfo::new(full_path, &meta))
183168
} else {
184169
return Err(ErrorCode::BadArguments(format!(
185170
"{full_path} is not a file"
@@ -201,11 +186,11 @@ impl StageFilesInfo {
201186
path: &str,
202187
pattern: Option<Regex>,
203188
first_only: bool,
204-
) -> Result<Vec<FileWithMeta>> {
189+
) -> Result<Vec<StageFileInfo>> {
205190
let root_meta = operator.stat(path).await;
206191
match root_meta {
207192
Ok(meta) => match meta.mode() {
208-
EntryMode::FILE => return Ok(vec![FileWithMeta::new(path, meta)]),
193+
EntryMode::FILE => return Ok(vec![StageFileInfo::new(path.to_string(), &meta)]),
209194
EntryMode::DIR => {}
210195
EntryMode::Unknown => {
211196
return Err(ErrorCode::BadArguments("object mode is unknown"));
@@ -224,10 +209,9 @@ impl StageFilesInfo {
224209
let mut files = Vec::new();
225210
let mut list = operator.scan(path).await?;
226211
while let Some(obj) = list.try_next().await? {
227-
// todo(youngsofun): not always need Metakey::Complete
228-
let meta = operator.metadata(&obj, Metakey::Complete).await?;
212+
let meta = operator.metadata(&obj, StageFileInfo::meta_query()).await?;
229213
if check_file(obj.path(), meta.mode(), &pattern) {
230-
files.push(FileWithMeta::new(obj.path(), meta));
214+
files.push(StageFileInfo::new(obj.path().to_string(), &meta));
231215
if first_only {
232216
return Ok(files);
233217
}
@@ -253,13 +237,13 @@ fn blocking_list_files_with_pattern(
253237
path: &str,
254238
pattern: Option<Regex>,
255239
first_only: bool,
256-
) -> Result<Vec<FileWithMeta>> {
240+
) -> Result<Vec<StageFileInfo>> {
257241
let operator = operator.blocking();
258242

259243
let root_meta = operator.stat(path);
260244
match root_meta {
261245
Ok(meta) => match meta.mode() {
262-
EntryMode::FILE => return Ok(vec![FileWithMeta::new(path, meta)]),
246+
EntryMode::FILE => return Ok(vec![StageFileInfo::new(path.to_string(), &meta)]),
263247
EntryMode::DIR => {}
264248
EntryMode::Unknown => return Err(ErrorCode::BadArguments("object mode is unknown")),
265249
},
@@ -277,9 +261,9 @@ fn blocking_list_files_with_pattern(
277261
let list = operator.list(path)?;
278262
for obj in list {
279263
let obj = obj?;
280-
let meta = operator.metadata(&obj, Metakey::Complete)?;
264+
let meta = operator.metadata(&obj, StageFileInfo::meta_query())?;
281265
if check_file(obj.path(), meta.mode(), &pattern) {
282-
files.push(FileWithMeta::new(obj.path(), meta));
266+
files.push(StageFileInfo::new(obj.path().to_string(), &meta));
283267
if first_only {
284268
return Ok(files);
285269
}

src/query/service/src/interpreters/interpreter_list.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,7 @@ impl Interpreter for ListInterpreter {
6868
files: None,
6969
pattern,
7070
};
71-
let files: Vec<StageFileInfo> = files_info
72-
.list(&op, false)
73-
.await?
74-
.into_iter()
75-
.map(|file_with_meta| file_with_meta.into())
76-
.collect::<Vec<_>>();
71+
let files: Vec<StageFileInfo> = files_info.list(&op, false).await?;
7772

7873
let names: Vec<Vec<u8>> = files
7974
.iter()

src/query/sql/src/planner/binder/copy.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use common_exception::Result;
4141
use common_meta_app::principal::OnErrorMode;
4242
use common_meta_app::principal::StageInfo;
4343
use common_storage::init_stage_operator;
44-
use common_storage::StageFileInfo;
4544
use common_storage::StageFileStatus;
4645
use common_storage::StageFilesInfo;
4746
use common_users::UserApiProvider;
@@ -540,18 +539,13 @@ impl<'a> Binder {
540539
}
541540

542541
let operator = init_stage_operator(&stage_info)?;
543-
let files = if operator.info().can_blocking() {
542+
let mut files = if operator.info().can_blocking() {
544543
files_info.blocking_list(&operator, false)
545544
} else {
546545
files_info.list(&operator, false).await
547546
}?;
548547

549-
let mut all_source_file_infos = files
550-
.into_iter()
551-
.map(|file_with_meta| StageFileInfo::new(file_with_meta.path, &file_with_meta.metadata))
552-
.collect::<Vec<_>>();
553-
554-
info!("end to list files: {}", all_source_file_infos.len());
548+
info!("end to list files: {}", files.len());
555549

556550
if !stmt.force {
557551
// Status.
@@ -561,29 +555,24 @@ impl<'a> Binder {
561555
info!(status);
562556
}
563557

564-
all_source_file_infos = self
558+
files = self
565559
.ctx
566-
.color_copied_files(
567-
dst_catalog_name,
568-
dst_database_name,
569-
dst_table_name,
570-
all_source_file_infos,
571-
)
560+
.color_copied_files(dst_catalog_name, dst_database_name, dst_table_name, files)
572561
.await?;
573562

574-
info!("end to color copied files: {}", all_source_file_infos.len());
563+
info!("end to color copied files: {}", files.len());
575564
}
576565

577566
let mut need_copy_file_infos = vec![];
578-
for file in &all_source_file_infos {
567+
for file in &files {
579568
if file.status == StageFileStatus::NeedCopy {
580569
need_copy_file_infos.push(file.clone());
581570
}
582571
}
583572

584573
info!(
585574
"copy: read all files finished, all:{}, need copy:{}, elapsed:{}",
586-
all_source_file_infos.len(),
575+
files.len(),
587576
need_copy_file_infos.len(),
588577
start.elapsed().as_secs()
589578
);
@@ -630,7 +619,7 @@ impl<'a> Binder {
630619
schema: dst_table.schema(),
631620
from: Box::new(query_plan),
632621
stage_info: Box::new(stage_info),
633-
all_source_file_infos,
622+
all_source_file_infos: files,
634623
need_copy_file_infos,
635624
validation_mode,
636625
})))

src/query/storages/parquet/src/parquet_table/partition.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl ParquetTable {
118118
self.files_info.list(&self.operator, false).await
119119
}?
120120
.into_iter()
121-
.map(|f| (f.path, f.metadata.content_length()))
121+
.map(|f| (f.path, f.size))
122122
.collect::<Vec<_>>(),
123123
};
124124

src/query/storages/stage/src/stage_table.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ impl StageTable {
7878
.list(&op, false)
7979
.await?
8080
.into_iter()
81-
.map(|file_with_meta| StageFileInfo::new(file_with_meta.path, &file_with_meta.metadata))
8281
.collect::<Vec<_>>();
8382
Ok(infos)
8483
}

0 commit comments

Comments
 (0)