Skip to content

Commit 7dfc00f

Browse files
authored
Check RU in read thread of Storage. (#8386)
close #8362
1 parent 922f80c commit 7dfc00f

File tree

10 files changed

+172
-24
lines changed

10 files changed

+172
-24
lines changed

dbms/src/Common/TiFlashMetrics.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,9 +502,12 @@ namespace DB
502502
Counter, \
503503
F(type_sche_no_pool, {"type", "sche_no_pool"}), \
504504
F(type_sche_no_slot, {"type", "sche_no_slot"}), \
505+
F(type_sche_no_ru, {"type", "sche_no_ru"}), \
505506
F(type_sche_no_segment, {"type", "sche_no_segment"}), \
507+
F(type_sche_active_segment_limit, {"type", "sche_active_segment_limit"}), \
506508
F(type_sche_from_cache, {"type", "sche_from_cache"}), \
507509
F(type_sche_new_task, {"type", "sche_new_task"}), \
510+
F(type_ru_exhausted, {"type", "ru_exhausted"}), \
508511
F(type_add_cache_succ, {"type", "add_cache_succ"}), \
509512
F(type_add_cache_stale, {"type", "add_cache_stale"}), \
510513
F(type_get_cache_miss, {"type", "get_cache_miss"}), \

dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,14 @@ Block ColumnFileSetReader::readPKVersion(size_t offset, size_t limit)
131131
return block;
132132
}
133133

134+
/// Returns the total size in bytes of all columns in `columns`,
/// i.e. the sum of every column's `byteSize()`.
///
/// The columns are only inspected, never modified, so they are taken by
/// const reference; callers holding a non-const `MutableColumns` can still
/// pass it unchanged.
static Int64 columnsSize(const MutableColumns & columns)
{
    Int64 total_bytes = 0;
    for (const auto & col : columns)
        total_bytes += static_cast<Int64>(col->byteSize());
    return total_bytes;
}
141+
134142
size_t ColumnFileSetReader::readRows(
135143
MutableColumns & output_columns,
136144
size_t offset,
@@ -156,6 +164,7 @@ size_t ColumnFileSetReader::readRows(
156164
if (end == start)
157165
return 0;
158166

167+
auto bytes_before_read = columnsSize(output_columns);
159168
auto [start_file_index, rows_start_in_start_file] = locatePosByAccumulation(column_file_rows_end, start);
160169
auto [end_file_index, rows_end_in_end_file] = locatePosByAccumulation(column_file_rows_end, end);
161170

@@ -187,13 +196,13 @@ size_t ColumnFileSetReader::readRows(
187196
}
188197
}
189198

190-
UInt64 delta_bytes = 0;
191-
for (const auto & col : output_columns)
192-
delta_bytes += col->byteSize();
193-
194-
lac_bytes_collector.collect(delta_bytes);
195-
if (likely(context.scan_context))
196-
context.scan_context->total_user_read_bytes += delta_bytes;
199+
if (auto delta_bytes = columnsSize(output_columns) - bytes_before_read; delta_bytes > 0)
200+
{
201+
if (row_ids == nullptr)
202+
lac_bytes_collector.collect(delta_bytes);
203+
if (likely(context.scan_context))
204+
context.scan_context->total_user_read_bytes += delta_bytes;
205+
}
197206

198207
return actual_read;
199208
}

dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -960,7 +960,8 @@ BlockInputStreams DeltaMergeStore::readRaw(
960960
after_segment_read,
961961
req_info,
962962
enable_read_thread,
963-
final_num_stream);
963+
final_num_stream,
964+
dm_context->scan_context->resource_group_name);
964965

965966
BlockInputStreams res;
966967
for (size_t i = 0; i < final_num_stream; ++i)
@@ -1062,7 +1063,8 @@ void DeltaMergeStore::readRaw(
10621063
after_segment_read,
10631064
req_info,
10641065
enable_read_thread,
1065-
final_num_stream);
1066+
final_num_stream,
1067+
dm_context->scan_context->resource_group_name);
10661068

10671069
if (enable_read_thread)
10681070
{
@@ -1196,7 +1198,8 @@ BlockInputStreams DeltaMergeStore::read(
11961198
after_segment_read,
11971199
log_tracing_id,
11981200
enable_read_thread,
1199-
final_num_stream);
1201+
final_num_stream,
1202+
dm_context->scan_context->resource_group_name);
12001203

12011204
BlockInputStreams res;
12021205
for (size_t i = 0; i < final_num_stream; ++i)
@@ -1299,7 +1302,8 @@ void DeltaMergeStore::read(
12991302
after_segment_read,
13001303
log_tracing_id,
13011304
enable_read_thread,
1302-
final_num_stream);
1305+
final_num_stream,
1306+
dm_context->scan_context->resource_group_name);
13031307

