Skip to content

Commit 2ac019c

Browse files
committed
add profile events and persist some settings in the manifest
1 parent 8d3f65b commit 2ac019c

File tree

12 files changed

+108
-34
lines changed

12 files changed

+108
-34
lines changed

src/Common/ProfileEvents.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \
2929
M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \
3030
M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \
31+
M(PartsExports, "Number of successful part exports.", ValueType::Number) \
32+
M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \
33+
M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \
34+
M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \
3135
M(FailedQuery, "Number of failed queries.", ValueType::Number) \
3236
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \
3337
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6687,6 +6687,9 @@ Allows to change the behaviour of the result type of `dateTrunc` function.
66876687
Possible values:
66886688
- 0 - When the second argument is `DateTime64/Date32` the return type will be `DateTime64/Date32` regardless of the time unit in the first argument.
66896689
- 1 - For `Date32` the result is always `Date`. For `DateTime64` the result is `DateTime` for time units `second` and higher.
6690+
)", 0) \
6691+
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
6692+
Overwrite file if it already exists when exporting a merge tree part
66906693
)", 0) \
66916694
\
66926695
/* ####################################################### */ \

src/Storages/IStorage.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,10 +452,12 @@ It is currently only implemented in StorageObjectStorage.
452452
virtual SinkToStoragePtr import(
453453
const std::string & /* file_name */,
454454
Block & /* block_with_partition_values */,
455+
std::string & /* destination_file_path */,
456+
bool /* overwrite_if_exists */,
455457
ContextPtr /* context */)
456-
{
457-
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
458-
}
458+
{
459+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
460+
}
459461

460462

