Skip to content

Commit ef5807b

Browse files
committed
okish
1 parent d51d703 commit ef5807b

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ ExportPartitionTaskScheduler::ExportPartitionTaskScheduler(StorageReplicatedMerg
6161
void ExportPartitionTaskScheduler::run()
6262
{
6363
const auto available_move_executors = storage.background_moves_assignee.getAvailableMoveExecutors();
64+
65+
/// this is subject to TOCTOU - but for now we choose to live with it.
6466
if (available_move_executors == 0)
6567
{
6668
LOG_INFO(storage.log, "ExportPartition scheduler task: No available move executors, skipping");
@@ -71,6 +73,9 @@ void ExportPartitionTaskScheduler::run()
7173

7274
std::size_t scheduled_exports_count = 0;
7375

76+
const uint32_t seed = uint32_t(std::hash<std::string>{}(storage.replica_name)) ^ uint32_t(scheduled_exports_count);
77+
std::mt19937 rng(seed);
78+
7479
std::lock_guard lock(storage.export_merge_tree_partition_mutex);
7580

7681
auto zk = storage.getZooKeeper();
@@ -147,6 +152,9 @@ void ExportPartitionTaskScheduler::run()
147152
continue;
148153
}
149154

155+
/// shuffle the parts to reduce the risk of lock collisions
156+
std::shuffle(parts_in_processing_or_pending.begin(), parts_in_processing_or_pending.end(), rng);
157+
150158
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
151159
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
152160
std::vector<std::string> locked_parts;
@@ -200,9 +208,11 @@ void ExportPartitionTaskScheduler::run()
200208
{
201209
handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result);
202210
});
211+
212+
scheduled_exports_count++;
203213

204-
storage.background_moves_assignee.scheduleMoveTask(
205-
std::make_shared<ExportPartFromPartitionExportTask>(storage, key, part_export_manifest, getContextCopyWithTaskSettings(storage.getContext(), manifest)));
214+
part_export_manifest.task = std::make_shared<ExportPartFromPartitionExportTask>(storage, key, part_export_manifest, getContextCopyWithTaskSettings(storage.getContext(), manifest));
215+
storage.background_moves_assignee.scheduleMoveTask(part_export_manifest.task);
206216
}
207217
}
208218
}

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace DB
1111

1212
class Exception;
1313

14-
class ExportPartTask;
14+
class IExecutableTask;
1515

1616
struct MergeTreePartExportManifest
1717
{
@@ -73,7 +73,7 @@ struct MergeTreePartExportManifest
7373

7474
time_t create_time;
7575
mutable bool in_progress = false;
76-
mutable std::shared_ptr<ExportPartTask> task = nullptr;
76+
mutable std::shared_ptr<IExecutableTask> task = nullptr;
7777

7878
bool operator<(const MergeTreePartExportManifest & rhs) const
7979
{

0 commit comments

Comments
 (0)