Skip to content

Commit b4993cc

Browse files
committed
add parallel formatting again, respect minbytestousedirectio
1 parent ebdf147 commit b4993cc

File tree

1 file changed

+8
-14
lines changed

1 file changed

+8
-14
lines changed

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ namespace Setting
196196
extern const SettingsBool enable_shared_storage_snapshot_in_query;
197197
extern const SettingsUInt64 merge_tree_storage_snapshot_sleep_ms;
198198
extern const SettingsBool allow_experimental_export_merge_tree_part;
199+
extern const SettingsUInt64 min_bytes_to_use_direct_io;
199200
}
200201

201202
namespace MergeTreeSetting
@@ -5964,15 +5965,9 @@ void MergeTreeData::exportPartToTableImpl(
59645965
export_manifests.erase(manifest);
59655966
};
59665967

5967-
auto context_copy = Context::createCopy(local_context);
5968-
5969-
/// Manually disable parallelism because the idea is to control parallelism with tasks, not with formatting
5970-
context_copy->setSetting("output_format_parallel_formatting", false);
5971-
context_copy->setSetting("max_threads", 1);
5972-
59735968
auto metadata_snapshot = getInMemoryMetadataPtr();
59745969
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
5975-
StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, context_copy);
5970+
StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context);
59765971

59775972
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
59785973

@@ -5999,7 +5994,7 @@ void MergeTreeData::exportPartToTableImpl(
59995994
auto sink = destination_storage->import(
60005995
manifest.data_part->name,
60015996
block_with_partition_values,
6002-
context_copy,
5997+
local_context,
60035998
part_log_wrapper);
60045999

60056000
/// Most likely the file has already been imported, so we can just return
@@ -6011,9 +6006,8 @@ void MergeTreeData::exportPartToTableImpl(
60116006
return;
60126007
}
60136008

6014-
/// todo implement these settings
60156009
bool apply_deleted_mask = true;
6016-
bool read_with_direct_io = false;
6010+
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
60176011
bool prefetch = false;
60186012

60196013
MergeTreeData::IMutationsSnapshot::Params params
@@ -6027,7 +6021,7 @@ void MergeTreeData::exportPartToTableImpl(
60276021
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
60286022
manifest.data_part,
60296023
mutations_snapshot,
6030-
context_copy);
6024+
local_context);
60316025

60326026
QueryPlan plan_for_part;
60336027

@@ -6045,11 +6039,11 @@ void MergeTreeData::exportPartToTableImpl(
60456039
std::nullopt,
60466040
read_with_direct_io,
60476041
prefetch,
6048-
context_copy,
6042+
local_context,
60496043
getLogger("ExportPartition"));
60506044

6051-
QueryPlanOptimizationSettings optimization_settings(context_copy);
6052-
auto pipeline_settings = BuildQueryPipelineSettings(context_copy);
6045+
QueryPlanOptimizationSettings optimization_settings(local_context);
6046+
auto pipeline_settings = BuildQueryPipelineSettings(local_context);
60536047
auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings);
60546048
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
60556049

0 commit comments

Comments
 (0)