
Commit 90f689d

fix(Copy): max_files was set to 1000 when not provided. (#10813)
* fix(Copy): purge only copied files.
* chore(Copy): log max_files.
* ci(copy): polish tests.
1 parent f324555 commit 90f689d

10 files changed: 140 additions and 130 deletions


src/query/catalog/src/table_context.rs

Lines changed: 2 additions & 2 deletions
@@ -136,12 +136,12 @@ pub trait TableContext: Send + Sync {
     async fn get_table(&self, catalog: &str, database: &str, table: &str)
         -> Result<Arc<dyn Table>>;
 
-    async fn color_copied_files(
+    async fn filter_out_copied_files(
         &self,
         catalog_name: &str,
         database_name: &str,
         table_name: &str,
-        files: Vec<StageFileInfo>,
+        files: &[StageFileInfo],
         max_files: Option<usize>,
     ) -> Result<Vec<StageFileInfo>>;
 }
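For orientation, below is a minimal sketch of a call site after the rename. Only the method name and signature come from this commit; the wrapper function, the literal catalog/database/table names, and the import paths are illustrative assumptions.

use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_storage::StageFileInfo;

// Hypothetical helper (not part of this commit). The renamed method now
// borrows the stage listing (&[StageFileInfo]) instead of consuming it and
// returns only the files that still need to be copied.
async fn files_still_to_copy(
    ctx: Arc<dyn TableContext>,
    files: &[StageFileInfo],
    max_files: Option<usize>,
) -> Result<Vec<StageFileInfo>> {
    ctx.filter_out_copied_files("default", "my_db", "my_table", files, max_files)
        .await
}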

src/query/service/src/interpreters/interpreter_copy.rs

Lines changed: 26 additions & 35 deletions
@@ -35,7 +35,6 @@ use common_meta_types::MetaId;
 use common_pipeline_core::processors::processor::ProcessorPtr;
 use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
 use common_storage::StageFileInfo;
-use common_storage::StageFileStatus;
 use common_storage::StageFilesInfo;
 use common_storages_fuse::io::Files;
 use common_storages_stage::StageTable;
@@ -167,7 +166,6 @@ impl CopyInterpreter {
         table_name: &str,
         query: &Plan,
         stage_info: StageInfo,
-        all_source_file_infos: Vec<StageFileInfo>,
         need_copy_file_infos: Vec<StageFileInfo>,
         force: bool,
     ) -> Result<PipelineBuildResult> {
@@ -208,7 +206,6 @@ impl CopyInterpreter {
             ctx.clone(),
             to_table,
             stage_info,
-            all_source_file_infos,
             need_copy_file_infos,
             force,
         )?;
@@ -253,45 +250,43 @@ impl CopyInterpreter {
             Some(max_files)
         };
 
-        let mut all_source_file_infos = if force {
+        let all_source_file_infos = if force {
             StageTable::list_files(&stage_table_info, max_files).await?
         } else {
             StageTable::list_files(&stage_table_info, None).await?
         };
+        let num_all_files = all_source_file_infos.len();
 
-        info!("end to list files: {}", all_source_file_infos.len());
+        info!("end to list files: got {} files", num_all_files);
 
-        if !force {
+        let need_copy_file_infos = if force {
+            all_source_file_infos
+        } else {
             // Status.
             {
-                let status = "begin to color copied files";
+                let status = "begin to filter out copied files";
                 ctx.set_status_info(status);
                 info!(status);
             }
 
-            all_source_file_infos = table_ctx
-                .color_copied_files(
+            let files = table_ctx
+                .filter_out_copied_files(
                     catalog_name,
                     database_name,
                     table_name,
-                    all_source_file_infos,
+                    &all_source_file_infos,
                     max_files,
                 )
                 .await?;
 
-            info!("end to color copied files: {}", all_source_file_infos.len());
-        }
-
-        let mut need_copy_file_infos = vec![];
-        for file in &all_source_file_infos {
-            if file.status == StageFileStatus::NeedCopy {
-                need_copy_file_infos.push(file.clone());
-            }
-        }
+            info!("end filtering out copied files: {}", num_all_files);
+            files
+        };
 
         info!(
-            "copy: read all files finished, all:{}, need copy:{}, elapsed:{}",
-            all_source_file_infos.len(),
+            "copy: read files with max_files={:?} finished, all:{}, need copy:{}, elapsed:{}",
+            max_files,
+            num_all_files,
             need_copy_file_infos.len(),
             start.elapsed().as_secs()
         );
@@ -383,7 +378,6 @@ impl CopyInterpreter {
             ctx.clone(),
             to_table,
             stage_table_info_clone.stage_info,
-            all_source_file_infos,
             need_copy_file_infos,
             force,
         )?;
@@ -410,18 +404,18 @@ impl CopyInterpreter {
         ctx: Arc<QueryContext>,
         to_table: Arc<dyn Table>,
         stage_info: StageInfo,
-        all_source_files: Vec<StageFileInfo>,
-        need_copy_files: Vec<StageFileInfo>,
+        copied_files: Vec<StageFileInfo>,
         force: bool,
     ) -> Result<()> {
-        let mut copied_files = BTreeMap::new();
-        for file in need_copy_files {
+        let num_copied_files = copied_files.len();
+        let mut copied_file_tree = BTreeMap::new();
+        for file in &copied_files {
             // Short the etag to 7 bytes for less space in metasrv.
             let short_etag = file.etag.clone().map(|mut v| {
                 v.truncate(7);
                 v
             });
-            copied_files.insert(file.path.clone(), TableCopiedFileInfo {
+            copied_file_tree.insert(file.path.clone(), TableCopiedFileInfo {
                 etag: short_etag,
                 content_length: file.size,
                 last_modified: Some(file.last_modified),
@@ -434,18 +428,17 @@ impl CopyInterpreter {
 
         let table_id = to_table.get_id();
         let expire_hours = ctx.get_settings().get_load_file_metadata_expire_hours()?;
-        let num_copied_files = copied_files.len();
 
         let fail_if_duplicated = !force;
        let upsert_copied_files_request = Self::upsert_copied_files_request(
             table_id,
             expire_hours,
-            copied_files,
+            copied_file_tree,
             fail_if_duplicated,
         );
 
         {
-            let status = format!("begin commit, number of copied files:{}", num_copied_files,);
+            let status = format!("begin commit, number of copied files:{}", num_copied_files);
             ctx.set_status_info(&status);
             info!(status);
         }
@@ -482,17 +475,17 @@ impl CopyInterpreter {
 
         // Status.
         {
-            let status = format!("begin to purge files:{}", all_source_files.len());
+            let status = format!("begin to purge files:{}", num_copied_files);
             ctx.set_status_info(&status);
             info!(status);
         }
 
-        CopyInterpreter::try_purge_files(ctx.clone(), &stage_info, &all_source_files).await;
+        CopyInterpreter::try_purge_files(ctx.clone(), &stage_info, &copied_files).await;
 
         // Status.
         info!(
             "end to purge files:{}, elapsed:{}",
-            all_source_files.len(),
+            num_copied_files,
             purge_start.elapsed().as_secs()
         );
     }
@@ -560,7 +553,6 @@ impl Interpreter for CopyInterpreter {
                 table_name,
                 stage_info,
                 from,
-                all_source_file_infos,
                 need_copy_file_infos,
                 force,
                 ..
@@ -571,7 +563,6 @@ impl Interpreter for CopyInterpreter {
                     table_name,
                     from,
                     *stage_info.clone(),
-                    all_source_file_infos.clone(),
                     need_copy_file_infos.clone(),
                     *force,
                 )
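Taken together, the interpreter changes reshape the COPY flow so that purging and copied-file bookkeeping only ever see the files that were actually copied. A simplified, self-contained sketch of that decision follows (std only; `plan_copy`, the sample paths, and the `already_copied` list are made up for illustration, not code from this commit):

// Simplified sketch of the COPY planning step after this change (std only;
// the real code uses StageTable::list_files and filter_out_copied_files from
// the diffs above). With FORCE, everything listed is (re)copied; without it,
// files already recorded as copied are dropped. Purge later operates on the
// returned list only, never on the full stage listing.
fn plan_copy(all_files: Vec<String>, already_copied: &[String], force: bool) -> Vec<String> {
    if force {
        all_files
    } else {
        all_files
            .into_iter()
            .filter(|f| !already_copied.contains(f))
            .collect()
    }
}

fn main() {
    let listed = vec!["a.csv".to_string(), "b.csv".to_string(), "c.csv".to_string()];
    let copied = vec!["b.csv".to_string()];

    let need_copy = plan_copy(listed, &copied, false);
    assert_eq!(need_copy, vec!["a.csv", "c.csv"]);

    // With PURGE enabled, only `need_copy` would be removed from the stage
    // afterwards; the already-copied b.csv is left alone.
    println!("files to copy (and later purge): {need_copy:?}");
}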

src/query/service/src/sessions/query_ctx.rs

Lines changed: 15 additions & 21 deletions
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::cmp::min;
-use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::collections::VecDeque;
@@ -55,7 +54,6 @@ use common_settings::ChangeValue;
 use common_settings::Settings;
 use common_storage::DataOperator;
 use common_storage::StageFileInfo;
-use common_storage::StageFileStatus;
 use common_storage::StorageMetrics;
 use common_storages_fuse::TableContext;
 use common_storages_parquet::ParquetTable;
@@ -494,46 +492,43 @@ impl TableContext for QueryContext {
         self.shared.get_table(catalog, database, table).await
     }
 
-    async fn color_copied_files(
+    async fn filter_out_copied_files(
         &self,
         catalog_name: &str,
         database_name: &str,
         table_name: &str,
-        files: Vec<StageFileInfo>,
+        files: &[StageFileInfo],
         max_files: Option<usize>,
     ) -> Result<Vec<StageFileInfo>> {
         let tenant = self.get_tenant();
-        let files = files.clone();
         let catalog = self.get_catalog(catalog_name)?;
         let table = catalog
             .get_table(&tenant, database_name, table_name)
             .await?;
         let table_id = table.get_id();
 
         let mut limit: usize = 0;
-        let max_files = max_files.unwrap_or(MAX_QUERY_COPIED_FILES_NUM);
-        let max_copied_files = min(MAX_QUERY_COPIED_FILES_NUM, max_files);
-        let mut copied_files = BTreeMap::new();
+        let max_files = max_files.unwrap_or(usize::MAX);
+        let batch_size = min(MAX_QUERY_COPIED_FILES_NUM, max_files);
 
         let mut results = Vec::with_capacity(files.len());
 
-        for chunk in files.chunks(max_copied_files) {
+        for chunk in files.chunks(batch_size) {
             let files = chunk.iter().map(|v| v.path.clone()).collect::<Vec<_>>();
             let req = GetTableCopiedFileReq { table_id, files };
-            let resp = catalog
+            let copied_files = catalog
                 .get_table_copied_file_info(&tenant, database_name, req)
-                .await?;
-            copied_files.extend(resp.file_info);
+                .await?
+                .file_info;
             // Colored
             for file in chunk {
-                let mut file = file.clone();
                 if let Some(copied_file) = copied_files.get(&file.path) {
                     match &copied_file.etag {
                         Some(copied_etag) => {
                             if let Some(file_etag) = &file.etag {
                                 // Check the 7 bytes etag prefix.
                                 if file_etag.starts_with(copied_etag) {
-                                    file.status = StageFileStatus::AlreadyCopied;
+                                    continue;
                                 }
                             }
                         }
@@ -542,17 +537,16 @@ impl TableContext for QueryContext {
                             if copied_file.content_length == file.size
                                 && copied_file.last_modified == Some(file.last_modified)
                             {
-                                file.status = StageFileStatus::AlreadyCopied;
+                                continue;
                             }
                         }
                     }
                 }
-                if file.status == StageFileStatus::NeedCopy {
-                    results.push(file);
-                    limit += 1;
-                    if limit == max_files {
-                        return Ok(results);
-                    }
+
+                results.push(file.clone());
+                limit += 1;
+                if limit == max_files {
+                    return Ok(results);
                 }
             }

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 2 additions & 2 deletions
@@ -509,12 +509,12 @@ impl TableContext for CtxDelegation {
         todo!()
     }
 
-    async fn color_copied_files(
+    async fn filter_out_copied_files(
         &self,
         _catalog_name: &str,
         _database_name: &str,
         _table_name: &str,
-        _files: Vec<StageFileInfo>,
+        _files: &[StageFileInfo],
         _max_files: Option<usize>,
     ) -> Result<Vec<StageFileInfo>> {
         todo!()

src/query/sql/src/planner/binder/copy.rs

Lines changed: 2 additions & 3 deletions
@@ -557,11 +557,11 @@ impl<'a> Binder {
 
         files = self
             .ctx
-            .color_copied_files(
+            .filter_out_copied_files(
                 dst_catalog_name,
                 dst_database_name,
                 dst_table_name,
-                files,
+                &files,
                 None,
             )
             .await?;
@@ -625,7 +625,6 @@ impl<'a> Binder {
             schema: dst_table.schema(),
             from: Box::new(query_plan),
             stage_info: Box::new(stage_info),
-            all_source_file_infos: files,
             need_copy_file_infos,
             validation_mode,
             force: stmt.force,

src/query/sql/src/planner/plans/copy.rs

Lines changed: 0 additions & 1 deletion
@@ -75,7 +75,6 @@ pub enum CopyPlan {
         stage_info: Box<StageInfo>,
         validation_mode: ValidationMode,
         from: Box<Plan>,
-        all_source_file_infos: Vec<StageFileInfo>,
         need_copy_file_infos: Vec<StageFileInfo>,
         force: bool,
     },
Lines changed: 28 additions & 0 deletions (new file)
@@ -0,0 +1,28 @@
+--- force = false, purge = false
+4
+remain 3 files
+6
+remain 3 files
+6
+remain 3 files
+--- force = false, purge = true
+4
+remain 1 files
+6
+remain 0 files
+6
+remain 0 files
+--- force = true, purge = false
+4
+remain 3 files
+8
+remain 3 files
+12
+remain 3 files
+--- force = true, purge = true
+4
+remain 1 files
+6
+remain 0 files
+6
+remain 0 files
