Skip to content

Commit 0d183bf

Browse files
committed
improvements
1 parent 85e14f9 commit 0d183bf

File tree

5 files changed

+13
-14
lines changed

5 files changed

+13
-14
lines changed

src/Storages/ExportReplicatedMergeTreePartitionManifest.h

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -162,22 +162,14 @@ struct ExportReplicatedMergeTreePartitionManifest
162162
manifest.max_threads = json->getValue<size_t>("max_threads");
163163
manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
164164
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");
165-
166-
if (json->has("file_already_exists_policy"))
165+
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
166+
/// todo what to do if it's not a valid value?
167+
if (file_already_exists_policy)
167168
{
168-
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
169-
if (file_already_exists_policy)
170-
{
171-
manifest.file_already_exists_policy = file_already_exists_policy.value();
172-
}
173-
174-
/// what to do if it's not a valid value?
169+
manifest.file_already_exists_policy = file_already_exists_policy.value();
175170
}
176171

177-
if (json->has("lock_inside_the_task"))
178-
{
179-
manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");
180-
}
172+
manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");
181173

182174
return manifest;
183175
}

src/Storages/MergeTree/ExportPartFromPartitionExportTask.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
namespace DB
99
{
1010

11+
/*
12+
Decorator around the ExportPartTask to lock the part inside the task
13+
*/
1114
class ExportPartFromPartitionExportTask : public IExecutableTask
1215
{
1316
public:

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ void ExportPartitionTaskScheduler::run()
131131
if (status_in_zk.value() != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
132132
{
133133
entry.status = status_in_zk.value();
134-
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping... Status from zk is {}", entry.status);
134+
LOG_INFO(storage.log, "ExportPartition scheduler task: Skipping {}... Status from zk is {}", key, magic_enum::enum_name(entry.status).data());
135135
continue;
136136
}
137137

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ struct MergeTreePartExportManifest
7373
std::function<void(CompletionCallbackResult)> completion_callback;
7474

7575
time_t create_time;
76+
/// Required to cancel export tasks
7677
mutable std::shared_ptr<IExecutableTask> task = nullptr;
7778

7879
bool operator<(const MergeTreePartExportManifest & rhs) const

tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,9 @@ def test_kill_export(cluster):
245245
# ZooKeeper operations (KILL) proceed quickly since only S3 is blocked
246246
node.query(f"KILL EXPORT PARTITION WHERE partition_id = '2020' and source_table = '{mt_table}' and destination_table = '{s3_table}'")
247247

248+
# sleep for a while to let the kill to be processed
249+
time.sleep(2)
250+
248251
# wait for 2021 to finish
249252
wait_for_export_status(node, mt_table, s3_table, "2021", "COMPLETED")
250253

0 commit comments

Comments
 (0)