Commit c64498e

al13n321 authored and mkmkme committed
Merge pull request ClickHouse#87220 from ClickHouse/pqice
Fixes for parquet reader v3, _row_number, and iceberg positioned deletes
1 parent 91e6a82 commit c64498e

33 files changed: +474 -168 lines

src/Common/ProfileEvents.cpp

Lines changed: 1 addition & 1 deletion
@@ -1122,7 +1122,7 @@ The server successfully detected this situation and will download merged part fr
     M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background", ValueType::Number) \
     M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \
     \
-    M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
+    M(ParquetFetchWaitTimeMicroseconds, "Time of waiting for parquet file reads from decoding threads (not prefetching threads)", ValueType::Microseconds) \
     M(ParquetReadRowGroups, "The total number of row groups read from parquet data", ValueType::Number) \
     M(ParquetPrunedRowGroups, "The total number of row groups pruned from parquet data", ValueType::Number) \
     M(ParquetDecodingTasks, "Tasks issued by parquet reader", ValueType::Number) \

src/Common/futex.h

Lines changed: 11 additions & 1 deletion
@@ -19,6 +19,16 @@ inline Int64 futexWait(void * address, UInt32 value)
     return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0);
 }
 
+inline Int64 futexTimedWait(void * address, UInt32 value, UInt64 nanos)
+{
+    const UInt64 nanos_per_sec = 1'000'000'000;
+    UInt64 sec = nanos / nanos_per_sec;
+    struct timespec timeout;
+    timeout.tv_sec = time_t(std::min(sec, UInt64(std::numeric_limits<time_t>::max())));
+    timeout.tv_nsec = int64_t(nanos % nanos_per_sec);
+    return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, &timeout, nullptr, 0);
+}
+
 inline Int64 futexWake(void * address, int count)
 {
     return syscall(SYS_futex, address, FUTEX_WAKE_PRIVATE, count, nullptr, nullptr, 0);
@@ -37,7 +47,7 @@ inline void futexWakeOne(std::atomic<UInt32> & address)
 
 inline void futexWakeAll(std::atomic<UInt32> & address)
 {
-    futexWake(&address, INT_MAX);
+    futexWake(&address, INT_MAX);
 }
 
 constexpr UInt32 lowerHalf(UInt64 value)
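For context, the new futexTimedWait is meant to be used in the same claim-or-sleep loop shape that threadFunction() uses below. Here is a minimal standalone sketch of that pattern; the waitForWork helper is illustrative only (not part of this patch) and assumes the futex.h wrappers above, ClickHouse's UInt32/UInt64/Int64 typedefs, and <atomic>/<cerrno> includes:

/// Illustrative sketch (not from the patch): claim one unit of work from an
/// atomic counter, sleeping via futexTimedWait while the counter is zero.
/// Returns false if the idle timeout expired with still no work available.
bool waitForWork(std::atomic<UInt32> & counter, UInt64 idle_timeout_ns)
{
    UInt32 x = counter.load(std::memory_order_relaxed);
    while (true)
    {
        if (x == 0)
        {
            /// Sleeps only if counter is still 0; wakes on futexWake or timeout.
            Int64 res = futexTimedWait(&counter, 0, idle_timeout_ns);
            x = counter.load(std::memory_order_relaxed);
            if (res < 0 && errno == ETIMEDOUT && x == 0)
                return false; /// idle timeout: caller may retire the thread
        }
        else if (counter.compare_exchange_weak(
            x, x - 1, std::memory_order_acquire, std::memory_order_relaxed))
            return true; /// claimed one queued item
    }
}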

src/Common/threadPoolCallbackRunner.cpp

Lines changed: 83 additions & 52 deletions
@@ -20,14 +20,6 @@ void ThreadPoolCallbackRunnerFast::initThreadPool(ThreadPool & pool_, size_t max
     max_threads = max_threads_;
     thread_name = thread_name_;
     thread_group = thread_group_;
-
-    /// We could dynamically add and remove threads based on load, but it's not clear whether it's
-    /// worth the added complexity.
-    for (size_t i = 0; i < max_threads; ++i)
-    {
-        pool->scheduleOrThrowOnError([this] { threadFunction(); });
-        ++threads; // only if scheduleOrThrowOnError didn't throw
-    }
 }
 
 ThreadPoolCallbackRunnerFast::ThreadPoolCallbackRunnerFast(Mode mode_) : mode(mode_)
@@ -58,19 +50,30 @@ void ThreadPoolCallbackRunnerFast::shutdown()
     chassert(active_tasks.load() == queue.size());
 }
 
+void ThreadPoolCallbackRunnerFast::startMoreThreadsIfNeeded(size_t active_tasks_, std::unique_lock<std::mutex> &)
+{
+    while (threads < max_threads && threads < active_tasks_ && !shutdown_requested)
+    {
+        pool->scheduleOrThrow([this] { threadFunction(); });
+        ++threads; // only if scheduleOrThrow didn't throw
+    }
+}
+
 void ThreadPoolCallbackRunnerFast::operator()(std::function<void()> f)
 {
     if (mode == Mode::Disabled)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread pool runner is not initialized");
 
+    size_t active_tasks_ = 1 + active_tasks.fetch_add(1, std::memory_order_relaxed);
+
     {
         std::unique_lock lock(mutex);
         queue.push_back(std::move(f));
+        startMoreThreadsIfNeeded(active_tasks_, lock);
     }
 
     if (mode == Mode::ThreadPool)
     {
-        active_tasks.fetch_add(1, std::memory_order_relaxed);
 #ifdef OS_LINUX
         UInt32 prev_size = queue_size.fetch_add(1, std::memory_order_release);
         if (prev_size < max_threads)
@@ -89,14 +92,16 @@ void ThreadPoolCallbackRunnerFast::bulkSchedule(std::vector<std::function<void()
     if (mode == Mode::Disabled)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread pool runner is not initialized");
 
+    size_t active_tasks_ = fs.size() + active_tasks.fetch_add(fs.size(), std::memory_order_relaxed);
+
     {
         std::unique_lock lock(mutex);
         queue.insert(queue.end(), std::move_iterator(fs.begin()), std::move_iterator(fs.end()));
+        startMoreThreadsIfNeeded(active_tasks_, lock);
     }
 
     if (mode == Mode::ThreadPool)
     {
-        active_tasks.fetch_add(fs.size(), std::memory_order_relaxed);
 #ifdef OS_LINUX
         UInt32 prev_size = queue_size.fetch_add(fs.size(), std::memory_order_release);
         if (prev_size < max_threads)
@@ -127,68 +132,94 @@ bool ThreadPoolCallbackRunnerFast::runTaskInline()
 
 void ThreadPoolCallbackRunnerFast::threadFunction()
 {
+    std::optional<ThreadGroupSwitcher> switcher;
+    switcher.emplace(thread_group, thread_name.c_str());
+
+    while (true)
     {
-        ThreadGroupSwitcher switcher(thread_group, thread_name.c_str());
+        bool timed_out = false;
 
+#ifdef OS_LINUX
+        UInt32 x = queue_size.load(std::memory_order_relaxed);
         while (true)
         {
-#ifdef OS_LINUX
-            UInt32 x = queue_size.load(std::memory_order_relaxed);
-            while (true)
+            if (x == 0)
             {
-                if (x == 0)
+                Int64 waited = futexTimedWait(&queue_size, 0, THREAD_IDLE_TIMEOUT_NS);
+                x = queue_size.load(std::memory_order_relaxed);
+
+                if (waited < 0 && errno == ETIMEDOUT && x == 0)
                 {
-                    futexWait(&queue_size, 0);
-                    x = queue_size.load(std::memory_order_relaxed);
-                }
-                else if (queue_size.compare_exchange_weak(
-                    x, x - 1, std::memory_order_acquire, std::memory_order_relaxed))
+                    timed_out = true;
                     break;
+                }
             }
-#endif
+            else if (queue_size.compare_exchange_weak(
+                x, x - 1, std::memory_order_acquire, std::memory_order_relaxed))
+                break;
+        }
+#endif
 
-            std::function<void()> f;
-            {
-                std::unique_lock lock(mutex);
+        std::function<void()> f;
+        {
+            std::unique_lock lock(mutex);
 
-#ifndef OS_LINUX
-                queue_cv.wait(lock, [&] { return shutdown_requested || !queue.empty(); });
-#endif
+#ifdef OS_LINUX
+            /// Important to never stop the last thread if queue is not empty (checked under the
+            /// same `lock` as decrementing `threads`). Otherwise we'll deadlock like this:
+            ///  0. `threads` == 1, queue is empty.
+            ///  1. The worker thread times out; it didn't lock mutex or decrement `threads` yet.
+            ///  2. A manager thread enqueues a task. It sees active_tasks == 1 and `threads` == 1,
+            ///     so it doesn't start another thread.
+            ///  3. The worker thread exits.
+            ///  4. There are no threads, but the queue is not empty, oops.
            if (timed_out && !queue.empty() && !shutdown_requested)
+                /// We can't just proceed to `queue.pop_front()` here because we haven't
+                /// decremented queue_size.
+                continue;
+#else
+            timed_out = !queue_cv.wait_for(
+                lock, std::chrono::nanoseconds(THREAD_IDLE_TIMEOUT_NS),
+                [&] { return shutdown_requested || !queue.empty(); });
+#endif
 
-                if (shutdown_requested)
-                    break;
+            if (shutdown_requested || timed_out)
+            {
+                /// Important that we destroy the `ThreadGroupSwitcher` before decrementing `threads`.
+                /// Otherwise ~ThreadGroupSwitcher may access global Context after the query is
+                /// finished, which may race with mutating Context (specifically, Settings) at the
+                /// start of next query.
+                switcher.reset();
 
-                chassert(!queue.empty());
+                threads -= 1;
+                if (threads == 0)
+                    shutdown_cv.notify_all();
 
-                f = std::move(queue.front());
-                queue.pop_front();
+                return;
             }
 
-            try
-            {
-                f();
+            chassert(!queue.empty());
 
-                CurrentThread::updatePerformanceCountersIfNeeded();
-            }
-            catch (...)
-            {
-                tryLogCurrentException("FastThreadPool");
-                chassert(false);
-            }
+            f = std::move(queue.front());
+            queue.pop_front();
+        }
+
+        try
+        {
+            f();
 
-            active_tasks.fetch_sub(1, std::memory_order_relaxed);
+            CurrentThread::updatePerformanceCountersIfNeeded();
+        }
+        catch (...)
+        {
+            tryLogCurrentException("FastThreadPool");
+            chassert(false);
        }
-    }
 
-    /// Important that we destroy the `ThreadGroupSwitcher` before decrementing `threads`.
-    /// Otherwise ~ThreadGroupSwitcher may access global Context after the query is finished, which
-    /// may race with mutating Context (specifically, Settings) at the start of next query.
-    {
-        std::unique_lock lock(mutex);
-        threads -= 1;
-        if (threads == 0)
-            shutdown_cv.notify_all();
+        active_tasks.fetch_sub(1, std::memory_order_relaxed);
     }
+
+    chassert(false);
 }
 
 bool ShutdownHelper::try_lock_shared()
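Condensed, the new worker lifecycle above is: wait for work with a timeout, retire the thread when idle, and decrement `threads` under the same mutex that enqueuers hold, so a timed-out exit can only happen when the queue was observed empty under the lock. A simplified sketch of just the non-Linux (condition_variable) path follows; the futex fast path, ThreadGroupSwitcher, and error handling are omitted, and the struct is a trimmed stand-in rather than the real class:

// Simplified sketch of the new worker lifecycle (condition_variable path only).
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>

struct WorkerLoopSketch
{
    std::mutex mutex;
    std::condition_variable queue_cv;
    std::deque<std::function<void()>> queue;
    size_t threads = 0;
    bool shutdown_requested = false;
    static constexpr auto idle_timeout = std::chrono::milliseconds(3);

    void threadFunction()
    {
        while (true)
        {
            std::function<void()> f;
            {
                std::unique_lock lock(mutex);
                /// Wait for work; with a predicate, wait_for returns false only
                /// if the timeout expired while the queue was still empty.
                bool timed_out = !queue_cv.wait_for(lock, idle_timeout,
                    [&] { return shutdown_requested || !queue.empty(); });
                if (shutdown_requested || timed_out)
                {
                    threads -= 1; /// under the same lock that enqueuers hold
                    return;
                }
                f = std::move(queue.front());
                queue.pop_front();
            }
            f(); /// run the task outside the lock
        }
    }
};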

src/Common/threadPoolCallbackRunner.h

Lines changed: 22 additions & 2 deletions
@@ -249,8 +249,6 @@ class ThreadPoolCallbackRunnerFast
         Disabled,
     };
 
-    /// TODO [parquet]: Add metrics for queue size and active threads, and maybe event for tasks executed.
-
     ThreadPoolCallbackRunnerFast();
 
     void initManual()
@@ -282,6 +280,9 @@ class ThreadPoolCallbackRunnerFast
     bool isIdle() const { return active_tasks.load(std::memory_order_relaxed) == 0; }
 
 private:
+    /// Stop thread if it had nothing to do for this long.
+    static constexpr UInt64 THREAD_IDLE_TIMEOUT_NS = 3'000'000; // 3 ms
+
     Mode mode = Mode::Disabled;
     ThreadPool * pool = nullptr;
     size_t max_threads = 0;
@@ -309,6 +310,25 @@ class ThreadPoolCallbackRunnerFast
     std::condition_variable queue_cv;
 #endif
 
+    /// We dynamically start more threads when queue grows and stop idle threads after a timeout.
+    ///
+    /// Interestingly, this is required for correctness, not just performance.
+    /// If we kept max_threads threads at all times, we may deadlock because the "threads" that we
+    /// schedule on ThreadPool are not necessarily running, they may be sitting in ThreadPool's
+    /// queue, blocking other "threads" from running. E.g. this may happen:
+    ///  1. Iceberg reader creates many parquet readers, and their ThreadPoolCallbackRunnerFast(s)
+    ///     occupy all slots in the shared ThreadPool (getFormatParsingThreadPool()).
+    ///  2. Iceberg reader creates some more parquet readers for positional deletes, using separate
+    ///     ThreadPoolCallbackRunnerFast-s (because the ones from above are mildly inconvenient to
+    ///     propagate to that code site). Those ThreadPoolCallbackRunnerFast-s make
+    ///     pool->scheduleOrThrowOnError calls, but ThreadPool just adds them to queue, no actual
+    ///     ThreadPoolCallbackRunnerFast::threadFunction()-s are started.
+    ///  3. The readers from step 2 are stuck because their ThreadPoolCallbackRunnerFast-s have no
+    ///     threads. The readers from step 1 are idle but not destroyed (keep occupying threads)
+    ///     because the iceberg reader is waiting for positional deletes to be read (by readers
+    ///     from step 2). We're stuck.
+    void startMoreThreadsIfNeeded(size_t active_tasks_, std::unique_lock<std::mutex> &);
+
     void threadFunction();
 };
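The start-on-demand rule described in that new comment boils down to one invariant: never have more scheduled workers than min(max_threads, pending tasks), so an idle runner never pins a slot of the shared pool. A trimmed sketch of the enqueue side under that invariant; this is a stand-in struct, not the real class, with the shared ThreadPool stubbed as a callback and worker wakeup omitted:

// Sketch of the enqueue side of the start-on-demand scheme described above.
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>

struct FastRunnerSketch
{
    std::function<void(std::function<void()>)> schedule_on_shared_pool; /// stand-in for ThreadPool
    std::mutex mutex;
    std::deque<std::function<void()>> queue;
    std::atomic<size_t> active_tasks{0};
    size_t threads = 0;
    size_t max_threads = 16;
    bool shutdown_requested = false;

    void schedule(std::function<void()> f)
    {
        /// Count the task before taking the lock, as in the diff, so the
        /// thread-start check below sees it.
        size_t active = 1 + active_tasks.fetch_add(1, std::memory_order_relaxed);
        std::unique_lock lock(mutex);
        queue.push_back(std::move(f));
        /// Invariant: scheduled workers <= min(max_threads, pending tasks).
        while (threads < max_threads && threads < active && !shutdown_requested)
        {
            schedule_on_shared_pool([] { /* worker loop runs here */ });
            ++threads;
        }
    }
};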

src/Core/FormatFactorySettings.h

Lines changed: 1 addition & 1 deletion
@@ -168,7 +168,7 @@ Ignore case when matching ORC columns with CH columns.
 Ignore case when matching Parquet columns with CH columns.
 )", 0) \
     DECLARE(Bool, input_format_parquet_preserve_order, false, R"(
-Avoid reordering rows when reading from Parquet files. Usually makes it much slower. Not recommended as row ordering is generally not guaranteed, and other parts of query pipeline may break it.
+Avoid reordering rows when reading from Parquet files. Not recommended as row ordering is generally not guaranteed, and other parts of query pipeline may break it. Use `ORDER BY _row_number` instead.
 )", 0) \
     DECLARE(Bool, input_format_parquet_filter_push_down, true, R"(
 When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and min/max statistics in the Parquet metadata.

src/Processors/Formats/IInputFormat.cpp

Lines changed: 11 additions & 0 deletions
@@ -6,6 +6,17 @@
 namespace DB
 {
 
+ChunkInfoRowNumbers::ChunkInfoRowNumbers(size_t row_num_offset_, std::optional<IColumnFilter> applied_filter_)
+    : row_num_offset(row_num_offset_), applied_filter(std::move(applied_filter_)) { }
+
+ChunkInfoRowNumbers::Ptr ChunkInfoRowNumbers::clone() const
+{
+    auto res = std::make_shared<ChunkInfoRowNumbers>(row_num_offset);
+    if (applied_filter.has_value())
+        res->applied_filter.emplace(applied_filter->begin(), applied_filter->end());
+    return res;
+}
+
 IInputFormat::IInputFormat(SharedHeader header, ReadBuffer * in_) : ISource(std::move(header)), in(in_)
 {
     column_mapping = std::make_shared<ColumnMapping>();

src/Processors/Formats/IInputFormat.h

Lines changed: 27 additions & 4 deletions
@@ -3,6 +3,7 @@
 #include <Formats/ColumnMapping.h>
 #include <IO/ReadBuffer.h>
 #include <Processors/Formats/InputFormatErrorsLogger.h>
+#include <Common/PODArray.h>
 #include <Core/BlockMissingValues.h>
 #include <Processors/ISource.h>
 #include <Core/Settings.h>
@@ -14,13 +15,35 @@ namespace DB
 struct SelectQueryInfo;
 
 using ColumnMappingPtr = std::shared_ptr<ColumnMapping>;
-
-struct ChunkInfoRowNumOffset : public ChunkInfoCloneable<ChunkInfoRowNumOffset>
+using IColumnFilter = PaddedPODArray<UInt8>;
+
+/// Most (all?) file formats have a natural order of rows within the file.
+/// But our format readers and query pipeline may reorder or filter rows. This struct is used to
+/// propagate the original row numbers, e.g. for _row_number virtual column or for iceberg
+/// positional deletes.
+///
+/// Warning: we currently don't correctly update this info in most transforms. E.g. things like
+/// FilterTransform and SortingTransform logically should remove this ChunkInfo, but don't; we don't
+/// have a mechanism to systematically find all code sites that would need to do that or to detect
+/// if one was missed.
+/// So this is only used in a few specific situations, and the builder of query pipeline must be
+/// careful to never put a step that uses this info after a step that breaks it.
+///
+/// If row numbers in a chunk are consecutive, this contains just the first row number.
+/// If row numbers are not consecutive as a result of filtering, this additionally contains the mask
+/// that was used for filtering, from which row numbers can be recovered.
+struct ChunkInfoRowNumbers : public ChunkInfo
 {
-    ChunkInfoRowNumOffset(const ChunkInfoRowNumOffset & other) = default;
-    explicit ChunkInfoRowNumOffset(size_t row_num_offset_) : row_num_offset(row_num_offset_) { }
+    explicit ChunkInfoRowNumbers(size_t row_num_offset_, std::optional<IColumnFilter> applied_filter_ = std::nullopt);
+
+    Ptr clone() const override;
 
     const size_t row_num_offset;
+    /// If nullopt, row numbers are consecutive.
+    /// If not empty, the number of '1' elements is equal to the number of rows in the chunk;
+    /// row i in the chunk has row number:
+    /// row_num_offset + {index of the i-th '1' element in applied_filter}.
+    std::optional<IColumnFilter> applied_filter;
 };
 
 /** Input format is a source, that reads data from ReadBuffer.
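The recovery rule documented on applied_filter is mechanical; here is a minimal sketch of it as a hypothetical free function (not part of the patch, and using std::vector in place of PaddedPODArray):

// Hypothetical helper (not in the patch): recover absolute file row numbers
// from an offset plus the filter mask, per the rule documented above.
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

std::vector<size_t> recoverRowNumbers(
    size_t row_num_offset,
    const std::optional<std::vector<uint8_t>> & applied_filter,
    size_t num_rows)
{
    std::vector<size_t> row_numbers;
    row_numbers.reserve(num_rows);
    if (!applied_filter.has_value())
    {
        /// Consecutive: row i has number row_num_offset + i.
        for (size_t i = 0; i < num_rows; ++i)
            row_numbers.push_back(row_num_offset + i);
    }
    else
    {
        /// Row i has number row_num_offset + {index of the i-th '1' in the mask}.
        for (size_t pos = 0; pos < applied_filter->size(); ++pos)
            if ((*applied_filter)[pos])
                row_numbers.push_back(row_num_offset + pos);
    }
    return row_numbers;
}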
