Skip to content

Commit 20676d6

Browse files
Merge pull request ClickHouse#86468 from ClickHouse/backport/25.8/86422
Backport ClickHouse#86422 to 25.8: Fix leaking of MergesMutationsMemoryTracking and fix query_views_log for streaming
2 parents 1823f23 + 9476064 commit 20676d6

File tree

5 files changed

+39
-30
lines changed

5 files changed

+39
-30
lines changed

src/Common/ThreadStatus.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,11 @@ class ThreadGroup
117117
/// When new query starts, new thread group is created for it, current thread becomes master thread of the query
118118
static ThreadGroupPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
119119

120-
static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context);
120+
/// NOTE: The caller should call background_memory_tracker.adjustOnBackgroundTaskEnd() at the end (see existing callers),
121+
/// and make sure that you are the only user of this shared_ptr (usually it is managed via ThreadGroupSwitcher)
122+
static ThreadGroupPtr createForMergeMutate(ContextPtr storage_context);
121123

122-
static ThreadGroupPtr createForMaterializedView();
124+
static ThreadGroupPtr createForMaterializedView(ContextPtr context);
123125

124126
std::vector<UInt64> getInvolvedThreadIds() const;
125127
size_t getPeakThreadsUsage() const;
@@ -144,6 +146,8 @@ class ThreadGroup
144146
size_t peak_threads_usage TSA_GUARDED_BY(mutex) = 0;
145147

146148
UInt64 elapsed_total_threads_counter_ms TSA_GUARDED_BY(mutex) = 0;
149+
150+
static ThreadGroupPtr create(ContextPtr context);
147151
};
148152

