Skip to content

Commit 0ca5e28

Browse files
committed
some adjustments
1 parent e225798 commit 0ca5e28

File tree

4 files changed

+8
-188
lines changed

4 files changed

+8
-188
lines changed

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -50,16 +50,13 @@ bool ExportPartTask::executeStep()
5050

5151
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
5252

53-
NamesAndTypesList partition_columns;
53+
Block block_with_partition_values;
5454
if (metadata_snapshot->hasPartitionKey())
5555
{
56-
const auto & partition_key = metadata_snapshot->getPartitionKey();
57-
if (!partition_key.column_names.empty())
58-
partition_columns = partition_key.expression->getRequiredColumnsWithTypes();
56+
/// todo arthur do I need to init minmax_idx?
57+
block_with_partition_values = manifest.data_part->minmax_idx->getBlock(storage);
5958
}
6059

61-
auto block_with_partition_values = manifest.data_part->partition.getBlockWithPartitionValues(partition_columns);
62-
6360
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, local_context);
6461
if (!destination_storage)
6562
{
@@ -92,7 +89,7 @@ bool ExportPartTask::executeStep()
9289
ProfileEvents::increment(ProfileEvents::PartsExportDuplicated);
9390
}
9491

95-
ProfileEvents::incrementNoTrace(ProfileEvents::PartsExportFailures);
92+
ProfileEvents::increment(ProfileEvents::PartsExportFailures);
9693

9794
std::lock_guard inner_lock(storage.export_manifests_mutex);
9895
storage.export_manifests.erase(manifest);

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 0 additions & 176 deletions
Original file line number | Diff line number | Diff line change
@@ -6272,182 +6272,6 @@ void MergeTreeData::exportPartToTable(
62726272
background_moves_assignee.trigger();
62736273
}
62746274

6275-
void MergeTreeData::exportPartToTableImpl(
6276-
const MergeTreePartExportManifest & manifest,
6277-
ContextPtr local_context)
6278-
{
6279-
auto metadata_snapshot = getInMemoryMetadataPtr();
6280-
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
6281-
StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context);
6282-
6283-
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
6284-
6285-
Block block_with_partition_values;
6286-
if (metadata_snapshot->hasPartitionKey())
6287-
{
6288-
/// todo arthur do I need to init minmax_idx?
6289-
block_with_partition_values = manifest.data_part->minmax_idx->getBlock(*this);
6290-
}
6291-
6292-
auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, getContext());
6293-
if (!destination_storage)
6294-
{
6295-
std::lock_guard inner_lock(export_manifests_mutex);
6296-
6297-
const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs();
6298-
export_manifests.erase(manifest);
6299-
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name);
6300-
}
6301-
6302-
SinkToStoragePtr sink;
6303-
std::string destination_file_path;
6304-
6305-
try
6306-
{
6307-
auto context_copy = Context::createCopy(local_context);
6308-
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
6309-
6310-
sink = destination_storage->import(
6311-
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),
6312-
block_with_partition_values,
6313-
destination_file_path,
6314-
manifest.overwrite_file_if_exists,
6315-
context_copy);
6316-
}
6317-
catch (const Exception & e)
6318-
{
6319-
if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS)
6320-
{
6321-
ProfileEvents::increment(ProfileEvents::PartsExportDuplicated);
6322-
}
6323-
6324-
ProfileEvents::increment(ProfileEvents::PartsExportFailures);
6325-
6326-
std::lock_guard inner_lock(export_manifests_mutex);
6327-
export_manifests.erase(manifest);
6328-
6329-
if (manifest.completion_callback)
6330-
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createFailure(e.message()));
6331-
return;
6332-
}
6333-
6334-
bool apply_deleted_mask = true;
6335-
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
6336-
bool prefetch = false;
6337-
6338-
MergeTreeData::IMutationsSnapshot::Params params
6339-
{
6340-
.metadata_version = metadata_snapshot->getMetadataVersion(),
6341-
.min_part_metadata_version = manifest.data_part->getMetadataVersion(),
6342-
};
6343-
6344-
auto mutations_snapshot = getMutationsSnapshot(params);
6345-
6346-
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
6347-
manifest.data_part,
6348-
mutations_snapshot,
6349-
local_context);
6350-
6351-
QueryPlan plan_for_part;
6352-
6353-
createReadFromPartStep(
6354-
read_type,
6355-
plan_for_part,
6356-
*this,
6357-
storage_snapshot,
6358-
RangesInDataPart(manifest.data_part),
6359-
alter_conversions,
6360-
nullptr,
6361-
columns_to_read,
6362-
nullptr,
6363-
apply_deleted_mask,
6364-
std::nullopt,
6365-
read_with_direct_io,
6366-
prefetch,
6367-
local_context,
6368-
getLogger("ExportPartition"));
6369-
6370-
auto exports_list_entry = getContext()->getExportsList().insert(
6371-
getStorageID(),
6372-
manifest.destination_storage_id,
6373-
manifest.data_part->getBytesOnDisk(),
6374-
manifest.data_part->name,
6375-
destination_file_path,
6376-
manifest.data_part->rows_count,
6377-
manifest.data_part->getBytesOnDisk(),
6378-
manifest.data_part->getBytesUncompressedOnDisk(),
6379-
manifest.create_time,
6380-
local_context);
6381-
6382-
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");
6383-
6384-
QueryPlanOptimizationSettings optimization_settings(local_context);
6385-
auto pipeline_settings = BuildQueryPipelineSettings(local_context);
6386-
auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings);
6387-
6388-
builder->setProgressCallback([&exports_list_entry](const Progress & progress)
6389-
{
6390-
(*exports_list_entry)->bytes_read_uncompressed += progress.read_bytes;
6391-
(*exports_list_entry)->rows_read += progress.read_rows;
6392-
(*exports_list_entry)->elapsed = (*exports_list_entry)->watch.elapsedSeconds();
6393-
});
6394-
6395-
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
6396-
6397-
pipeline.complete(sink);
6398-
6399-
try
6400-
{
6401-
CompletedPipelineExecutor exec(pipeline);
6402-
exec.execute();
6403-
6404-
std::lock_guard inner_lock(export_manifests_mutex);
6405-
writePartLog(
6406-
PartLogElement::Type::EXPORT_PART,
6407-
{},
6408-
static_cast<UInt64>((*exports_list_entry)->elapsed * 1000000000),
6409-
manifest.data_part->name,
6410-
manifest.data_part,
6411-
{manifest.data_part},
6412-
nullptr,
6413-
nullptr,
6414-
exports_list_entry.get());
6415-
6416-
export_manifests.erase(manifest);
6417-
6418-
ProfileEvents::increment(ProfileEvents::PartsExports);
6419-
ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast<UInt64>((*exports_list_entry)->elapsed * 1000));
6420-
6421-
if (manifest.completion_callback)
6422-
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess(destination_file_path));
6423-
}
6424-
catch (const Exception & e)
6425-
{
6426-
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while exporting the part {}. User should retry.", manifest.data_part->name));
6427-
6428-
ProfileEvents::increment(ProfileEvents::PartsExportFailures);
6429-
6430-
std::lock_guard inner_lock(export_manifests_mutex);
6431-
writePartLog(
6432-
PartLogElement::Type::EXPORT_PART,
6433-
ExecutionStatus::fromCurrentException("", true),
6434-
static_cast<UInt64>((*exports_list_entry)->elapsed * 1000000000),
6435-
manifest.data_part->name,
6436-
manifest.data_part,
6437-
{manifest.data_part},
6438-
nullptr,
6439-
nullptr,
6440-
exports_list_entry.get());
6441-
6442-
export_manifests.erase(manifest);
6443-
6444-
if (manifest.completion_callback)
6445-
manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createFailure(e.message()));
6446-
6447-
throw;
6448-
}
6449-
}
6450-
64516275
void MergeTreeData::killExportPart(const String & query_id)
64526276
{
64536277
std::lock_guard lock(export_manifests_mutex);

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -998,10 +998,6 @@ class MergeTreeData : public IStorage, public WithMutableContext
998998
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "EXPORT PARTITION is not implemented");
999999
}
10001000

