Skip to content

Commit 8c154f2

Browse files
Revert "Merge pull request ClickHouse#77309 from ClickHouse/chesema-deduplicate-sync-inserts"
This reverts commit 120768e, reversing changes made to 0d42a5c.
1 parent cf0e9f2 commit 8c154f2

File tree

96 files changed

+2216
-1832
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+2216
-1832
lines changed

src/Client/LocalConnection.cpp

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -466,23 +466,6 @@ bool LocalConnection::poll(size_t)
466466
return true;
467467
}
468468

469-
// pushing executors have to be finished before the final stats are sent
470-
if (state->is_finished)
471-
{
472-
if (state->executor)
473-
{
474-
// no op
475-
}
476-
else if (state->pushing_async_executor)
477-
{
478-
state->pushing_async_executor->finish();
479-
}
480-
else if (state->pushing_executor)
481-
{
482-
state->pushing_executor->finish();
483-
}
484-
}
485-
486469
if (state->is_finished && !state->sent_totals)
487470
{
488471
state->sent_totals = true;
@@ -531,7 +514,7 @@ bool LocalConnection::poll(size_t)
531514
{
532515
state->sent_profile_events = true;
533516

534-
if (send_profile_events && (state->executor || state->pushing_async_executor || state->pushing_executor))
517+
if (send_profile_events && state->executor)
535518
{
536519
sendProfileEvents();
537520
return true;

src/Common/ErrorCodes.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,6 @@
625625
M(743, ICEBERG_SPECIFICATION_VIOLATION) \
626626
M(744, SESSION_ID_EMPTY) \
627627
M(745, SERVER_OVERLOADED) \
628-
M(746, DEPENDENCIES_NOT_FOUND) \
629628
\
630629
M(900, DISTRIBUTED_CACHE_ERROR) \
631630
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

src/Common/Exception.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,15 +593,15 @@ void tryLogException(std::exception_ptr e, const char * log_name, const std::str
593593
}
594594
}
595595

596-
void tryLogException(std::exception_ptr e, LoggerPtr logger, const std::string & start_of_message, LogsLevel level)
596+
void tryLogException(std::exception_ptr e, LoggerPtr logger, const std::string & start_of_message)
597597
{
598598
try
599599
{
600600
std::rethrow_exception(std::move(e)); // NOLINT
601601
}
602602
catch (...)
603603
{
604-
tryLogCurrentException(logger, start_of_message, level);
604+
tryLogCurrentException(logger, start_of_message);
605605
}
606606
}
607607

src/Common/Exception.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ struct ExecutionStatus
326326

327327
/// TODO: Logger leak constexpr overload
328328
void tryLogException(std::exception_ptr e, const char * log_name, const std::string & start_of_message = "");
329-
void tryLogException(std::exception_ptr e, LoggerPtr logger, const std::string & start_of_message = "", LogsLevel level = LogsLevel::error);
329+
void tryLogException(std::exception_ptr e, LoggerPtr logger, const std::string & start_of_message = "");
330330
void tryLogException(std::exception_ptr e, const AtomicLogger & logger, const std::string & start_of_message = "");
331331

332332
std::string getExceptionMessage(const Exception & e, bool with_stacktrace, bool check_embedded_stacktrace = false);

src/Common/Logger.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class Logger;
1414
using LoggerPtr = std::shared_ptr<Logger>;
1515
}
1616

17-
using LoggerPtr = Poco::LoggerPtr;
17+
using LoggerPtr = std::shared_ptr<Poco::Logger>;
1818
using LoggerRawPtr = Poco::Logger *;
1919

2020
/** RAII wrappers around Poco/Logger.h.

src/Common/MemorySpillScheduler.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#pragma once
22

33
#include <atomic>
4-
#include <memory>
54
#include <mutex>
65
#include <unordered_map>
76
#include <base/types.h>
@@ -22,8 +21,6 @@ struct ProcessorMemoryStats
2221
class MemorySpillScheduler
2322
{
2423
public:
25-
using Ptr = std::shared_ptr<MemorySpillScheduler>;
26-
2724
explicit MemorySpillScheduler(bool enable_ = false) : enable(enable_) {}
2825
~MemorySpillScheduler() = default;
2926

src/Common/ThreadStatus.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ static thread_local bool has_alt_stack = false;
107107

108108
ThreadGroup::ThreadGroup()
109109
: master_thread_id(CurrentThread::get().thread_id)
110-
, memory_spill_scheduler(std::make_shared<MemorySpillScheduler>(false))
110+
, memory_spill_scheduler(false)
111111
{}
112112

113113
ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)

src/Common/ThreadStatus.h

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ class ThreadGroup
6969
ThreadGroup();
7070
using FatalErrorCallback = std::function<void()>;
7171
explicit ThreadGroup(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
72-
explicit ThreadGroup(ThreadGroupPtr parent);
7372

7473
/// The first thread created this thread group
7574
const UInt64 master_thread_id;
@@ -80,7 +79,7 @@ class ThreadGroup
8079

8180
const FatalErrorCallback fatal_error_callback;
8281

83-
MemorySpillScheduler::Ptr memory_spill_scheduler;
82+
MemorySpillScheduler memory_spill_scheduler;
8483
ProfileEvents::Counters performance_counters{VariableContext::Process};
8584
MemoryTracker memory_tracker{VariableContext::Process};
8685

@@ -119,14 +118,11 @@ class ThreadGroup
119118

120119
static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context);
121120

122-
static ThreadGroupPtr createForMaterializedView();
123-
124121
std::vector<UInt64> getInvolvedThreadIds() const;
125122
size_t getPeakThreadsUsage() const;
126-
UInt64 getThreadsTotalElapsedMs() const;
127123

128124
void linkThread(UInt64 thread_id);
129-
void unlinkThread(UInt64 elapsed_thread_counter_ms);
125+
void unlinkThread();
130126

131127
private:
132128
mutable std::mutex mutex;
@@ -142,8 +138,6 @@ class ThreadGroup
142138

143139
/// Peak threads count in the group
144140
size_t peak_threads_usage TSA_GUARDED_BY(mutex) = 0;
145-
146-
UInt64 elapsed_total_threads_counter_ms TSA_GUARDED_BY(mutex) = 0;
147141
};
148142

149143
/**
@@ -246,13 +240,10 @@ class ThreadStatus : public boost::noncopyable
246240
UInt64 microseconds() const;
247241
UInt64 seconds() const;
248242

249-
UInt64 elapsedMilliseconds() const;
250-
UInt64 elapsedMilliseconds(const TimePoint & current) const;
251-
252243
std::chrono::time_point<std::chrono::system_clock> point;
253244
};
254245

255-
TimePoint thread_attach_time{};
246+
TimePoint query_start_time{};
256247

257248
// CPU and Real time query profilers
258249
std::unique_ptr<QueryProfilerReal> query_profiler_real;
@@ -264,6 +255,9 @@ class ThreadStatus : public boost::noncopyable
264255
Stopwatch stopwatch{CLOCK_MONOTONIC_COARSE};
265256
UInt64 last_performance_counters_update_time = 0;
266257

258+
/// See setInternalThread()
259+
bool internal_thread = false;
260+
267261
/// This is helpful for cut linking dependencies for clickhouse_common_io
268262
using Deleter = std::function<void()>;
269263
Deleter deleter;
@@ -283,6 +277,22 @@ class ThreadStatus : public boost::noncopyable
283277
ContextPtr getQueryContext() const;
284278
ContextPtr getGlobalContext() const;
285279

280+
/// "Internal" ThreadStatus is used for materialized views for separate
281+
/// tracking into system.query_views_log
282+
///
283+
/// You can have multiple internal threads, but only one non-internal with
284+
/// the same thread_id.
285+
///
286+
/// "Internal" thread:
287+
/// - cannot have query profiler
288+
/// since the running (main query) thread should already have one
289+
/// - should not try to obtain latest counter on detach
290+
/// because detaching of such threads will be done from a different
291+
/// thread_id, and some counters are not available (i.e. getrusage()),
292+
/// but anyway they are accounted correctly in the main ThreadStatus of a
293+
/// query.
294+
void setInternalThread();
295+
286296
/// Attaches slave thread to existing thread group
287297
void attachToGroup(const ThreadGroupPtr & thread_group_, bool check_detached = true);
288298

src/Interpreters/CancellationChecker.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ bool CancellationChecker::removeQueryFromSet(std::shared_ptr<QueryStatus> query)
6262

6363
if (it != querySet.end())
6464
{
65-
LOG_TEST(log, "Removing query {} from done tasks", query->getClientInfo().current_query_id);
65+
LOG_TEST(log, "Removing query {} from done tasks", query->getInfo().query);
6666
querySet.erase(it);
6767
return true;
6868
}
@@ -74,7 +74,7 @@ void CancellationChecker::appendTask(const std::shared_ptr<QueryStatus> & query,
7474
{
7575
if (timeout <= 0) // Avoid cases when the timeout is less or equal zero
7676
{
77-
LOG_TEST(log, "Did not add the task because the timeout is 0, query_id: {}", query->getClientInfo().current_query_id);
77+
LOG_TEST(log, "Did not add the task because the timeout is 0. Query: {}", query->getInfo().query);
7878
return;
7979
}
8080
std::unique_lock<std::mutex> lock(m);
@@ -122,11 +122,10 @@ void CancellationChecker::workerFunction()
122122

123123
if ((end_time_ms <= now_ms && duration_milliseconds.count() != 0))
124124
{
125-
LOG_DEBUG(log, "Cancelling the task because of the timeout: {} ms, query_id: {}",
126-
duration, next_task.query->getClientInfo().current_query_id);
127-
125+
LOG_TRACE(log, "Cancelling the task because of the timeout: {} ms, query: {}", duration, next_task.query->getInfo().query);
128126
cancelTask(next_task);
129127
querySet.erase(next_task);
128+
130129
continue;
131130
}
132131
}

src/Interpreters/Context.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@
112112
#include <Common/logger_useful.h>
113113
#include <Common/RemoteHostFilter.h>
114114
#include <Common/HTTPHeaderFilter.h>
115-
#include <Interpreters/StorageID.h>
116115
#include <Interpreters/SystemLog.h>
117116
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
118117
#include <Interpreters/AsynchronousInsertQueue.h>
@@ -2173,12 +2172,6 @@ bool Context::hasScalar(const String & name) const
21732172
return scalars.contains(name);
21742173
}
21752174

2176-
void Context::addQueryAccessInfo(
2177-
const StorageID & table_id,
2178-
const Names & column_names)
2179-
{
2180-
addQueryAccessInfo(backQuoteIfNeed(table_id.getDatabaseName()), table_id.getFullTableName(), column_names);
2181-
}
21822175

21832176
void Context::addQueryAccessInfo(
21842177
const String & quoted_database_name,

0 commit comments

Comments
 (0)