Skip to content

Commit 8d68102

Browse files
authored
Merge pull request ClickHouse#79963 from ClickHouse/revert-79898-revert-pushing-to-views
Revert "Revert "rework pushing to views ClickHouse#77309""
2 parents f54bad7 + ac468ea commit 8d68102

File tree

102 files changed

+3442
-2229
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+3442
-2229
lines changed

src/Client/LocalConnection.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,23 @@ bool LocalConnection::poll(size_t)
466466
return true;
467467
}
468468

469+
// pushing executors have to be finished before the final stats are sent
470+
if (state->is_finished)
471+
{
472+
if (state->executor)
473+
{
474+
// no op
475+
}
476+
else if (state->pushing_async_executor)
477+
{
478+
state->pushing_async_executor->finish();
479+
}
480+
else if (state->pushing_executor)
481+
{
482+
state->pushing_executor->finish();
483+
}
484+
}
485+
469486
if (state->is_finished && !state->sent_totals)
470487
{
471488
state->sent_totals = true;
@@ -514,7 +531,7 @@ bool LocalConnection::poll(size_t)
514531
{
515532
state->sent_profile_events = true;
516533

517-
if (send_profile_events && state->executor)
534+
if (send_profile_events && (state->executor || state->pushing_async_executor || state->pushing_executor))
518535
{
519536
sendProfileEvents();
520537
return true;

src/Common/ErrorCodes.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,7 @@
625625
M(743, ICEBERG_SPECIFICATION_VIOLATION) \
626626
M(744, SESSION_ID_EMPTY) \
627627
M(745, SERVER_OVERLOADED) \
628+
M(746, DEPENDENCIES_NOT_FOUND) \
628629
\
629630
M(900, DISTRIBUTED_CACHE_ERROR) \
630631
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

src/Common/Exception.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,15 +593,15 @@ void tryLogException(std::exception_ptr e, const char * log_name, const std::str
593593
}
594594
}
595595

596-
void tryLogException(std::exception_ptr e, LoggerPtr logger, const std::string & start_of_message)
596+
void tryLogException(std::exception_ptr e, LoggerPtr logger, const std::string & start_of_message, LogsLevel level)
597597
{
598598
try
599599
{
600600
std::rethrow_exception(std::move(e)); // NOLINT
601601
}
602602
catch (...)
603603
{
604-
tryLogCurrentException(logger, start_of_message);
604+
tryLogCurrentException(logger, start_of_message, level);
605605
}
606606
}
607607

src/Common/Exception.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ struct ExecutionStatus
326326

327327
/// TODO: Logger leak constexpr overload
328328
void tryLogException(std::exception_ptr e, const char * log_name, const std::string & start_of_message = "");
329-
void tryLogException(std::exception_ptr e, LoggerPtr logger, const std::string & start_of_message = "");
329+
void tryLogException(std::exception_ptr e, LoggerPtr logger, const std::string & start_of_message = "", LogsLevel level = LogsLevel::error);
330330
void tryLogException(std::exception_ptr e, const AtomicLogger & logger, const std::string & start_of_message = "");
331331

332332
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);

src/Common/Logger.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class Logger;
1414
using LoggerPtr = std::shared_ptr<Logger>;
1515
}
1616

17-
using LoggerPtr = std::shared_ptr<Poco::Logger>;
17+
using LoggerPtr = Poco::LoggerPtr;
1818
using LoggerRawPtr = Poco::Logger *;
1919

