Skip to content

Commit 78af18a

Browse files
authored
Merge pull request ClickHouse#63513 from ClickHouse/threadpool-runner-local-prefetch-read-pool
Make `MergeTreePrefetchedReadPool` safer
2 parents c34ae6d + 173d5d0 commit 78af18a

File tree

4 files changed

+35
-46
lines changed

4 files changed

+35
-46
lines changed

src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -46,63 +46,38 @@ bool MergeTreePrefetchedReadPool::TaskHolder::operator<(const TaskHolder & other
4646
}
4747

4848

49-
MergeTreePrefetchedReadPool::PrefetchedReaders::~PrefetchedReaders()
50-
{
51-
for (auto & prefetch_future : prefetch_futures)
52-
if (prefetch_future.valid())
53-
prefetch_future.wait();
54-
}
55-
5649
MergeTreePrefetchedReadPool::PrefetchedReaders::PrefetchedReaders(
50+
ThreadPool & pool,
5751
MergeTreeReadTask::Readers readers_,
5852
Priority priority_,
59-
MergeTreePrefetchedReadPool & pool_)
53+
MergeTreePrefetchedReadPool & read_prefetch)
6054
: is_valid(true)
6155
, readers(std::move(readers_))
56+
, prefetch_runner(pool, "ReadPrepare")
6257
{
63-
try
64-
{
65-
prefetch_futures.reserve(1 + readers.prewhere.size());
66-
67-
prefetch_futures.push_back(pool_.createPrefetchedFuture(readers.main.get(), priority_));
58+
prefetch_runner(read_prefetch.createPrefetchedTask(readers.main.get(), priority_));
6859

69-
for (const auto & reader : readers.prewhere)
70-
prefetch_futures.push_back(pool_.createPrefetchedFuture(reader.get(), priority_));
60+
for (const auto & reader : readers.prewhere)
61+
prefetch_runner(read_prefetch.createPrefetchedTask(reader.get(), priority_));
7162

72-
fiu_do_on(FailPoints::prefetched_reader_pool_failpoint,
73-
{
74-
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failpoint for prefetched reader enabled");
75-
});
76-
}
77-
catch (...) /// in case of memory exceptions we have to wait
63+
fiu_do_on(FailPoints::prefetched_reader_pool_failpoint,
7864
{
79-
for (auto & prefetch_future : prefetch_futures)
80-
if (prefetch_future.valid())
81-
prefetch_future.wait();
82-
83-
throw;
84-
}
65+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failpoint for prefetched reader enabled");
66+
});
8567
}
8668

8769
void MergeTreePrefetchedReadPool::PrefetchedReaders::wait()
8870
{
8971
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
90-
for (auto & prefetch_future : prefetch_futures)
91-
prefetch_future.wait();
72+
prefetch_runner.waitForAllToFinish();
9273
}
9374

9475
MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetchedReaders::get()
9576
{
9677
SCOPE_EXIT({ is_valid = false; });
9778
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::WaitPrefetchTaskMicroseconds);
9879

99-
/// First wait for completion of all futures.
100-
for (auto & prefetch_future : prefetch_futures)
101-
prefetch_future.wait();
102-
103-
/// Then rethrow first exception if any.
104-
for (auto & prefetch_future : prefetch_futures)
105-
prefetch_future.get();
80+
prefetch_runner.waitForAllToFinishAndRethrowFirstError();
10681

10782
return std::move(readers);
10883
}
@@ -139,22 +114,20 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
139114
fillPerThreadTasks(pool_settings.threads, pool_settings.sum_marks);
140115
}
141116

142-
std::future<void> MergeTreePrefetchedReadPool::createPrefetchedFuture(IMergeTreeReader * reader, Priority priority)
117+
std::function<void()> MergeTreePrefetchedReadPool::createPrefetchedTask(IMergeTreeReader * reader, Priority priority)
143118
{
144119
/// In order to make a prefetch we need to wait for marks to be loaded. But we just created
145120
/// a reader (which starts loading marks in its constructor), then if we do prefetch right
146121
/// after creating a reader, it will be very inefficient. We can do prefetch for all parts
147122
/// only inside this MergeTreePrefetchedReadPool, where read tasks are created and distributed,
148123
/// and we cannot block either, therefore make prefetch inside the pool and put the future
149124
/// into the thread task. When a thread calls getTask(), it will wait for it is not ready yet.
150-
auto task = [=, context = getContext()]() mutable
125+
return [=, context = getContext()]() mutable
151126
{
152127
/// For async read metrics in system.query_log.
153128
PrefetchIncrement watch(context->getAsyncReadCounters());
154129
reader->prefetchBeginOfRange(priority);
155130
};
156-
157-
return scheduleFromThreadPoolUnsafe<void>(std::move(task), prefetch_threadpool, "ReadPrepare", priority);
158131
}
159132