461463
/** Writes the data to a table in a distributed manner.

src/Storages/MergeTree/ExportList.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ ExportsListElement::ExportsListElement(
88
const StorageID & destination_table_id_,
99
UInt64 part_size_,
1010
const String & part_name_,
11+
const String & target_file_name_,
1112
UInt64 total_rows_to_read_,
1213
UInt64 total_size_bytes_compressed_,
1314
UInt64 total_size_bytes_uncompressed_,
@@ -17,6 +18,7 @@ ExportsListElement::ExportsListElement(
1718
, destination_table_id(destination_table_id_)
1819
, part_size(part_size_)
1920
, part_name(part_name_)
21+
, destination_file_path(target_file_name_)
2022
, total_rows_to_read(total_rows_to_read_)
2123
, total_size_bytes_compressed(total_size_bytes_compressed_)
2224
, total_size_bytes_uncompressed(total_size_bytes_uncompressed_)
@@ -33,6 +35,7 @@ ExportInfo ExportsListElement::getInfo() const
3335
res.destination_database = destination_table_id.database_name;
3436
res.destination_table = destination_table_id.table_name;
3537
res.part_name = part_name;
38+
res.destination_file_path = destination_file_path;
3639
res.rows_read = rows_read;
3740
res.total_rows_to_read = total_rows_to_read;
3841
res.total_size_bytes_compressed = total_size_bytes_compressed;

src/Storages/MergeTree/ExportList.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ struct ExportInfo
2323
String destination_database;
2424
String destination_table;
2525
String part_name;
26+
String destination_file_path;
2627
UInt64 rows_read;
2728
UInt64 total_rows_to_read;
2829
UInt64 total_size_bytes_compressed;
@@ -40,6 +41,7 @@ struct ExportsListElement : private boost::noncopyable
4041
const StorageID destination_table_id;
4142
const UInt64 part_size;
4243
const String part_name;
44+
const String destination_file_path;
4345
UInt64 rows_read {0};
4446
UInt64 total_rows_to_read {0};
4547
UInt64 total_size_bytes_compressed {0};
@@ -56,6 +58,7 @@ struct ExportsListElement : private boost::noncopyable
5658
const StorageID & destination_table_id_,
5759
UInt64 part_size_,
5860
const String & part_name_,
61+
const String & destination_file_path_,
5962
UInt64 total_rows_to_read_,
6063
UInt64 total_size_bytes_compressed_,
6164
UInt64 total_size_bytes_uncompressed_,

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
#include <thread>
114114
#include <unordered_set>
115115
#include <filesystem>
116+
#include <utility>
116117

117118
#include <fmt/format.h>
118119
#include <Poco/Logger.h>
@@ -154,6 +155,10 @@ namespace ProfileEvents
154155
extern const Event LoadedDataPartsMicroseconds;
155156
extern const Event RestorePartsSkippedFiles;
156157
extern const Event RestorePartsSkippedBytes;
158+
extern const Event PartsExports;
159+
extern const Event PartsExportTotalMilliseconds;
160+
extern const Event PartsExportFailures;
161+
extern const Event PartsExportDuplicated;
157162
}
158163

159164
namespace CurrentMetrics
@@ -198,6 +203,8 @@ namespace Setting
198203
extern const SettingsUInt64 merge_tree_storage_snapshot_sleep_ms;
199204
extern const SettingsBool allow_experimental_export_merge_tree_part;
200205
extern const SettingsUInt64 min_bytes_to_use_direct_io;
206+
extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists;
207+
extern const SettingsBool output_format_parallel_formatting;
201208
}
202209

203210
namespace MergeTreeSetting
@@ -310,6 +317,7 @@ namespace ErrorCodes
310317
extern const int CANNOT_FORGET_PARTITION;
311318
extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY;
312319
extern const int UNKNOWN_TABLE;
320+
extern const int FILE_ALREADY_EXISTS;
313321
}
314322

315323
static void checkSuspiciousIndices(const ASTFunction * index_function)
@@ -5932,9 +5940,15 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
59325940
part_name, getStorageID().getFullTableName());
59335941

59345942
{
5943+
MergeTreeExportManifest manifest(
5944+
dest_storage->getStorageID(),
5945+
part,
5946+
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
5947+
query_context->getSettingsRef()[Setting::output_format_parallel_formatting]);
5948+
59355949
std::lock_guard lock(export_manifests_mutex);
59365950

5937-
if (!export_manifests.emplace(dest_storage->getStorageID(), part).second)
5951+
if (!export_manifests.emplace(std::move(manifest)).second)
59385952
{
59395953
throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'",
59405954
part_name, dest_storage->getStorageID().getFullTableName());
@@ -5948,19 +5962,6 @@ void MergeTreeData::exportPartToTableImpl(
59485962
const MergeTreeExportManifest & manifest,
59495963
ContextPtr local_context)
59505964
{
5951-
auto exports_list_entry = getContext()->getExportsList().insert(
5952-
getStorageID(),
5953-
manifest.destination_storage_id,
5954-
manifest.data_part->getBytesOnDisk(),
5955-
manifest.data_part->name,
5956-
manifest.data_part->rows_count,
5957-
manifest.data_part->getBytesOnDisk(),
5958-
manifest.data_part->getBytesUncompressedOnDisk(),
5959-
manifest.create_time,
5960-
local_context);
5961-
5962-
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");
5963-
59645965
auto metadata_snapshot = getInMemoryMetadataPtr();
59655966
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
59665967
StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context);
@@ -5987,16 +5988,29 @@ void MergeTreeData::exportPartToTableImpl(
59875988
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name);
59885989
}
59895990

5990-
auto sink = destination_storage->import(
5991-
manifest.data_part->name,
5992-
block_with_partition_values,
5993-
local_context);
5991+
SinkToStoragePtr sink;
5992+
std::string destination_file_path;
59945993

5995-
/// Most likely the file has already been imported, so we can just return
5996-
if (!sink)
5994+
try
59975995
{
5998-
std::lock_guard inner_lock(export_manifests_mutex);
5996+
auto context_copy = Context::createCopy(local_context);
5997+
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
59995998

5999+
sink = destination_storage->import(
6000+
manifest.data_part->name,
6001+
block_with_partition_values,
6002+
destination_file_path,
6003+
manifest.overwrite_file_if_exists,
6004+
context_copy);
6005+
}
6006+
catch (const Exception & e)
6007+
{
6008+
if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS)
6009+
{
6010+
ProfileEvents::increment(ProfileEvents::PartsExportDuplicated);
6011+
}
6012+
6013+
std::lock_guard inner_lock(export_manifests_mutex);
60006014
export_manifests.erase(manifest);
60016015
return;
60026016
}
@@ -6037,6 +6051,20 @@ void MergeTreeData::exportPartToTableImpl(
60376051
local_context,
60386052
getLogger("ExportPartition"));
60396053

6054+
auto exports_list_entry = getContext()->getExportsList().insert(
6055+
getStorageID(),
6056+
manifest.destination_storage_id,
6057+
manifest.data_part->getBytesOnDisk(),
6058+
manifest.data_part->name,
6059+
destination_file_path,
6060+
manifest.data_part->rows_count,
6061+
manifest.data_part->getBytesOnDisk(),
6062+
manifest.data_part->getBytesUncompressedOnDisk(),
6063+
manifest.create_time,
6064+
local_context);
6065+
6066+
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");
6067+
60406068
QueryPlanOptimizationSettings optimization_settings(local_context);
60416069
auto pipeline_settings = BuildQueryPipelineSettings(local_context);
60426070
auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings);
@@ -6070,10 +6098,15 @@ void MergeTreeData::exportPartToTableImpl(
60706098
exports_list_entry.get());
60716099

60726100
export_manifests.erase(manifest);
6101+
6102+
ProfileEvents::increment(ProfileEvents::PartsExports);
6103+
ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast<UInt64>((*exports_list_entry)->elapsed * 1000));
60736104
}
6074-
catch (const Exception &)
6105+
catch (...)
60756106
{
6076-
tryLogCurrentException(__PRETTY_FUNCTION__, "Exception is in export part task");
6107+
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while exporting the part {}. User should retry.", manifest.data_part->name));
6108+
6109+
ProfileEvents::increment(ProfileEvents::PartsExportFailures);
60776110

60786111
std::lock_guard inner_lock(export_manifests_mutex);
60796112
writePartLog(
@@ -8751,6 +8784,7 @@ try
87518784
part_log_elem.rows_read = (*exports_entry)->rows_read;
87528785
part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed;
87538786
part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage();
8787+
part_log_elem.path_on_disk = (*exports_entry)->destination_file_path;
87548788
}
87558789

87568790
if (profile_counters)

src/Storages/MergeTree/MergeTreeExportManifest.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,24 @@ struct MergeTreeExportManifest
88
{
99
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
1010

11+
12+
MergeTreeExportManifest(
13+
const StorageID & destination_storage_id_,
14+
const DataPartPtr & data_part_,
15+
bool overwrite_file_if_exists_,
16+
bool parallel_formatting_)
17+
: destination_storage_id(destination_storage_id_),
18+
data_part(data_part_),
19+
overwrite_file_if_exists(overwrite_file_if_exists_),
20+
parallel_formatting(parallel_formatting_),
21+
create_time(time(nullptr)) {}
22+
1123
StorageID destination_storage_id;
1224
DataPartPtr data_part;
13-
time_t create_time = time(nullptr);
25+
bool overwrite_file_if_exists;
26+
bool parallel_formatting;
27+
28+
time_t create_time;
1429
mutable bool in_progress = false;
1530

1631
bool operator<(const MergeTreeExportManifest & rhs) const

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ namespace ErrorCodes
4949
extern const int NOT_IMPLEMENTED;
5050
extern const int LOGICAL_ERROR;
5151
extern const int INCORRECT_DATA;
52+
extern const int FILE_ALREADY_EXISTS;
5253
}
5354

5455
String StorageObjectStorage::getPathSample(ContextPtr context)
@@ -453,6 +454,8 @@ bool StorageObjectStorage::supportsImport() const
453454
SinkToStoragePtr StorageObjectStorage::import(
454455
const std::string & file_name,
455456
Block & block_with_partition_values,
457+
std::string & destination_file_path,
458+
bool overwrite_if_exists,
456459
ContextPtr local_context)
457460
{
458461
std::string partition_key;
@@ -467,16 +470,15 @@ SinkToStoragePtr StorageObjectStorage::import(
467470
}
468471
}
469472

470-
const auto file_path = configuration->getPathForWrite(partition_key, file_name).path;
473+
destination_file_path = configuration->getPathForWrite(partition_key, file_name).path;
471474

472-
if (object_storage->exists(StoredObject(file_path)))
475+
if (!overwrite_if_exists && object_storage->exists(StoredObject(destination_file_path)))
473476
{
474-
LOG_INFO(getLogger("StorageObjectStorage"), "File {} already exists, skipping import", file_path);
475-
return nullptr;
477+
throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} already exists", destination_file_path);
476478
}
477479

478480
return std::make_shared<StorageObjectStorageSink>(
479-
file_path,
481+
destination_file_path,
480482
object_storage,
481483
configuration,
482484
format_settings,

src/Storages/ObjectStorage/StorageObjectStorage.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ class StorageObjectStorage : public IStorage
106106
SinkToStoragePtr import(
107107
const std::string & /* file_name */,
108108
Block & /* block_with_partition_values */,
109+
std::string & /* destination_file_path */,
110+
bool /* overwrite_if_exists */,
109111
ContextPtr /* context */) override;
110112

111113
void truncate(

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -612,12 +612,14 @@ bool StorageObjectStorageCluster::supportsImport() const
612612
SinkToStoragePtr StorageObjectStorageCluster::import(
613613
const std::string & file_name,
614614
Block & block_with_partition_values,
615+
std::string & destination_file_path,
616+
bool overwrite_if_exists,
615617
ContextPtr context)
616618
{
617619
if (pure_storage)
618-
return pure_storage->import(file_name, block_with_partition_values, context);
620+
return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
619621

620-
return IStorageCluster::import(file_name, block_with_partition_values, context);
622+
return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
621623
}
622624

623625

0 commit comments

Comments
 (0)