2020
/** RAII wrappers around Poco/Logger.h.

src/Common/MemorySpillScheduler.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <atomic>
4+
#include <memory>
45
#include <mutex>
56
#include <unordered_map>
67
#include <base/types.h>
@@ -21,6 +22,8 @@ struct ProcessorMemoryStats
2122
class MemorySpillScheduler
2223
{
2324
public:
25+
using Ptr = std::shared_ptr<MemorySpillScheduler>;
26+
2427
explicit MemorySpillScheduler(bool enable_ = false) : enable(enable_) {}
2528
~MemorySpillScheduler() = default;
2629

src/Common/ThreadStatus.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ static thread_local bool has_alt_stack = false;
107107

108108
ThreadGroup::ThreadGroup()
109109
: master_thread_id(CurrentThread::get().thread_id)
110-
, memory_spill_scheduler(false)
110+
, memory_spill_scheduler(std::make_shared<MemorySpillScheduler>(false))
111111
{}
112112

113113
ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)

src/Common/ThreadStatus.h

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class ThreadGroup
6969
ThreadGroup();
7070
using FatalErrorCallback = std::function<void()>;
7171
explicit ThreadGroup(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
72+
explicit ThreadGroup(ThreadGroupPtr parent);
7273

7374
/// The first thread created this thread group
7475
const UInt64 master_thread_id;
@@ -79,7 +80,7 @@ class ThreadGroup
7980

8081
const FatalErrorCallback fatal_error_callback;
8182

82-
MemorySpillScheduler memory_spill_scheduler;
83+
MemorySpillScheduler::Ptr memory_spill_scheduler;
8384
ProfileEvents::Counters performance_counters{VariableContext::Process};
8485
MemoryTracker memory_tracker{VariableContext::Process};
8586

@@ -118,11 +119,14 @@ class ThreadGroup
118119

119120
static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context);
120121

122+
static ThreadGroupPtr createForMaterializedView();
123+
121124
std::vector<UInt64> getInvolvedThreadIds() const;
122125
size_t getPeakThreadsUsage() const;
126+
UInt64 getThreadsTotalElapsedMs() const;
123127

124128
void linkThread(UInt64 thread_id);
125-
void unlinkThread();
129+
void unlinkThread(UInt64 elapsed_thread_counter_ms);
126130

127131
private:
128132
mutable std::mutex mutex;
@@ -138,6 +142,8 @@ class ThreadGroup
138142

139143
/// Peak threads count in the group
140144
size_t peak_threads_usage TSA_GUARDED_BY(mutex) = 0;
145+
146+
UInt64 elapsed_total_threads_counter_ms TSA_GUARDED_BY(mutex) = 0;
141147
};
142148

143149
/**
@@ -240,10 +246,13 @@ class ThreadStatus : public boost::noncopyable
240246
UInt64 microseconds() const;
241247
UInt64 seconds() const;
242248

249+
UInt64 elapsedMilliseconds() const;
250+
UInt64 elapsedMilliseconds(const TimePoint & current) const;
251+
243252
std::chrono::time_point<std::chrono::system_clock> point;
244253
};
245254

246-
TimePoint query_start_time{};
255+
TimePoint thread_attach_time{};
247256

248257
// CPU and Real time query profilers
249258
std::unique_ptr<QueryProfilerReal> query_profiler_real;
@@ -255,9 +264,6 @@ class ThreadStatus : public boost::noncopyable
255264
Stopwatch stopwatch{CLOCK_MONOTONIC_COARSE};
256265
UInt64 last_performance_counters_update_time = 0;
257266

258-
/// See setInternalThread()
259-
bool internal_thread = false;
260-
261267
/// This is helpful for cut linking dependencies for clickhouse_common_io
262268
using Deleter = std::function<void()>;
263269
Deleter deleter;
@@ -277,22 +283,6 @@ class ThreadStatus : public boost::noncopyable
277283
ContextPtr getQueryContext() const;
278284
ContextPtr getGlobalContext() const;
279285

280-
/// "Internal" ThreadStatus is used for materialized views for separate
281-
/// tracking into system.query_views_log
282-
///
283-
/// You can have multiple internal threads, but only one non-internal with
284-
/// the same thread_id.
285-
///
286-
/// "Internal" thread:
287-
/// - cannot have query profiler
288-
/// since the running (main query) thread should already have one
289-
/// - should not try to obtain latest counter on detach
290-
/// because detaching of such threads will be done from a different
291-
/// thread_id, and some counters are not available (i.e. getrusage()),
292-
/// but anyway they are accounted correctly in the main ThreadStatus of a
293-
/// query.
294-
void setInternalThread();
295-
296286
/// Attaches slave thread to existing thread group
297287
void attachToGroup(const ThreadGroupPtr & thread_group_, bool check_detached = true);
298288

src/Interpreters/CancellationChecker.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ bool CancellationChecker::removeQueryFromSet(std::shared_ptr<QueryStatus> query)
6262

6363
if (it != querySet.end())
6464
{
65-
LOG_TEST(log, "Removing query {} from done tasks", query->getInfo().query);
65+
LOG_TEST(log, "Removing query {} from done tasks", query->getClientInfo().current_query_id);
6666
querySet.erase(it);
6767
return true;
6868
}
@@ -74,7 +74,7 @@ void CancellationChecker::appendTask(const std::shared_ptr<QueryStatus> & query,
7474
{
7575
if (timeout <= 0) // Avoid cases when the timeout is less or equal zero
7676
{
77-
LOG_TEST(log, "Did not add the task because the timeout is 0. Query: {}", query->getInfo().query);
77+
LOG_TEST(log, "Did not add the task because the timeout is 0, query_id: {}", query->getClientInfo().current_query_id);
7878
return;
7979
}
8080
std::unique_lock<std::mutex> lock(m);
@@ -122,10 +122,11 @@ void CancellationChecker::workerFunction()
122122

123123
if ((end_time_ms <= now_ms && duration_milliseconds.count() != 0))
124124
{
125-
LOG_TRACE(log, "Cancelling the task because of the timeout: {} ms, query: {}", duration, next_task.query->getInfo().query);
125+
LOG_DEBUG(log, "Cancelling the task because of the timeout: {} ms, query_id: {}",
126+
duration, next_task.query->getClientInfo().current_query_id);
127+
126128
cancelTask(next_task);
127129
querySet.erase(next_task);
128-
129130
continue;
130131
}
131132
}

src/Interpreters/Context.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
#include <Common/logger_useful.h>
113113
#include <Common/RemoteHostFilter.h>
114114
#include <Common/HTTPHeaderFilter.h>
115+
#include <Interpreters/StorageID.h>
115116
#include <Interpreters/SystemLog.h>
116117
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
117118
#include <Interpreters/AsynchronousInsertQueue.h>
@@ -2172,6 +2173,12 @@ bool Context::hasScalar(const String & name) const
21722173
return scalars.contains(name);
21732174
}
21742175

2176+
void Context::addQueryAccessInfo(
2177+
const StorageID & table_id,
2178+
const Names & column_names)
2179+
{
2180+
addQueryAccessInfo(backQuoteIfNeed(table_id.getDatabaseName()), table_id.getFullTableName(), column_names);
2181+
}
21752182

21762183
void Context::addQueryAccessInfo(
21772184
const String & quoted_database_name,

0 commit comments

Comments
 (0)