149153
/**

src/Interpreters/InsertDependenciesBuilder.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -964,7 +964,11 @@ bool InsertDependenciesBuilder::observePath(const DependencyPath & path)
964964
select_contexts[root_view] = select_context;
965965
insert_contexts[root_view] = insert_context;
966966
input_headers[root_view] = init_header;
967-
thread_groups[root_view] = CurrentThread::getGroup();
967+
/// For background tasks (i.e. Buffer flush) there may not be any group
968+
if (auto thread_group = CurrentThread::getGroup())
969+
thread_groups[root_view] = thread_group;
970+
else
971+
thread_groups[root_view] = ThreadGroup::createForMaterializedView(init_context);
968972
views_error_registry->init(root_view);
969973
dependent_views[root_view] = {};
970974
};
@@ -990,7 +994,7 @@ bool InsertDependenciesBuilder::observePath(const DependencyPath & path)
990994

991995
inner_tables[current] = materialized_view->getTargetTableId();
992996
source_tables[current] = parent;
993-
thread_groups[current] = ThreadGroup::createForMaterializedView();
997+
thread_groups[current] = ThreadGroup::createForMaterializedView(init_context);
994998
view_types[current] = QueryViewsLogElement::ViewType::MATERIALIZED;
995999
views_error_registry->init(current);
9961000

@@ -1019,7 +1023,7 @@ bool InsertDependenciesBuilder::observePath(const DependencyPath & path)
10191023
inner_tables[current] = current;
10201024
select_queries[current] = live_view->getInnerQuery();
10211025
input_headers[current] = output_headers.at(path.parent(2));
1022-
thread_groups[current] = ThreadGroup::createForMaterializedView();
1026+
thread_groups[current] = ThreadGroup::createForMaterializedView(init_context);
10231027
view_types[current] = QueryViewsLogElement::ViewType::LIVE;
10241028
views_error_registry->init(current);
10251029

@@ -1051,7 +1055,7 @@ bool InsertDependenciesBuilder::observePath(const DependencyPath & path)
10511055
inner_tables[current] = current;
10521056
select_queries[current] = window_view->getMergeableQuery();
10531057
input_headers[current] = output_headers.at(path.parent(2));
1054-
thread_groups[current] = ThreadGroup::createForMaterializedView();
1058+
thread_groups[current] = ThreadGroup::createForMaterializedView(init_context);
10551059
view_types[current] = QueryViewsLogElement::ViewType::WINDOW;
10561060
views_error_registry->init(current);
10571061

src/Interpreters/ThreadStatusExt.cpp

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -145,36 +145,42 @@ ThreadGroupPtr ThreadGroup::createForQuery(ContextPtr query_context_, std::funct
145145
return group;
146146
}
147147

148-
ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_context)
148+
ThreadGroupPtr ThreadGroup::create(ContextPtr context)
149149
{
150-
auto group = std::make_shared<ThreadGroup>(storage_context);
150+
auto group = std::make_shared<ThreadGroup>(context);
151151

152-
group->memory_tracker.setDescription("Background process (mutate/merge)");
153152
/// However settings from storage context have to be applied
154-
const Settings & settings = storage_context->getSettingsRef();
153+
const Settings & settings = context->getSettingsRef();
155154
group->memory_tracker.setProfilerStep(settings[Setting::memory_profiler_step]);
156155
group->memory_tracker.setSampleProbability(settings[Setting::memory_profiler_sample_probability]);
157156
group->memory_tracker.setSampleMinAllocationSize(settings[Setting::memory_profiler_sample_min_allocation_size]);
158157
group->memory_tracker.setSampleMaxAllocationSize(settings[Setting::memory_profiler_sample_max_allocation_size]);
159158
group->memory_tracker.setSoftLimit(settings[Setting::memory_overcommit_ratio_denominator]);
160-
group->memory_tracker.setParent(&background_memory_tracker);
161159
if (settings[Setting::memory_tracker_fault_probability] > 0.0)
162160
group->memory_tracker.setFaultProbability(settings[Setting::memory_tracker_fault_probability]);
163161

164162
return group;
165163
}
166164

167-
ThreadGroupPtr ThreadGroup::createForMaterializedView()
165+
ThreadGroupPtr ThreadGroup::createForMergeMutate(ContextPtr storage_context)
168166
{
169-
auto current_group = CurrentThread::getGroup();
170-
if (!current_group)
171-
return nullptr;
172-
173-
auto group = std::make_shared<ThreadGroup>(current_group);
174-
group->memory_tracker.setDescription("MaterializeView");
167+
auto group = create(storage_context);
168+
group->memory_tracker.setDescription("Background process (mutate/merge)");
169+
group->memory_tracker.setParent(&background_memory_tracker);
175170
return group;
176171
}
177172

173+
ThreadGroupPtr ThreadGroup::createForMaterializedView(ContextPtr context)
174+
{
175+
ThreadGroupPtr res_group;
176+
if (auto current_group = CurrentThread::getGroup())
177+
res_group = std::make_shared<ThreadGroup>(current_group);
178+
else
179+
res_group = create(context);
180+
res_group->memory_tracker.setDescription("MaterializeView");
181+
return res_group;
182+
}
183+
178184
void ThreadGroup::attachQueryForLog(const String & query_, UInt64 normalized_hash)
179185
{
180186
auto hash = normalized_hash ? normalized_hash : normalizedQueryHash(query_, false);

src/Storages/MergeTree/MergeList.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, FutureMergedMuta
7474
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got {} source parts for mutation {}: {}", future_part->parts.size(),
7575
result_part_info.getPartNameV1(), fmt::join(source_part_names, ", "));
7676

77-
thread_group = ThreadGroup::createForBackgroundProcess(context);
77+
thread_group = ThreadGroup::createForMergeMutate(context);
7878
}
7979

8080
MergeInfo MergeListElement::getInfo() const

src/Storages/StorageBuffer.cpp

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,18 +1089,13 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
10891089

10901090
void StorageBuffer::backgroundFlush()
10911091
{
1092+
try
10921093
{
1093-
auto thread_group = ThreadGroup::createForBackgroundProcess(getContext());
1094-
ThreadGroupSwitcher group_switcher(thread_group, "BufferBgrFlush");
1095-
1096-
try
1097-
{
1098-
flushAllBuffers(true);
1099-
}
1100-
catch (...)
1101-
{
1102-
tryLogCurrentException(__PRETTY_FUNCTION__);
1103-
}
1094+
flushAllBuffers(true);
1095+
}
1096+
catch (...)
1097+
{
1098+
tryLogCurrentException(__PRETTY_FUNCTION__);
11041099
}
11051100

11061101
reschedule();

0 commit comments

Comments
 (0)