Skip to content

Commit 149c773

Browse files
authored
feat: add table option copy_dedup_full_path (#17473)
* add table copy opt `copy_dedup_full_path`. * rm println!
1 parent 99a4cf9 commit 149c773

File tree

18 files changed

+164
-12
lines changed

18 files changed

+164
-12
lines changed

.github/actions/test_sqllogic_stage/action.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@ inputs:
66
required: true
77
default: ""
88
handlers:
9-
description: "logic test handlers, choices: mysql,http,clickhouse"
9+
description: "logic test handlers, choices: hybrid,http"
1010
required: true
1111
default: ""
1212
storage:
1313
description: "storage backend for stage, choices: s3,fs"
1414
required: true
1515
default: ""
16+
deducp:
17+
description: "path type for dedup when copy, choices: full_path,sub_path"
18+
required: true
19+
default: ""
1620
runs:
1721
using: "composite"
1822
steps:
@@ -43,4 +47,5 @@ runs:
4347
env:
4448
TEST_HANDLERS: ${{ inputs.handlers }}
4549
TEST_STAGE_STORAGE: ${{ inputs.storage }}
50+
TEST_STAGE_DEDUP: ${{ inputs.dedup}}
4651
run: bash ./scripts/ci/ci-run-sqllogic-tests-without-sandbox.sh ${{ inputs.dirs }}

.github/workflows/reuse.sqllogic.yml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,19 +206,26 @@ jobs:
206206
storage:
207207
- "s3"
208208
- "fs"
209+
handler:
210+
- "hybrid"
211+
- "http"
212+
dedup:
213+
- "full_path"
214+
- "sub_path"
209215
steps:
210216
- uses: actions/checkout@v4
211217
- uses: ./.github/actions/test_sqllogic_stage
212218
timeout-minutes: 15
213219
with:
214220
storage: ${{ matrix.storage }}
215221
dirs: stage
216-
handlers: http,hybrid
222+
handlers: ${{ matrix.handler }}
223+
dedup: ${{ matrix.dedup }}
217224
- name: Upload failure
218225
if: failure()
219226
uses: ./.github/actions/artifact_failure
220227
with:
221-
name: test-sqllogic-stage-${{ matrix.storage }}
228+
name: test-sqllogic-stage-${{ matrix.storage }}-${{ matrix.handler }}-${{ matrix.dedup }}
222229

223230
standalone_no_table_meta_cache:
224231
runs-on: [self-hosted, X64, Linux, 2c8g, "${{ inputs.runner_provider }}"]

src/query/catalog/src/table_context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ pub trait TableContext: Send + Sync {
280280
database_name: &str,
281281
table_name: &str,
282282
files: &[StageFileInfo],
283+
path_prefix: Option<String>,
283284
max_files: Option<usize>,
284285
) -> Result<FilteredCopyFiles>;
285286

src/query/service/src/interpreters/common/table_option_validation.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
3737
use databend_storages_common_table_meta::table::OPT_KEY_COMMENT;
3838
use databend_storages_common_table_meta::table::OPT_KEY_CONNECTION_NAME;
3939
use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
40+
use databend_storages_common_table_meta::table::OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH;
4041
use databend_storages_common_table_meta::table::OPT_KEY_ENGINE;
4142
use databend_storages_common_table_meta::table::OPT_KEY_LOCATION;
4243
use databend_storages_common_table_meta::table::OPT_KEY_RANDOM_MAX_ARRAY_LEN;
@@ -72,6 +73,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
7273

7374
r.insert("transient");
7475
r.insert(OPT_KEY_TEMP_PREFIX);
76+
r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
7577
r
7678
});
7779

@@ -109,6 +111,7 @@ pub static UNSET_TABLE_OPTIONS_WHITE_LIST: LazyLock<HashSet<&'static str>> = Laz
109111
r.insert(FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD);
110112
r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD);
111113
r.insert(FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS);
114+
r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
112115
r
113116
});
114117

