Skip to content

Commit d51d703

Browse files
committed
tmp
1 parent fadf2f7 commit d51d703

File tree

6 files changed

+43
-1
lines changed

6 files changed

+43
-1
lines changed

src/Storages/MergeTree/BackgroundJobsAssignee.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ bool BackgroundJobsAssignee::scheduleCommonTask(ExecutableTaskPtr common_task, b
9595
return schedule_res;
9696
}
9797

98+
std::size_t BackgroundJobsAssignee::getAvailableMoveExecutors() const
99+
{
100+
return getContext()->getMovesExecutor()->getAvailableSlots();
101+
}
98102

99103
String BackgroundJobsAssignee::toString(Type type)
100104
{

src/Storages/MergeTree/BackgroundJobsAssignee.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class BackgroundJobsAssignee : public WithContext
5555
bool scheduleMoveTask(ExecutableTaskPtr move_task);
5656
bool scheduleCommonTask(ExecutableTaskPtr common_task, bool need_trigger);
5757

58+
std::size_t getAvailableMoveExecutors() const;
59+
5860
/// Just call finish
5961
~BackgroundJobsAssignee();
6062

src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ bool ExportPartFromPartitionExportTask::executeStep()
2727
{
2828
const auto zk = storage.getZooKeeper();
2929
const auto part_name = manifest.data_part->name;
30-
30+
31+
LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", part_name);
32+
3133
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
3234
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
3335
if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
3436
{
37+
LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", part_name);
3538
export_part_task->executeStep();
3639
return false;
3740
}

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,30 @@ ExportPartitionTaskScheduler::ExportPartitionTaskScheduler(StorageReplicatedMerg
6060

6161
void ExportPartitionTaskScheduler::run()
6262
{
63+
const auto available_move_executors = storage.background_moves_assignee.getAvailableMoveExecutors();
64+
if (available_move_executors == 0)
65+
{
66+
LOG_INFO(storage.log, "ExportPartition scheduler task: No available move executors, skipping");
67+
return;
68+
}
69+
70+
LOG_INFO(storage.log, "ExportPartition scheduler task: Available move executors: {}", available_move_executors);
71+
72+
std::size_t scheduled_exports_count = 0;
73+
6374
std::lock_guard lock(storage.export_merge_tree_partition_mutex);
6475

6576
auto zk = storage.getZooKeeper();
6677

6778
// Iterate sorted by create_time
6879
for (auto & entry : storage.export_merge_tree_partition_task_entries_by_create_time)
6980
{
81+
if (scheduled_exports_count >= available_move_executors)
82+
{
83+
LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduled exports count is greater than available move executors, skipping");
84+
break;
85+
}
86+
7087
const auto & manifest = entry.manifest;
7188
const auto key = entry.getCompositeKey();
7289
const auto database = storage.getContext()->resolveDatabase(manifest.destination_database);
@@ -144,6 +161,12 @@ void ExportPartitionTaskScheduler::run()
144161

145162
for (const auto & zk_part_name : parts_in_processing_or_pending)
146163
{
164+
if (scheduled_exports_count >= available_move_executors)
165+
{
166+
LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduled exports count is greater than available move executors, skipping");
167+
break;
168+
}
169+
147170
if (locked_parts_set.contains(zk_part_name))
148171
{
149172
LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is locked, skipping", zk_part_name);
@@ -157,6 +180,8 @@ void ExportPartitionTaskScheduler::run()
157180
continue;
158181
}
159182

183+
LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduling part export: {}", zk_part_name);
184+
160185
std::lock_guard part_export_lock(storage.export_manifests_mutex);
161186

162187
auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest);

src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,12 @@ size_t MergeTreeBackgroundExecutor<Queue>::getMaxTasksCount() const
136136
return max_tasks_count.load(std::memory_order_relaxed);
137137
}
138138

139+
template <class Queue>
140+
size_t MergeTreeBackgroundExecutor<Queue>::getAvailableSlots() const
141+
{
142+
return getMaxTasksCount() - CurrentMetrics::values[metric].load(std::memory_order_relaxed);
143+
}
144+
139145
template <class Queue>
140146
bool MergeTreeBackgroundExecutor<Queue>::trySchedule(ExecutableTaskPtr task)
141147
{

src/Storages/MergeTree/MergeTreeBackgroundExecutor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@ class MergeTreeBackgroundExecutor final : boost::noncopyable
279279
/// can lead only to some postponing, not logical error.
280280
size_t getMaxTasksCount() const;
281281

282+
size_t getAvailableSlots() const;
283+
282284
bool trySchedule(ExecutableTaskPtr task);
283285
void removeTasksCorrespondingToStorage(StorageID id);
284286
void wait();

0 commit comments

Comments
 (0)