1001-
void exportPartToTableImpl(
1002-
const MergeTreePartExportManifest & manifest,
1003-
ContextPtr local_context);
1004-
10051001
/// Checks that Partition could be dropped right now
10061002
/// Otherwise - throws an exception with detailed information.
10071003
/// We do not use mutex because it is not very important that the size could change during the operation.

tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,8 @@ echo "---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table"
4949
query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table_wildcard SETTINGS allow_experimental_export_merge_tree_part = 1"
5050
query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table_wildcard SETTINGS allow_experimental_export_merge_tree_part = 1"
5151

52+
sleep 3
53+
5254
echo "---- Both data parts should appear"
5355
query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard/**.parquet') ORDER BY id"
5456

@@ -60,13 +62,14 @@ query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard/**.parquet') ORDER
6062
query "CREATE TABLE $mt_table_partition_expression_with_function (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY toString(year) ORDER BY tuple()"
6163
query "CREATE TABLE $s3_table_wildcard_partition_expression_with_function (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table_wildcard_partition_expression_with_function/{_partition_id}/{_file}.parquet', format=Parquet, partition_strategy='wildcard') PARTITION BY toString(year)"
6264

63-
# insert
6465
query "INSERT INTO $mt_table_partition_expression_with_function VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)"
6566

6667
echo "---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table with partition expression with function"
6768
query "ALTER TABLE $mt_table_partition_expression_with_function EXPORT PART 'cb217c742dc7d143b61583011996a160_1_1_0' TO TABLE $s3_table_wildcard_partition_expression_with_function SETTINGS allow_experimental_export_merge_tree_part = 1"
6869
query "ALTER TABLE $mt_table_partition_expression_with_function EXPORT PART '3be6d49ecf9749a383964bc6fab22d10_2_2_0' TO TABLE $s3_table_wildcard_partition_expression_with_function SETTINGS allow_experimental_export_merge_tree_part = 1"
6970

71+
sleep 1
72+
7073
echo "---- Both data parts should appear"
7174
query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard_partition_expression_with_function/**.parquet') ORDER BY id"
7275

0 commit comments

Comments
 (0)