160133
void MergeTreePrefetchedReadPool::createPrefetchedReadersForTask(ThreadTask & task)
@@ -164,7 +137,7 @@ void MergeTreePrefetchedReadPool::createPrefetchedReadersForTask(ThreadTask & ta
164137

165138
auto extras = getExtras();
166139
auto readers = MergeTreeReadTask::createReaders(task.read_info, extras, task.ranges);
167-
task.readers_future = std::make_unique<PrefetchedReaders>(std::move(readers), task.priority, *this);
140+
task.readers_future = std::make_unique<PrefetchedReaders>(prefetch_threadpool, std::move(readers), task.priority, *this);
168141
}
169142

170143
void MergeTreePrefetchedReadPool::startPrefetches()

src/Storages/MergeTree/MergeTreePrefetchedReadPool.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22
#include <Storages/MergeTree/MergeTreeReadPoolBase.h>
3+
#include <Common/threadPoolCallbackRunner.h>
34
#include <Common/ThreadPool_fwd.h>
45
#include <IO/AsyncReadCounters.h>
56
#include <boost/heap/priority_queue.hpp>
@@ -51,18 +52,18 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithCo
5152
class PrefetchedReaders
5253
{
5354
public:
54-
PrefetchedReaders() = default;
55-
PrefetchedReaders(MergeTreeReadTask::Readers readers_, Priority priority_, MergeTreePrefetchedReadPool & pool_);
55+
PrefetchedReaders(
56+
ThreadPool & pool, MergeTreeReadTask::Readers readers_, Priority priority_, MergeTreePrefetchedReadPool & read_prefetch);
5657

5758
void wait();
5859
MergeTreeReadTask::Readers get();
5960
bool valid() const { return is_valid; }
60-
~PrefetchedReaders();
6161

6262
private:
6363
bool is_valid = false;
6464
MergeTreeReadTask::Readers readers;
65-
std::vector<std::future<void>> prefetch_futures;
65+
66+
ThreadPoolCallbackRunnerLocal<void> prefetch_runner;
6667
};
6768

6869
struct ThreadTask
@@ -108,7 +109,7 @@ class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithCo
108109

109110
void startPrefetches();
110111
void createPrefetchedReadersForTask(ThreadTask & task);
111-
std::future<void> createPrefetchedFuture(IMergeTreeReader * reader, Priority priority);
112+
std::function<void()> createPrefetchedTask(IMergeTreeReader * reader, Priority priority);
112113

113114
MergeTreeReadTaskPtr stealTask(size_t thread, MergeTreeReadTask * previous_task);
114115
MergeTreeReadTaskPtr createTask(ThreadTask & thread_task, MergeTreeReadTask * previous_task);

src/Storages/MergeTree/MergeTreeReadPoolBase.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,19 @@ void MergeTreeReadPoolBase::fillPerPartInfos()
5353
MergeTreeReadTaskInfo read_task_info;
5454

5555
read_task_info.data_part = part_with_ranges.data_part;
56+
57+
const auto & data_part = read_task_info.data_part;
58+
if (data_part->isProjectionPart())
59+
{
60+
read_task_info.parent_part = data_part->storage.getPartIfExists(
61+
data_part->getParentPartName(),
62+
{MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
63+
64+
if (!read_task_info.parent_part)
65+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Did not find parent part {} for projection part {}",
66+
data_part->getParentPartName(), data_part->getDataPartStorage().getFullPath());
67+
}
68+
5669
read_task_info.part_index_in_query = part_with_ranges.part_index_in_query;
5770
read_task_info.alter_conversions = part_with_ranges.alter_conversions;
5871

src/Storages/MergeTree/MergeTreeReadTask.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ struct MergeTreeReadTaskInfo
5656
{
5757
/// Data part which should be read while performing this task
5858
DataPartPtr data_part;
59+
/// Parent part of the projection part
60+
DataPartPtr parent_part;
5961
/// For `part_index` virtual column
6062
size_t part_index_in_query;
6163
/// Alter converversionss that should be applied on-fly for part.

0 commit comments

Comments
 (0)