src/query/service/src/interpreters/interpreter_copy_into_table.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ impl CopyIntoTableInterpreter {
232232
duplicated_files_detected: Vec<String>,
233233
update_stream_meta: Vec<UpdateStreamMetaReq>,
234234
deduplicated_label: Option<String>,
235+
path_prefix: Option<String>,
235236
) -> Result<()> {
236237
let ctx = self.ctx.clone();
237238
let to_table = ctx
@@ -249,6 +250,7 @@ impl CopyIntoTableInterpreter {
249250
to_table.as_ref(),
250251
&files_to_copy,
251252
&plan.stage_table_info.copy_into_table_options,
253+
path_prefix,
252254
)?;
253255

254256
to_table.commit_insertion(
@@ -371,6 +373,7 @@ impl Interpreter for CopyIntoTableInterpreter {
371373
duplicated_files_detected,
372374
update_stream_meta,
373375
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
376+
self.plan.path_prefix.clone(),
374377
)
375378
.await?;
376379
}

src/query/service/src/interpreters/interpreter_table_create.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::str::FromStr;
1616
use std::sync::Arc;
1717

1818
use chrono::Utc;
19+
use databend_common_ast::ast::Engine;
1920
use databend_common_base::runtime::GlobalIORuntime;
2021
use databend_common_config::GlobalConfig;
2122
use databend_common_exception::ErrorCode;
@@ -50,6 +51,7 @@ use databend_storages_common_cache::LoadParams;
5051
use databend_storages_common_table_meta::meta::TableSnapshot;
5152
use databend_storages_common_table_meta::meta::Versioned;
5253
use databend_storages_common_table_meta::table::OPT_KEY_COMMENT;
54+
use databend_storages_common_table_meta::table::OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH;
5355
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
5456
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
5557
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
@@ -383,6 +385,17 @@ impl CreateTableInterpreter {
383385
};
384386
let schema = TableSchemaRefExt::create(fields);
385387
let mut options = self.plan.options.clone();
388+
389+
if self.plan.engine == Engine::Fuse {
390+
let settings = self.ctx.get_settings();
391+
// change default to 1 when all query server is ready to processing it.
392+
if settings.get_copy_dedup_full_path_by_default()? {
393+
options.insert(
394+
OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH.to_string(),
395+
"1".to_string(),
396+
);
397+
};
398+
}
386399
if let Some(storage_format) = options.get(OPT_KEY_STORAGE_FORMAT) {
387400
FuseStorageFormat::from_str(storage_format)?;
388401
}
@@ -416,10 +429,12 @@ impl CreateTableInterpreter {
416429
for table_option in table_meta.options.iter() {
417430
let key = table_option.0.to_lowercase();
418431
if !is_valid_create_opt(&key, &self.plan.engine) {
419-
error!("invalid opt for fuse table in create table statement");
420-
return Err(ErrorCode::TableOptionInvalid(format!(
421-
"table option {key} is invalid for create table statement",
422-
)));
432+
let msg = format!(
433+
"table option {key} is invalid for create table statement with engine {}",
434+
self.plan.engine
435+
);
436+
error!("{msg}");
437+
return Err(ErrorCode::TableOptionInvalid(msg));
423438
}
424439
}
425440

src/query/service/src/pipelines/builders/builder_copy_into_table.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ impl PipelineBuilder {
197197
to_table: &dyn Table,
198198
copied_files: &[StageFileInfo],
199199
options: &CopyIntoTableOptions,
200+
path_prefix: Option<String>,
200201
) -> Result<Option<UpsertTableCopiedFileReq>> {
201202
let mut copied_file_tree = BTreeMap::new();
202203
for file in copied_files {
@@ -205,7 +206,12 @@ impl PipelineBuilder {
205206
v.truncate(7);
206207
v
207208
});
208-
copied_file_tree.insert(file.path.clone(), TableCopiedFileInfo {
209+
let path = if let Some(p) = &path_prefix {
210+
format!("{}{}", p, file.path)
211+
} else {
212+
file.path.clone()
213+
};
214+
copied_file_tree.insert(path, TableCopiedFileInfo {
209215
etag: short_etag,
210216
content_length: file.size,
211217
last_modified: Some(file.last_modified),

src/query/service/src/sessions/query_ctx.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,6 +1169,7 @@ impl TableContext for QueryContext {
11691169
database_name: &str,
11701170
table_name: &str,
11711171
files: &[StageFileInfo],
1172+
path_prefix: Option<String>,
11721173
max_files: Option<usize>,
11731174
) -> Result<FilteredCopyFiles> {
11741175
if files.is_empty() {
@@ -1195,8 +1196,20 @@ impl TableContext for QueryContext {
11951196
let mut duplicated_files = Vec::with_capacity(files.len());
11961197

11971198
for chunk in files.chunks(batch_size) {
1198-
let files = chunk.iter().map(|v| v.path.clone()).collect::<Vec<_>>();
1199-
let req = GetTableCopiedFileReq { table_id, files };
1199+
let files = chunk
1200+
.iter()
1201+
.map(|v| {
1202+
if let Some(p) = &path_prefix {
1203+
format!("{}{}", p, v.path)
1204+
} else {
1205+
v.path.clone()
1206+
}
1207+
})
1208+
.collect::<Vec<_>>();
1209+
let req = GetTableCopiedFileReq {
1210+
table_id,
1211+
files: files.clone(),
1212+
};
12001213
let start_request = Instant::now();
12011214
let copied_files = catalog
12021215
.get_table_copied_file_info(&tenant, database_name, req)
@@ -1207,8 +1220,8 @@ impl TableContext for QueryContext {
12071220
Instant::now().duration_since(start_request).as_millis() as u64,
12081221
);
12091222
// Colored
1210-
for file in chunk {
1211-
if !copied_files.contains_key(&file.path) {
1223+
for (file, key) in chunk.iter().zip(files.iter()) {
1224+
if !copied_files.contains_key(key) {
12121225
files_to_copy.push(file.clone());
12131226
result_size += 1;
12141227
if result_size == max_files {

src/query/service/tests/it/sql/exec/get_table_bind_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ impl TableContext for CtxDelegation {
840840
_database_name: &str,
841841
_table_name: &str,
842842
_files: &[StageFileInfo],
843+
_path_prefix: Option<String>,
843844
_max_files: Option<usize>,
844845
) -> Result<FilteredCopyFiles> {
845846
todo!()

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,7 @@ impl TableContext for CtxDelegation {
715715
_database_name: &str,
716716
_table_name: &str,
717717
_files: &[StageFileInfo],
718+
_path_prefix: Option<String>,
718719
_max_files: Option<usize>,
719720
) -> Result<FilteredCopyFiles> {
720721
todo!()

0 commit comments

Comments
 (0)