Skip to content

Commit 64accc3

Browse files
committed
add setting to control locking behavior (inside/outside task), deny export request in case pool is full
1 parent ef5807b commit 64accc3

File tree

10 files changed

+204
-135
lines changed

10 files changed

+204
-135
lines changed

src/Core/Settings.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6899,6 +6899,10 @@ Possible values:
68996899
- `` (empty value) - use session timezone
69006900
69016901
Default value is `UTC`.
6902+
)", 0) \
6903+
DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"(
6904+
Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list.
6905+
On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit.
69026906
)", 0) \
69036907
\
69046908
/* ####################################################### */ \

src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,33 +13,31 @@ namespace DB
1313
ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask(
1414
StorageReplicatedMergeTree & storage_,
1515
const std::string & key_,
16-
const MergeTreePartExportManifest & manifest_,
17-
ContextPtr context_)
16+
const MergeTreePartExportManifest & manifest_)
1817
: storage(storage_),
1918
key(key_),
20-
manifest(manifest_),
21-
local_context(context_)
19+
manifest(manifest_)
2220
{
23-
export_part_task = std::make_shared<ExportPartTask>(storage, manifest, local_context);
21+
export_part_task = std::make_shared<ExportPartTask>(storage, manifest);
2422
}
2523

2624
bool ExportPartFromPartitionExportTask::executeStep()
2725
{
2826
const auto zk = storage.getZooKeeper();
2927
const auto part_name = manifest.data_part->name;
3028

31-
LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", part_name);
29+
LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name);
3230

3331
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
3432
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
3533
if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
3634
{
37-
LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", part_name);
35+
LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name);
3836
export_part_task->executeStep();
3937
return false;
4038
}
4139

42-
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", part_name);
40+
LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name);
4341
return false;
4442
}
4543

src/Storages/MergeTree/ExportPartFromPartitionExportTask.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ class ExportPartFromPartitionExportTask : public IExecutableTask
1414
explicit ExportPartFromPartitionExportTask(
1515
StorageReplicatedMergeTree & storage_,
1616
const std::string & key_,
17-
const MergeTreePartExportManifest & manifest_,
18-
ContextPtr context_);
17+
const MergeTreePartExportManifest & manifest_);
1918
bool executeStep() override;
2019
void onCompleted() override;
2120
StorageID getStorageID() const override;
@@ -28,7 +27,6 @@ class ExportPartFromPartitionExportTask : public IExecutableTask
2827
StorageReplicatedMergeTree & storage;
2928
std::string key;
3029
MergeTreePartExportManifest manifest;
31-
ContextPtr local_context;
3230
std::shared_ptr<ExportPartTask> export_part_task;
3331
};
3432

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <Common/Exception.h>
1414
#include <Common/ProfileEventsScope.h>
1515
#include <Storages/MergeTree/ExportList.h>
16+
#include <Formats/FormatFactory.h>
1617

1718
namespace ProfileEvents
1819
{
@@ -38,10 +39,9 @@ namespace Setting
3839
extern const SettingsUInt64 min_bytes_to_use_direct_io;
3940
}
4041

41-
ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_, ContextPtr context_)
42+
ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
4243
: storage(storage_),
43-
manifest(manifest_),
44-
local_context(context_)
44+
manifest(manifest_)
4545
{
4646
}
4747

@@ -52,6 +52,12 @@ const MergeTreePartExportManifest & ExportPartTask::getManifest() const
5252

5353
bool ExportPartTask::executeStep()
5454
{
55+
auto local_context = Context::createCopy(storage.getContext());
56+
local_context->makeQueryContextForExportPart();
57+
local_context->setCurrentQueryId(manifest.transaction_id);
58+
local_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART);
59+
local_context->setSettings(manifest.settings);
60+
5561
const auto & metadata_snapshot = manifest.metadata_snapshot;
5662

5763
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
@@ -96,7 +102,7 @@ bool ExportPartTask::executeStep()
96102
block_with_partition_values,
97103
(*exports_list_entry)->destination_file_path,
98104
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
99-
manifest.format_settings,
105+
getFormatSettings(local_context),
100106
local_context);
101107
}
102108
catch (const Exception & e)
@@ -131,10 +137,21 @@ bool ExportPartTask::executeStep()
131137
}
132138
}
133139

134-
tryLogCurrentException(__PRETTY_FUNCTION__);
140+
LOG_INFO(getLogger("ExportPartTask"), "Export part {} failed: {}", manifest.data_part->name, e.message());
135141

136142
ProfileEvents::increment(ProfileEvents::PartsExportFailures);
137143

144+
storage.writePartLog(
145+
PartLogElement::Type::EXPORT_PART,
146+
ExecutionStatus::fromCurrentException("", true),
147+
static_cast<UInt64>((*exports_list_entry)->elapsed * 1000000000),
148+
manifest.data_part->name,
149+
manifest.data_part,
150+
{manifest.data_part},
151+
nullptr,
152+
nullptr,
153+
exports_list_entry.get());
154+
138155
std::lock_guard inner_lock(storage.export_manifests_mutex);
139156
storage.export_manifests.erase(manifest);
140157

@@ -264,6 +281,7 @@ bool ExportPartTask::executeStep()
264281

265282
void ExportPartTask::cancel() noexcept
266283
{
284+
LOG_INFO(getLogger("ExportPartTask"), "Export part {} task cancel() method called", manifest.data_part->name);
267285
cancel_requested.store(true);
268286
pipeline.cancel();
269287
}

src/Storages/MergeTree/ExportPartTask.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ class ExportPartTask : public IExecutableTask
1212
public:
1313
explicit ExportPartTask(
1414
MergeTreeData & storage_,
15-
const MergeTreePartExportManifest & manifest_,
16-
ContextPtr context_);
15+
const MergeTreePartExportManifest & manifest_);
1716
bool executeStep() override;
1817
void onCompleted() override;
1918
StorageID getStorageID() const override;
@@ -26,7 +25,6 @@ class ExportPartTask : public IExecutableTask
2625
private:
2726
MergeTreeData & storage;
2827
MergeTreePartExportManifest manifest;
29-
ContextPtr local_context;
3028
QueryPipeline pipeline;
3129
std::atomic<bool> cancel_requested = false;
3230

src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ std::vector<ReplicatedPartitionExportInfo> ExportPartitionManifestUpdatingTask::
100100
{
101101
std::vector<ReplicatedPartitionExportInfo> infos;
102102
const auto zk = storage.getZooKeeper();
103+
104+
if (!zk->isFeatureEnabled(DB::KeeperFeatureFlag::MULTI_READ))
105+
{
106+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MULTI_READ feature flag is not enabled");
107+
}
108+
103109
const auto exports_path = fs::path(storage.zookeeper_path) / "exports";
104110

105111
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
@@ -657,6 +663,7 @@ void ExportPartitionManifestUpdatingTask::handleStatusChanges()
657663
{
658664
try
659665
{
666+
LOG_INFO(storage.log, "ExportPartition Manifest Updating task: killing export partition for task {}", key);
660667
storage.killExportPart(it->manifest.transaction_id);
661668
}
662669
catch (...)

0 commit comments

Comments
 (0)