Skip to content

Commit 55cc3f9

Browse files
Enmk authored and arthurpassos committed
Merge pull request #1017 from Altinity/simple_export_part
improve observability a bit, simplify sink
1 parent 5cbcae4 commit 55cc3f9

23 files changed

+387
-287
lines changed

src/Common/CurrentMetrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
M(Merge, "Number of executing background merges") \
1111
M(MergeParts, "Number of source parts participating in current background merges") \
1212
M(Move, "Number of currently executing moves") \
13+
M(Export, "Number of currently executing exports") \
1314
M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
1415
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
1516
M(ReplicatedSend, "Number of data parts being sent to replicas") \

src/Common/ProfileEvents.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \
2929
M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \
3030
M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \
31+
M(PartsExports, "Number of successful part exports.", ValueType::Number) \
32+
M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \
33+
M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \
34+
M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \
3135
M(FailedQuery, "Number of failed queries.", ValueType::Number) \
3236
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \
3337
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6854,6 +6854,9 @@ Possible values:
68546854
)", 0) \
68556855
DECLARE(Bool, use_roaring_bitmap_iceberg_positional_deletes, false, R"(
68566856
Use roaring bitmap for iceberg positional deletes.
6857+
)", 0) \
6858+
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
6859+
Overwrite file if it already exists when exporting a merge tree part
68576860
)", 0) \
68586861
\
68596862
/* ####################################################### */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
141141
{"object_storage_max_nodes", 0, 0, "New setting"},
142142
{"object_storage_remote_initiator", false, false, "New setting."},
143143
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
144+
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
144145
});
145146
addSettingsChanges(settings_changes_history, "25.6",
146147
{

src/Interpreters/Context.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include <Storages/MarkCache.h>
3636
#include <Storages/MergeTree/MergeList.h>
3737
#include <Storages/MergeTree/MovesList.h>
38+
#include <Storages/MergeTree/ExportList.h>
3839
#include <Storages/MergeTree/ReplicatedFetchList.h>
3940
#include <Storages/MergeTree/MergeTreeData.h>
4041
#include <Storages/MergeTree/MergeTreeSettings.h>
@@ -504,6 +505,7 @@ struct ContextSharedPart : boost::noncopyable
504505
GlobalOvercommitTracker global_overcommit_tracker;
505506
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
506507
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
508+
ExportsList exports_list; /// The list of executing exports (for (Replicated)?MergeTree)
507509
ReplicatedFetchList replicated_fetch_list;
508510
RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView)
509511
ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections.
@@ -1214,6 +1216,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; }
12141216
const MergeList & Context::getMergeList() const { return shared->merge_list; }
12151217
MovesList & Context::getMovesList() { return shared->moves_list; }
12161218
const MovesList & Context::getMovesList() const { return shared->moves_list; }
1219+
ExportsList & Context::getExportsList() { return shared->exports_list; }
1220+
const ExportsList & Context::getExportsList() const { return shared->exports_list; }
12171221
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
12181222
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
12191223
RefreshSet & Context::getRefreshSet() { return shared->refresh_set; }

src/Interpreters/Context.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class AsynchronousMetrics;
8989
class BackgroundSchedulePool;
9090
class MergeList;
9191
class MovesList;
92+
class ExportsList;
9293
class ReplicatedFetchList;
9394
class RefreshSet;
9495
class Cluster;
@@ -1165,6 +1166,9 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
11651166
MovesList & getMovesList();
11661167
const MovesList & getMovesList() const;
11671168

1169+
ExportsList & getExportsList();
1170+
const ExportsList & getExportsList() const;
1171+
11681172
ReplicatedFetchList & getReplicatedFetchList();
11691173
const ReplicatedFetchList & getReplicatedFetchList() const;
11701174

src/Storages/IStorage.h

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -455,24 +455,20 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
455455
return false;
456456
}
457457

