Skip to content

Commit 5576cb7

Browse files
authored
Merge pull request ClickHouse#62625 from kitaisreal/hashed-dictionary-parallel-loader-exception-safe-constructor
HashedDictionaryParallelLoader exception safe constructor
2 parents 54871f0 + 7ebaa4d commit 5576cb7

File tree

1 file changed

+29
-16
lines changed

1 file changed

+29
-16
lines changed

src/Dictionaries/HashedDictionaryParallelLoader.h

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <Dictionaries/IDictionary.h>
4+
#include <Dictionaries/DictionaryHelpers.h>
45
#include <Common/CurrentThread.h>
56
#include <Common/iota.h>
67
#include <Common/scope_guard_safe.h>
@@ -62,28 +63,40 @@ class HashedDictionaryParallelLoader : public boost::noncopyable
6263
for (size_t shard = 0; shard < shards; ++shard)
6364
{
6465
shards_queues[shard].emplace(backlog);
65-
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
66+
67+
try
6668
{
67-
WorkerStatistic statistic;
68-
SCOPE_EXIT_SAFE(
69-
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
70-
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);
69+
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
70+
{
71+
WorkerStatistic statistic;
72+
SCOPE_EXIT_SAFE(
73+
LOG_TRACE(dictionary.log, "Finished worker for dictionary {} shard {}, processed {} blocks, {} rows, total time {}ms",
74+
dictionary_name, shard, statistic.total_blocks, statistic.total_rows, statistic.total_elapsed_ms);
7175

72-
if (thread_group)
73-
CurrentThread::detachFromGroupIfNotDetached();
74-
);
76+
if (thread_group)
77+
CurrentThread::detachFromGroupIfNotDetached();
78+
);
79+
80+
/// Do not account memory that was occupied by the dictionaries for the query/user context.
81+
MemoryTrackerBlockerInThread memory_blocker;
7582

76-
/// Do not account memory that was occupied by the dictionaries for the query/user context.
77-
MemoryTrackerBlockerInThread memory_blocker;
83+
if (thread_group)
84+
CurrentThread::attachToGroupIfDetached(thread_group);
85+
setThreadName("HashedDictLoad");
7886

79-
if (thread_group)
80-
CurrentThread::attachToGroupIfDetached(thread_group);
81-
setThreadName("HashedDictLoad");
87+
LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);
8288

83-
LOG_TRACE(dictionary.log, "Starting worker for dictionary {}, shard {}", dictionary_name, shard);
89+
threadWorker(shard, statistic);
90+
});
91+
}
92+
catch (...)
93+
{
94+
for (size_t shard_to_finish = 0; shard_to_finish < shard; ++shard_to_finish)
95+
shards_queues[shard_to_finish]->clearAndFinish();
8496

85-
threadWorker(shard, statistic);
86-
});
97+
pool.wait();
98+
throw;
99+
}
87100
}
88101
}
89102

0 commit comments

Comments
 (0)