13041308
if (enable_read_thread)
13051309
{

dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ void MergedTask::initOnce()
4747
setStreamFinished(cur_idx);
4848
continue;
4949
}
50+
if (pool->isRUExhausted())
51+
{
52+
continue;
53+
}
5054
stream = pool->buildInputStream(task);
5155
fiu_do_on(FailPoints::exception_in_merged_task_init, {
5256
throw Exception("Fail point exception_in_merged_task_init is triggered.", ErrorCodes::FAIL_POINT_ERROR);
@@ -74,11 +78,16 @@ int MergedTask::readOneBlock()
7478
continue;
7579
}
7680

77-
if (pool->getFreeBlockSlots() <= 0)
81+
if (pool->getFreeBlockSlots() <= 0 || pool->isRUExhausted())
7882
{
7983
continue;
8084
}
8185

86+
if (stream == nullptr)
87+
{
88+
stream = pool->buildInputStream(task);
89+
}
90+
8291
if (pool->readOneBlock(stream, task->segment))
8392
{
8493
read_block_count++;

dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,39 @@ SegmentReadTaskPools SegmentReadTaskScheduler::getPoolsUnlock(const std::vector<
123123

124124
/// Decides whether `pool` should be scheduled for reading right now.
///
/// Returns true only when all of the following hold:
///  - the pool has at least one free block slot,
///  - the pool does not report its RU as exhausted,
///  - there is something to read: either `merged_task_pool` already holds a task
///    for this pool, or the pool has both a free active-segment slot and at
///    least one pending segment.
///
/// Every `false` outcome increments exactly one `tiflash_storage_read_thread_counter`
/// series (`type_sche_no_slot`, `type_sche_no_ru`, `type_sche_active_segment_limit`
/// or `type_sche_no_segment`) naming the reason.
bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr & pool)
125125
{
126-
return pool->getFreeBlockSlots() > 0 && // Block queue is not full and
127-
(merged_task_pool.has(pool->pool_id) || // can schedule a segment from MergedTaskPool or
128-
pool->getFreeActiveSegments() > 0); // schedule a new segment.
126+
// No free block slot: do not schedule this pool.
if (pool->getFreeBlockSlots() <= 0)
127+
{
128+
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment();
129+
return false;
130+
}
131+
132+
// The pool reports that its RU is exhausted: do not schedule it.
if (pool->isRUExhausted())
133+
{
134+
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_ru).Increment();
135+
return false;
136+
}
137+
138+
// Check if there are segments that can be scheduled:
139+
// 1. `merged_task_pool` already holds a task for this pool.
140+
if (merged_task_pool.has(pool->pool_id))
141+
{
142+
return true;
143+
}
144+
// 2. The active-segment limit has not been reached and there is at least one
//    pending segment, so a new segment can be activated.
145+
if (pool->getFreeActiveSegments() > 0 && pool->getPendingSegmentCount() > 0)
146+
{
147+
return true;
148+
}
149+
150+
// Nothing can be scheduled; record whether the active-segment limit or the
// lack of pending segments was the reason.
if (pool->getFreeActiveSegments() <= 0)
151+
{
152+
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_active_segment_limit).Increment();
153+
}
154+
else
155+
{
156+
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_segment).Increment();
157+
}
158+
return false;
129159
}
130160

131161
SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlock()
@@ -145,10 +175,6 @@ SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlo
145175
{
146176
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_pool).Increment();
147177
}
148-
else
149-
{
150-
GET_METRIC(tiflash_storage_read_thread_counter, type_sche_no_slot).Increment();
151-
}
152178
return nullptr;
153179
}
154180

dbms/src/Storages/DeltaMerge/Segment.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ struct SegmentSnapshot : private boost::noncopyable
6969
/// Row count of this snapshot: the sum of `delta->getRows()` and `stable->getRows()`.
UInt64 getRows() const { return delta->getRows() + stable->getRows(); }
7070

7171
/// Forwards to `delta->isForUpdate()`.
bool isForUpdate() const { return delta->isForUpdate(); }
72+
73+
/// Estimates the bytes taken by the internal columns of this snapshot as a
/// fixed per-row width multiplied by `getRows()`.
UInt64 estimatedBytesOfInternalColumns() const
{
    // TODO: how about cluster index?
    // Per-row width: handle (Int64) + version (UInt64) + flag (UInt8).
    constexpr UInt64 bytes_per_row = sizeof(Int64) + sizeof(UInt64) + sizeof(UInt8);
    return bytes_per_row * getRows();
}
7279
};
7380