458-
struct ImportStats
459-
{
460-
ExecutionStatus status;
461-
std::size_t elapsed_ns = 0;
462-
std::size_t bytes_on_disk = 0;
463-
std::size_t read_rows = 0;
464-
std::size_t read_bytes = 0;
465-
std::string file_path = "";
466-
};
467-
458+
/*
459+
It is currently only implemented in StorageObjectStorage.
460+
It is meant to be used to import merge tree data parts into object storage. It is similar to the write API,
461+
but it won't re-partition the data and should allow the filename to be set by the caller.
462+
*/
468463
virtual SinkToStoragePtr import(
469464
const std::string & /* file_name */,
470465
Block & /* block_with_partition_values */,
471-
ContextPtr /* context */,
472-
std::function<void(ImportStats)> /* stats_log */)
473-
{
474-
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
475-
}
466+
std::string & /* destination_file_path */,
467+
bool /* overwrite_if_exists */,
468+
ContextPtr /* context */)
469+
{
470+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
471+
}
476472

477473

478474
/** Writes the data to a table in distributed manner.
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#include <Storages/MergeTree/ExportList.h>
2+
3+
namespace DB
4+
{
5+
6+
/// Builds the list element that describes one part export.
///
/// Stores the source and destination table ids, the part name and size, the destination
/// file path, the expected totals and the creation time. The progress counters
/// (rows_read, bytes_read_uncompressed, elapsed) keep their in-class zero defaults.
///
/// A thread group for the export is obtained from ThreadGroup::createForBackgroundProcess(context);
/// its memory tracker is what getMemoryUsage() / getPeakMemoryUsage() read.
///
/// The parameter names mirror the declaration in ExportList.h (destination_file_path_).
ExportsListElement::ExportsListElement(
    const StorageID & source_table_id_,
    const StorageID & destination_table_id_,
    UInt64 part_size_,
    const String & part_name_,
    const String & destination_file_path_,
    UInt64 total_rows_to_read_,
    UInt64 total_size_bytes_compressed_,
    UInt64 total_size_bytes_uncompressed_,
    time_t create_time_,
    const ContextPtr & context)
    : source_table_id(source_table_id_)
    , destination_table_id(destination_table_id_)
    , part_size(part_size_)
    , part_name(part_name_)
    , destination_file_path(destination_file_path_)
    , total_rows_to_read(total_rows_to_read_)
    , total_size_bytes_compressed(total_size_bytes_compressed_)
    , total_size_bytes_uncompressed(total_size_bytes_uncompressed_)
    , create_time(create_time_)
    /// thread_group is declared after create_time/elapsed/watch, so it is still initialised last.
    , thread_group(ThreadGroup::createForBackgroundProcess(context))
{
}
29+
30+
/// On destruction, reports this export's memory tracker to the global
/// background_memory_tracker via adjustOnBackgroundTaskEnd.
ExportsListElement::~ExportsListElement()
{
    auto * finished_export_tracker = &thread_group->memory_tracker;
    background_memory_tracker.adjustOnBackgroundTaskEnd(finished_export_tracker);
}
34+
35+
/// Copies the current state of this element into a plain ExportInfo value.
/// Every ExportInfo field is assigned; part_size is not part of the snapshot.
ExportInfo ExportsListElement::getInfo() const
{
    ExportInfo info;

    /// Where the data comes from and where it goes.
    info.source_database = source_table_id.database_name;
    info.source_table = source_table_id.table_name;
    info.destination_database = destination_table_id.database_name;
    info.destination_table = destination_table_id.table_name;
    info.part_name = part_name;
    info.destination_file_path = destination_file_path;

    /// Expected totals and progress so far.
    info.total_rows_to_read = total_rows_to_read;
    info.total_size_bytes_compressed = total_size_bytes_compressed;
    info.total_size_bytes_uncompressed = total_size_bytes_uncompressed;
    info.rows_read = rows_read;
    info.bytes_read_uncompressed = bytes_read_uncompressed;

    /// Memory figures come from the thread group's memory tracker.
    info.memory_usage = getMemoryUsage();
    info.peak_memory_usage = getPeakMemoryUsage();

    /// Timing. `elapsed` is copied from the stored member, not sampled from `watch` here.
    info.create_time = create_time;
    info.elapsed = elapsed;

    return info;
}
55+
56+
/// Returns the value currently reported by the memory tracker of this export's thread group.
UInt64 ExportsListElement::getMemoryUsage() const
{
    auto & export_memory_tracker = thread_group->memory_tracker;
    return export_memory_tracker.get();
}
60+
61+
/// Returns the peak value reported by the memory tracker of this export's thread group.
UInt64 ExportsListElement::getPeakMemoryUsage() const
{
    auto & export_memory_tracker = thread_group->memory_tracker;
    return export_memory_tracker.getPeak();
}
65+
66+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#pragma once
2+
3+
#include <Storages/MergeTree/BackgroundProcessList.h>
4+
#include <Interpreters/StorageID.h>
5+
#include <Common/Stopwatch.h>
6+
#include <Common/CurrentMetrics.h>
7+
#include <Common/ThreadStatus.h>
8+
#include <Poco/URI.h>
9+
#include <boost/noncopyable.hpp>
10+
11+
namespace CurrentMetrics
12+
{
13+
/// Metric for currently executing exports (added as M(Export, ...) in CurrentMetrics.cpp);
/// the ExportsList constructor passes it to its BackgroundProcessList base class.
extern const Metric Export;
14+
}
15+
16+
namespace DB
17+
{
18+
19+
struct ExportInfo
20+
{
21+
String source_database;
22+
String source_table;
23+
String destination_database;
24+
String destination_table;
25+
String part_name;
26+
String destination_file_path;
27+
UInt64 rows_read;
28+
UInt64 total_rows_to_read;
29+
UInt64 total_size_bytes_compressed;
30+
UInt64 total_size_bytes_uncompressed;
31+
UInt64 bytes_read_uncompressed;
32+
UInt64 memory_usage;
33+
UInt64 peak_memory_usage;
34+
time_t create_time = 0;
35+
Float64 elapsed;
36+
};
37+
38+
struct ExportsListElement : private boost::noncopyable
39+
{
40+
const StorageID source_table_id;
41+
const StorageID destination_table_id;
42+
const UInt64 part_size;
43+
const String part_name;
44+
const String destination_file_path;
45+
UInt64 rows_read {0};
46+
UInt64 total_rows_to_read {0};
47+
UInt64 total_size_bytes_compressed {0};
48+
UInt64 total_size_bytes_uncompressed {0};
49+
UInt64 bytes_read_uncompressed {0};
50+
time_t create_time {0};
51+
Float64 elapsed {0};
52+
53+
Stopwatch watch;
54+
ThreadGroupPtr thread_group;
55+
56+
ExportsListElement(
57+
const StorageID & source_table_id_,
58+
const StorageID & destination_table_id_,
59+
UInt64 part_size_,
60+
const String & part_name_,
61+
const String & destination_file_path_,
62+
UInt64 total_rows_to_read_,
63+
UInt64 total_size_bytes_compressed_,
64+
UInt64 total_size_bytes_uncompressed_,
65+
time_t create_time_,
66+
const ContextPtr & context);
67+
68+
~ExportsListElement();
69+
70+
ExportInfo getInfo() const;
71+
72+
UInt64 getMemoryUsage() const;
73+
UInt64 getPeakMemoryUsage() const;
74+
};
75+
76+
77+
/// The list of exports: BackgroundProcessList specialised for ExportsListElement / ExportInfo
/// and constructed with the CurrentMetrics::Export metric.
class ExportsList final : public BackgroundProcessList<ExportsListElement, ExportInfo>
{
    using Base = BackgroundProcessList<ExportsListElement, ExportInfo>;

public:
    ExportsList() : Base(CurrentMetrics::Export) {}
};
87+
88+
/// Entry type for exports: BackgroundProcessListEntry specialised for ExportsListElement / ExportInfo.
using ExportsListEntry = BackgroundProcessListEntry<ExportsListElement, ExportInfo>;
89+
90+
}

0 commit comments

Comments
 (0)