Skip to content

Commit 37f37c9

Browse files
authored
feat(query): copy with transform support max_files (#10958)
* feat(query): copy with transform support max_files
* modify debug test
1 parent b2da0d2 commit 37f37c9

File tree

8 files changed

+130
-18
lines changed

8 files changed

+130
-18
lines changed

src/common/storage/src/stage.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl StageFilesInfo {
159159
}
160160

161161
pub fn blocking_first_file(&self, operator: &Operator) -> Result<StageFileInfo> {
162-
let mut files = self.blocking_list(operator, true)?;
162+
let mut files = self.blocking_list(operator, true, None)?;
163163
match files.pop() {
164164
None => Err(ErrorCode::BadArguments("no file found")),
165165
Some(f) => Ok(f),
@@ -170,7 +170,10 @@ impl StageFilesInfo {
170170
&self,
171171
operator: &Operator,
172172
first_only: bool,
173+
max_files: Option<usize>,
173174
) -> Result<Vec<StageFileInfo>> {
175+
let max_files = max_files.unwrap_or(usize::MAX);
176+
let mut limit = 0;
174177
if let Some(files) = &self.files {
175178
let mut res = Vec::new();
176179
for file in files {
@@ -189,11 +192,15 @@ impl StageFilesInfo {
189192
if first_only {
190193
break;
191194
}
195+
limit += 1;
196+
if limit == max_files {
197+
return Ok(res);
198+
}
192199
}
193200
Ok(res)
194201
} else {
195202
let pattern = self.get_pattern()?;
196-
blocking_list_files_with_pattern(operator, &self.path, pattern, first_only)
203+
blocking_list_files_with_pattern(operator, &self.path, pattern, first_only, max_files)
197204
}
198205
}
199206

@@ -260,6 +267,7 @@ fn blocking_list_files_with_pattern(
260267
path: &str,
261268
pattern: Option<Regex>,
262269
first_only: bool,
270+
max_files: usize,
263271
) -> Result<Vec<StageFileInfo>> {
264272
let operator = operator.blocking();
265273

@@ -282,6 +290,7 @@ fn blocking_list_files_with_pattern(
282290
// path is a dir
283291
let mut files = Vec::new();
284292
let list = operator.list(path)?;
293+
let mut limit = 0;
285294
for obj in list {
286295
let obj = obj?;
287296
let meta = operator.metadata(&obj, StageFileInfo::meta_query())?;
@@ -290,6 +299,10 @@ fn blocking_list_files_with_pattern(
290299
if first_only {
291300
return Ok(files);
292301
}
302+
limit += 1;
303+
if limit == max_files {
304+
return Ok(files);
305+
}
293306
}
294307
}
295308
Ok(files)

src/query/sql/src/planner/binder/copy.rs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use common_exception::Result;
4141
use common_meta_app::principal::OnErrorMode;
4242
use common_meta_app::principal::StageInfo;
4343
use common_storage::init_stage_operator;
44-
use common_storage::StageFileStatus;
4544
use common_storage::StageFilesInfo;
4645
use common_users::UserApiProvider;
4746
use tracing::debug;
@@ -547,15 +546,30 @@ impl<'a> Binder {
547546
}
548547

549548
let operator = init_stage_operator(&stage_info)?;
549+
let max_files = stage_info.copy_options.max_files;
550+
let max_files = if max_files == 0 {
551+
None
552+
} else {
553+
Some(max_files)
554+
};
550555
let mut files = if operator.info().can_blocking() {
551-
files_info.blocking_list(&operator, false)
556+
if stmt.force {
557+
files_info.blocking_list(&operator, false, max_files)
558+
} else {
559+
files_info.blocking_list(&operator, false, None)
560+
}
561+
} else if stmt.force {
562+
files_info.list(&operator, false, max_files).await
552563
} else {
553564
files_info.list(&operator, false, None).await
554565
}?;
555566

556-
info!("end to list files: {}", files.len());
567+
let num_all_files = files.len();
568+
info!("end to list files: {}", num_all_files);
557569

558-
if !stmt.force {
570+
let need_copy_file_infos = if stmt.force {
571+
files
572+
} else {
559573
// Status.
560574
{
561575
let status = "begin to color copied files";
@@ -570,24 +584,19 @@ impl<'a> Binder {
570584
dst_database_name,
571585
dst_table_name,
572586
&files,
573-
None,
587+
max_files,
574588
)
575589
.await?;
576590

577591
info!("end to color copied files: {}", files.len());
578-
}
579-
580-
let mut need_copy_file_infos = vec![];
581-
for file in &files {
582-
if file.status == StageFileStatus::NeedCopy {
583-
need_copy_file_infos.push(file.clone());
584-
}
585-
}
592+
files
593+
};
586594

587595
info!(
588-
"copy: read all files finished, all:{}, need copy:{}, elapsed:{}",
589-
files.len(),
596+
"copy: read all files finished, all:{}, need copy:{}, max_files:{:?}, elapsed:{}",
597+
num_all_files,
590598
need_copy_file_infos.len(),
599+
max_files,
591600
start.elapsed().as_secs()
592601
);
593602

src/query/storages/parquet/src/parquet_table/partition.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl ParquetTable {
114114
.map(|f| (f.path.clone(), f.size))
115115
.collect::<Vec<_>>(),
116116
None => if self.operator.info().can_blocking() {
117-
self.files_info.blocking_list(&self.operator, false)
117+
self.files_info.blocking_list(&self.operator, false, None)
118118
} else {
119119
self.files_info.list(&self.operator, false, None).await
120120
}?

tests/data/00_0005/data/f1.parquet

296 Bytes
Binary file not shown.

tests/data/00_0005/data/f2.parquet

296 Bytes
Binary file not shown.

tests/data/00_0005/data/f3.parquet

296 Bytes
Binary file not shown.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
--- force = false, purge = false
2+
6
3+
4
4+
remain 3 files
5+
6
6+
remain 3 files
7+
ERROR 1105 (HY000) at line 1: Code: 1016, Text = no file need to copy.
8+
6
9+
remain 3 files
10+
--- force = false, purge = true
11+
6
12+
4
13+
remain 1 files
14+
6
15+
remain 0 files
16+
ERROR 1105 (HY000) at line 1: Code: 1016, Text = no file need to copy.
17+
6
18+
remain 0 files
19+
--- force = true, purge = false
20+
6
21+
4
22+
remain 3 files
23+
8
24+
remain 3 files
25+
12
26+
remain 3 files
27+
--- force = true, purge = true
28+
6
29+
4
30+
remain 1 files
31+
6
32+
remain 0 files
33+
ERROR 1105 (HY000) at line 1: Code: 1016, Text = no file need to copy.
34+
6
35+
remain 0 files
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#!/usr/bin/env bash
2+
3+
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
4+
. "$CURDIR"/../../../shell_env.sh
5+
6+
7+
# Should be <root>/tests/data/
8+
9+
for force in 'false' 'true'
10+
do
11+
for purge in 'false' 'true'
12+
do
13+
table="test_max_files_force_${force}_purge_${purge}"
14+
echo "drop table if exists ${table}" | $MYSQL_CLIENT_CONNECT
15+
echo "CREATE TABLE ${table} (
16+
id INT,
17+
c1 INT
18+
) ENGINE=FUSE;" | $MYSQL_CLIENT_CONNECT
19+
done
20+
done
21+
22+
gen_files() {
23+
rm -rf /tmp/00_0005
24+
cp -r "$CURDIR"/../../../data/00_0005 /tmp
25+
}
26+
27+
echo "drop stage if exists s5;" | $MYSQL_CLIENT_CONNECT
28+
echo "create stage s5 url = 'fs:///tmp/00_0005/data/' FILE_FORMAT = (type = PARQUET)" | $MYSQL_CLIENT_CONNECT
29+
30+
31+
for force in 'false' 'true'
32+
do
33+
for purge in 'false' 'true'
34+
do
35+
gen_files
36+
echo "--- force = ${force}, purge = ${purge}"
37+
echo "select count(*) from @s5" | $MYSQL_CLIENT_CONNECT
38+
for i in {1..3}
39+
do
40+
table="test_max_files_force_${force}_purge_${purge}"
41+
echo "copy into ${table} from (select * from @s5 t) max_files=2 force=${force} purge=${purge}" | $MYSQL_CLIENT_CONNECT
42+
echo "select count(*) from ${table}" | $MYSQL_CLIENT_CONNECT
43+
remain=$(ls -1 /tmp/00_0005/data/ | wc -l | sed 's/ //g')
44+
echo "remain ${remain} files"
45+
done
46+
done
47+
done
48+
49+
for force in 'false' 'true'
50+
do
51+
for purge in 'false' 'true'
52+
do
53+
echo "drop table if exists test_max_files_force_${force}_purge_${purge}" | $MYSQL_CLIENT_CONNECT
54+
done
55+
done

0 commit comments

Comments
 (0)