7481
/// A segment contains many rows of a table. A table is split into segments by consecutive ranges.

dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,11 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t
169169
auto block_size = std::max(
170170
expected_block_size,
171171
static_cast<size_t>(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows));
172+
if (likely(read_mode == ReadMode::Bitmap && !res_group_name.empty()))
173+
{
174+
auto bytes = t->read_snapshot->estimatedBytesOfInternalColumns();
175+
LocalAdmissionController::global_instance->consumeResource(res_group_name, bytesToRU(bytes), 0);
176+
}
172177
stream = t->segment->getInputStream(
173178
read_mode,
174179
*dm_context,
@@ -201,7 +206,8 @@ SegmentReadTaskPool::SegmentReadTaskPool(
201206
AfterSegmentRead after_segment_read_,
202207
const String & tracing_id,
203208
bool enable_read_thread_,
204-
Int64 num_streams_)
209+
Int64 num_streams_,
210+
const String & res_group_name_)
205211
: pool_id(nextPoolId())
206212
, physical_table_id(physical_table_id_)
207213
, mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this())
@@ -224,6 +230,7 @@ SegmentReadTaskPool::SegmentReadTaskPool(
224230
// Limiting the minimum number of reading segments to 2 is to avoid, as much as possible,
225231
// situations where the computation may be faster and the storage layer may not be able to keep up.
226232
, active_segment_limit(std::max(num_streams_, 2))
233+
, res_group_name(res_group_name_)
227234
{
228235
if (tasks_wrapper.empty())
229236
{
@@ -355,6 +362,7 @@ void SegmentReadTaskPool::pushBlock(Block && block)
355362
{
356363
blk_stat.push(block);
357364
global_blk_stat.push(block);
365+
read_bytes_after_last_check += block.bytes();
358366
q.push(std::move(block), nullptr);
359367
}
360368

@@ -383,6 +391,12 @@ Int64 SegmentReadTaskPool::getFreeActiveSegmentsUnlock() const
383391
return active_segment_limit - static_cast<Int64>(active_segment_ids.size());
384392
}
385393

394+
/// Returns the number of tasks currently held by `tasks_wrapper`.
/// The count is read while holding `mutex`.
Int64 SegmentReadTaskPool::getPendingSegmentCount() const
{
    std::lock_guard guard(mutex);
    const auto & pending_tasks = tasks_wrapper.getTasks();
    return static_cast<Int64>(pending_tasks.size());
}
399+
386400
bool SegmentReadTaskPool::exceptionHappened() const
387401
{
388402
return exception_happened.load(std::memory_order_relaxed);
@@ -403,4 +417,63 @@ void SegmentReadTaskPool::setException(const DB::Exception & e)
403417
}
404418
}
405419

420+
/// Returns a monotonic timestamp in milliseconds.
///
/// The value is intended only for measuring the time elapsed between two
/// calls; it is not wall-clock time. `std::chrono::steady_clock` is used
/// instead of `system_clock` because the latter can be stepped backwards
/// (NTP or manual adjustment), which would make "now - last_check" negative
/// and suppress interval-based rechecks until the wall clock catches up.
static Int64 currentMS()
{
    using namespace std::chrono;
    return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}
425+
426+
static bool checkIsRUExhausted(const String & res_group_name)
427+
{
428+
auto priority = LocalAdmissionController::global_instance->getPriority(res_group_name);
429+
if (unlikely(!priority.has_value()))
430+
{
431+
return false;
432+
}
433+
return LocalAdmissionController::isRUExhausted(*priority);
434+
}
435+
436+
bool SegmentReadTaskPool::isRUExhausted()
437+
{
438+
auto res = isRUExhaustedImpl();
439+
if (res)
440+
{
441+
GET_METRIC(tiflash_storage_read_thread_counter, type_ru_exhausted).Increment();
442+
}
443+
return res;
444+
}
445+
446+
/// Returns whether the RU of this pool's resource group is considered exhausted.
///
/// Always returns false when `res_group_name` is empty or when
/// `LocalAdmissionController::global_instance` is null.
///
/// Otherwise the answer is cached in `ru_is_exhausted` and only refreshed (via
/// `checkIsRUExhausted`) once `read_bytes_after_last_check` has reached
/// `bytes_of_one_hundred_ru` or at least `check_ru_interval_ms` milliseconds have
/// passed since `last_time_check_ru`. Until then the cached value is returned.
bool SegmentReadTaskPool::isRUExhaustedImpl()
447+
{
448+
if (unlikely(res_group_name.empty() || LocalAdmissionController::global_instance == nullptr))
449+
{
450+
return false;
451+
}
452+
453+
// To reduce lock contention in resource control,
454+
// check whether RU is exhausted only every `bytes_of_one_hundred_ru` bytes read or every `check_ru_interval_ms` (100ms).
455+
456+
// Fast path: decided without taking `ru_mu`.
457+
Int64 ms = currentMS();
458+
if (read_bytes_after_last_check < bytes_of_one_hundred_ru && ms - last_time_check_ru < check_ru_interval_ms)
459+
{
460+
return ru_is_exhausted; // Return result of last time.
461+
}
462+
463+
std::lock_guard lock(ru_mu);
464+
// If another thread has already refreshed the result while we were waiting for the lock, reuse its result.
465+
// Attention: `read_bytes_after_last_check` can be written concurrently in `pushBlock`.
466+
ms = currentMS();
467+
if (read_bytes_after_last_check < bytes_of_one_hundred_ru && ms - last_time_check_ru < check_ru_interval_ms)
468+
{
469+
return ru_is_exhausted; // Return result of last time.
470+
}
471+
472+
// Check and reset everything: clear the byte counter, refresh the cached result,
// then record the time of this check.
473+
read_bytes_after_last_check = 0;
474+
ru_is_exhausted = checkIsRUExhausted(res_group_name);
475+
last_time_check_ru = ms;
476+
return ru_is_exhausted;
477+
}
478+
406479
} // namespace DB::DM

dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ class SegmentReadTaskPool : private boost::noncopyable
167167
AfterSegmentRead after_segment_read_,
168168
const String & tracing_id,
169169
bool enable_read_thread_,
170-
Int64 num_streams_);
170+
Int64 num_streams_,
171+
const String & res_group_name_);
171172

172173
~SegmentReadTaskPool()
173174
{
@@ -216,6 +217,7 @@ class SegmentReadTaskPool : private boost::noncopyable
216217
Int64 decreaseUnorderedInputStreamRefCount();
217218
Int64 getFreeBlockSlots() const;
218219
Int64 getFreeActiveSegments() const;
220+
Int64 getPendingSegmentCount() const;
219221
bool valid() const;
220222
void setException(const DB::Exception & e);
221223

@@ -245,12 +247,16 @@ class SegmentReadTaskPool : private boost::noncopyable
245247
}
246248
}
247249

250+
bool isRUExhausted();
251+
248252
private:
249253
Int64 getFreeActiveSegmentsUnlock() const;
250254
bool exceptionHappened() const;
251255
void finishSegment(const SegmentPtr & seg);
252256
void pushBlock(Block && block);
253257

258+
bool isRUExhaustedImpl();
259+
254260
const int extra_table_id_index;
255261
DMContextPtr dm_context;
256262
ColumnDefines columns_to_read;
@@ -280,9 +286,16 @@ class SegmentReadTaskPool : private boost::noncopyable
280286
const Int64 block_slot_limit;
281287
const Int64 active_segment_limit;
282288

289+
const String res_group_name;
290+
std::mutex ru_mu;
291+
std::atomic<Int64> last_time_check_ru = 0;
292+
std::atomic<bool> ru_is_exhausted = false;
293+
std::atomic<UInt64> read_bytes_after_last_check = 0;
294+
283295
inline static std::atomic<uint64_t> pool_id_gen{1};
284296
inline static BlockStat global_blk_stat;
285297
/// Returns the current value of `pool_id_gen` and atomically increments it by one
/// (relaxed memory order).
static uint64_t nextPoolId() { return pool_id_gen.fetch_add(1, std::memory_order_relaxed); }
298+
inline static constexpr Int64 check_ru_interval_ms = 100;
286299
};
287300

288301
using SegmentReadTaskPoolPtr = std::shared_ptr<SegmentReadTaskPool>;

dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,10 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
221221
if (likely(scan_context != nullptr))
222222
{
223223
scan_context->total_user_read_bytes += bytes;
224-
lac_bytes_collector.collect(bytes);
224+
if constexpr (!need_row_id)
225+
{
226+
lac_bytes_collector.collect(bytes);
227+
}
225228
}
226229
}
227230
BlockInputStreams::iterator current_stream;

dbms/src/Storages/StorageDisaggregatedRemote.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,9 @@ DM::Remote::RNReadTaskPtr StorageDisaggregated::buildReadTaskWithBackoff(const C
122122
{
123123
using namespace pingcap;
124124

125-
auto scan_context = std::make_shared<DM::ScanContext>();
126-
context.getDAGContext()->scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
125+
auto * dag_context = context.getDAGContext();
126+
auto scan_context = std::make_shared<DM::ScanContext>(dag_context->getResourceGroupName());
127+
dag_context->scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
127128

128129
DM::Remote::RNReadTaskPtr read_task;
129130

0 commit comments

Comments
 (0)