Skip to content

Commit 94264b1

Browse files
authored
Merge pull request ClickHouse#77578 from azat/fix-applying-async_insert-from-server
Fix applying async_insert from server (via apply_settings_from_server)
2 parents 9d48622 + a595877 commit 94264b1

File tree

17 files changed

+117
-94
lines changed

17 files changed

+117
-94
lines changed

programs/client/Client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ Client::Client()
7171

7272
Client::~Client() = default;
7373

74-
void Client::processError(const String & query) const
74+
void Client::processError(std::string_view query) const
7575
{
7676
if (server_exception)
7777
{

programs/client/Client.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ class Client : public ClientApplicationBase
2727
protected:
2828
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
2929

30-
bool processWithFuzzing(const String & full_query) override;
30+
bool processWithFuzzing(std::string_view full_query) override;
3131
bool buzzHouse() override;
3232
std::optional<bool> processFuzzingStep(const String & query_to_execute, const ASTPtr & parsed_query, bool permissive);
3333

3434
void connect() override;
3535

36-
void processError(const String & query) const override;
36+
void processError(std::string_view query) const override;
3737

3838
String getName() const override { return "client"; }
3939

programs/client/FuzzLoop.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ extern const int BUZZHOUSE;
4747

4848
std::optional<bool> Client::processFuzzingStep(const String & query_to_execute, const ASTPtr & parsed_query, const bool permissive)
4949
{
50-
processParsedSingleQuery(query_to_execute, query_to_execute, parsed_query);
50+
bool async_insert = false;
51+
processParsedSingleQuery(query_to_execute, parsed_query, async_insert);
5152

5253
const auto * exception = server_exception ? server_exception.get() : client_exception.get();
5354
// Sometimes you may get TOO_DEEP_RECURSION from the server,
@@ -105,7 +106,7 @@ std::optional<bool> Client::processFuzzingStep(const String & query_to_execute,
105106
}
106107

107108
/// Returns false when server is not available.
108-
bool Client::processWithFuzzing(const String & full_query)
109+
bool Client::processWithFuzzing(std::string_view full_query)
109110
{
110111
ASTPtr orig_ast;
111112

programs/local/LocalServer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ Poco::Util::LayeredConfiguration & LocalServer::getClientConfiguration()
150150
return config();
151151
}
152152

153-
void LocalServer::processError(const String &) const
153+
void LocalServer::processError(std::string_view) const
154154
{
155155
if (ignore_error)
156156
return;

programs/local/LocalServer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class LocalServer : public ClientApplicationBase, public Loggers
3434

3535
void connect() override;
3636

37-
void processError(const String & query) const override;
37+
void processError(std::string_view query) const override;
3838

3939
String getName() const override { return "local"; }
4040

programs/server/users.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<profiles>
66
<!-- Default settings. -->
77
<default>
8+
<!-- <async_insert>1</async_insert> -->
89
</default>
910

1011
<!-- Profile that allows only read queries. -->

src/Client/ClientBase.cpp

Lines changed: 62 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,18 +1091,6 @@ void ClientBase::updateSuggest(const ASTPtr & ast)
10911091
suggest->addWords(std::move(new_words));
10921092
}
10931093

1094-
bool ClientBase::isSyncInsertWithData(const ASTInsertQuery & insert_query, const ContextPtr & context)
1095-
{
1096-
if (!insert_query.data)
1097-
return false;
1098-
1099-
auto settings = context->getSettingsCopy();
1100-
if (insert_query.settings_ast)
1101-
settings.applyChanges(insert_query.settings_ast->as<ASTSetQuery>()->changes);
1102-
1103-
return !settings[Setting::async_insert];
1104-
}
1105-
11061094
bool ClientBase::processTextAsSingleQuery(const String & full_query)
11071095
{
11081096
/// Some parts of a query (result output and formatting) are executed
@@ -1115,8 +1103,6 @@ bool ClientBase::processTextAsSingleQuery(const String & full_query)
11151103
if (!parsed_query)
11161104
return false;
11171105

1118-
String query_to_execute;
1119-
11201106
/// Query will be parsed before checking the result because error does not
11211107
/// always means a problem, i.e. if table already exists, and it is no a
11221108
/// huge problem if suggestion will be added even on error, since this is
@@ -1127,19 +1113,10 @@ bool ClientBase::processTextAsSingleQuery(const String & full_query)
11271113
if (suggest)
11281114
updateSuggest(parsed_query);
11291115

1130-
/// An INSERT query may have the data that follows query text.
1131-
/// Send part of the query without data, because data will be sent separately.
1132-
/// But for asynchronous inserts we don't extract data, because it's needed
1133-
/// to be done on server side in that case (for coalescing the data from multiple inserts on server side).
1134-
const auto * insert = parsed_query->as<ASTInsertQuery>();
1135-
if (insert && isSyncInsertWithData(*insert, client_context))
1136-
query_to_execute = full_query.substr(0, insert->data - full_query.data());
1137-
else
1138-
query_to_execute = full_query;
1139-
11401116
try
11411117
{
1142-
processParsedSingleQuery(full_query, query_to_execute, parsed_query, echo_queries);
1118+
bool is_async_insert_with_inlined_data = false;
1119+
processParsedSingleQuery(full_query, parsed_query, is_async_insert_with_inlined_data);
11431120
}
11441121
catch (Exception & e)
11451122
{
@@ -1155,10 +1132,8 @@ bool ClientBase::processTextAsSingleQuery(const String & full_query)
11551132
return !have_error;
11561133
}
11571134

1158-
void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query)
1135+
void ClientBase::processOrdinaryQuery(String query, ASTPtr parsed_query)
11591136
{
1160-
auto query = query_to_execute;
1161-
11621137
/// Rewrite query only when we have query parameters.
11631138
/// Note that if query is rewritten, comments in query are lost.
11641139
/// But the user often wants to see comments in server logs, query log, processlist, etc.
@@ -1760,9 +1735,8 @@ bool isStdinNotEmptyAndValid(ReadBuffer & std_in)
17601735
}
17611736

17621737

1763-
void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr parsed_query)
1738+
void ClientBase::processInsertQuery(String query, ASTPtr parsed_query)
17641739
{
1765-
auto query = query_to_execute;
17661740
if (!query_parameters.empty()
17671741
&& connection->getServerRevision(connection_parameters.timeouts) < DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
17681742
{
@@ -2173,8 +2147,11 @@ void ClientBase::cancelQuery()
21732147
cancelled = true;
21742148
}
21752149

2176-
void ClientBase::processParsedSingleQuery(const String & full_query, const String & query_to_execute,
2177-
ASTPtr parsed_query, std::optional<bool> echo_query_, bool report_error)
2150+
void ClientBase::processParsedSingleQuery(
2151+
std::string_view query_,
2152+
ASTPtr parsed_query,
2153+
bool & is_async_insert_with_inlined_data,
2154+
size_t insert_query_without_data_length)
21782155
{
21792156
resetOutput();
21802157
have_error = false;
@@ -2183,13 +2160,6 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
21832160
client_exception.reset();
21842161
server_exception.reset();
21852162

2186-
if (echo_query_ && *echo_query_)
2187-
{
2188-
writeString(full_query, *std_out);
2189-
writeChar('\n', *std_out);
2190-
std_out->next();
2191-
}
2192-
21932163
if (is_interactive)
21942164
{
21952165
client_context->setCurrentQueryId("");
@@ -2258,7 +2228,8 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
22582228
if (insert && insert->select)
22592229
insert->tryFindInputFunction(input_function);
22602230

2261-
bool is_async_insert_with_inlined_data = client_context->getSettingsRef()[Setting::async_insert] && insert && insert->hasInlinedData();
2231+
/// Update async_insert after applying settings from server
2232+
is_async_insert_with_inlined_data = client_context->getSettingsRef()[Setting::async_insert] && insert && insert->hasInlinedData();
22622233

22632234
if (is_async_insert_with_inlined_data)
22642235
{
@@ -2270,16 +2241,26 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
22702241
"Processing async inserts with both inlined and external data (from stdin or infile) is not supported");
22712242
}
22722243

2244+
String query;
2245+
/// An INSERT query may have the data that follows query text.
2246+
/// Send part of the query without data, because data will be sent separately.
2247+
/// But for asynchronous inserts we don't extract data, because it's needed
2248+
/// to be done on server side in that case (for coalescing the data from multiple inserts on server side).
2249+
if (insert && insert->data && !is_async_insert_with_inlined_data && insert_query_without_data_length)
2250+
query = query_.substr(0, insert_query_without_data_length);
2251+
else
2252+
query = query_;
2253+
22732254
/// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately.
22742255
if (insert && (!insert->select || input_function) && (!is_async_insert_with_inlined_data || input_function))
22752256
{
22762257
if (input_function && insert->format.empty())
22772258
throw Exception(ErrorCodes::INVALID_USAGE_OF_INPUT, "FORMAT must be specified for function input()");
22782259

2279-
processInsertQuery(query_to_execute, parsed_query);
2260+
processInsertQuery(query, parsed_query);
22802261
}
22812262
else
2282-
processOrdinaryQuery(query_to_execute, parsed_query);
2263+
processOrdinaryQuery(query, parsed_query);
22832264
}
22842265

22852266
/// Do not change context (current DB, settings) in case of an exception.
@@ -2371,15 +2352,12 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
23712352
{
23722353
output_stream << "Processed rows: " << processed_rows << "\n";
23732354
}
2374-
2375-
if (have_error && report_error)
2376-
processError(full_query);
23772355
}
23782356

23792357

23802358
MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
23812359
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,
2382-
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
2360+
ASTPtr & parsed_query,
23832361
std::unique_ptr<Exception> & current_exception)
23842362
{
23852363
if (!is_interactive && cancelled)
@@ -2458,7 +2436,6 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
24582436
insert_ast = explain_ast->getExplainedQuery()->as<ASTInsertQuery>();
24592437
}
24602438
}
2461-
const char * query_to_execute_end = this_query_end;
24622439
if (insert_ast && insert_ast->data)
24632440
{
24642441
if (insert_ast->format == "Values")
@@ -2495,18 +2472,8 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
24952472
this_query_end = all_queries_end;
24962473
}
24972474
insert_ast->end = this_query_end;
2498-
query_to_execute_end = isSyncInsertWithData(*insert_ast, client_context) ? insert_ast->data : this_query_end;
24992475
}
25002476

2501-
query_to_execute = all_queries_text.substr(this_query_begin - all_queries_text.data(), query_to_execute_end - this_query_begin);
2502-
2503-
// Try to include the trailing comment with test hints. It is just
2504-
// a guess for now, because we don't yet know where the query ends
2505-
// if it is an INSERT query with inline data. We will do it again
2506-
// after we have processed the query. But even this guess is
2507-
// beneficial so that we see proper trailing comments in "echo" and
2508-
// server log.
2509-
adjustQueryEnd(this_query_end, all_queries_end, max_parser_depth, max_parser_backtracks);
25102477
return MultiQueryProcessingStage::EXECUTE_QUERY;
25112478
}
25122479

@@ -2546,8 +2513,6 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
25462513
UInt32 script_query_number = 0;
25472514
UInt32 script_line_number = 0;
25482515

2549-
String full_query; // full_query is the query + inline INSERT data + trailing comments (the latter is our best guess for now).
2550-
String query_to_execute;
25512516
ASTPtr parsed_query;
25522517
std::unique_ptr<Exception> current_exception;
25532518
size_t retries_count = 0;
@@ -2556,7 +2521,7 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
25562521
while (true)
25572522
{
25582523
auto stage = analyzeMultiQueryText(this_query_begin, this_query_end, all_queries_end,
2559-
query_to_execute, parsed_query, all_queries_text, current_exception);
2524+
parsed_query, current_exception);
25602525
switch (stage)
25612526
{
25622527
case MultiQueryProcessingStage::QUERIES_END:
@@ -2611,7 +2576,26 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
26112576
case MultiQueryProcessingStage::EXECUTE_QUERY:
26122577
{
26132578
is_first = false;
2614-
full_query = all_queries_text.substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
2579+
2580+
// Save query without trailing comment
2581+
auto query_to_execute = std::string_view(all_queries_text).substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
2582+
size_t insert_query_without_data_length = 0;
2583+
if (const auto * insert = parsed_query->as<ASTInsertQuery>())
2584+
insert_query_without_data_length = insert->data - query_to_execute.data();
2585+
2586+
// Try to include the trailing comment with test hints. It is just
2587+
// a guess for now, because we don't yet know where the query ends
2588+
// if it is an INSERT query with inline data. We will do it again
2589+
// after we have processed the query. But even this guess is
2590+
// beneficial so that we see proper trailing comments in "echo" and
2591+
// server log.
2592+
{
2593+
unsigned max_parser_depth = static_cast<unsigned>(client_context->getSettingsRef()[Setting::max_parser_depth]);
2594+
unsigned max_parser_backtracks = static_cast<unsigned>(client_context->getSettingsRef()[Setting::max_parser_backtracks]);
2595+
adjustQueryEnd(this_query_end, all_queries_end, max_parser_depth, max_parser_backtracks);
2596+
}
2597+
// query + inline INSERT data + trailing comments (the latter is our best guess for now).
2598+
auto full_query = std::string_view(all_queries_text).substr(this_query_begin - all_queries_text.data(), this_query_end - this_query_begin);
26152599

26162600
++script_query_number;
26172601
script_line_number += std::count(prev_query_begin, this_query_begin, '\n');
@@ -2637,10 +2621,21 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
26372621

26382622
// Echo all queries if asked; makes for a more readable reference file.
26392623
echo_query = test_hint.echoQueries().value_or(echo_query);
2624+
bool is_async_insert_with_inlined_data = false;
2625+
2626+
if (echo_query)
2627+
{
2628+
writeString(full_query, *std_out);
2629+
writeChar('\n', *std_out);
2630+
std_out->next();
2631+
}
26402632

26412633
try
26422634
{
2643-
processParsedSingleQuery(full_query, query_to_execute, parsed_query, echo_query, false);
2635+
processParsedSingleQuery(query_to_execute,
2636+
parsed_query,
2637+
is_async_insert_with_inlined_data,
2638+
insert_query_without_data_length);
26442639
}
26452640
catch (...)
26462641
{
@@ -2773,7 +2768,7 @@ bool ClientBase::executeMultiQuery(const String & all_queries_text)
27732768
// , where the inline data is delimited by semicolon and not by a
27742769
// newline.
27752770
auto * insert_ast = parsed_query->as<ASTInsertQuery>();
2776-
if (insert_ast && isSyncInsertWithData(*insert_ast, client_context))
2771+
if (insert_ast && insert_ast->data && !is_async_insert_with_inlined_data)
27772772
{
27782773
this_query_end = insert_ast->end;
27792774
adjustQueryEnd(
@@ -2891,6 +2886,9 @@ bool ClientBase::addMergeTreeSettings(ASTCreateQuery & ast_create)
28912886
void ClientBase::applySettingsFromServerIfNeeded()
28922887
{
28932888
const Settings & settings = client_context->getSettingsRef();
2889+
if (!settings[Setting::apply_settings_from_server])
2890+
return;
2891+
28942892
SettingsChanges changes_to_apply;
28952893
for (const SettingChange & change : settings_from_server)
28962894
{
@@ -2900,8 +2898,7 @@ void ClientBase::applySettingsFromServerIfNeeded()
29002898
changes_to_apply.push_back(change);
29012899
}
29022900

2903-
if (settings[Setting::apply_settings_from_server])
2904-
global_context->applySettingsChanges(changes_to_apply);
2901+
global_context->applySettingsChanges(changes_to_apply);
29052902
}
29062903

29072904
void ClientBase::startKeystrokeInterceptorIfExists()

src/Client/ClientBase.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class ClientBase
115115
/// This is the analogue of Poco::Application::config()
116116
virtual Poco::Util::LayeredConfiguration & getClientConfiguration() = 0;
117117

118-
virtual bool processWithFuzzing(const String &)
118+
virtual bool processWithFuzzing(std::string_view)
119119
{
120120
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Query processing with fuzzing is not implemented");
121121
}
@@ -126,14 +126,18 @@ class ClientBase
126126
}
127127

128128
virtual void connect() = 0;
129-
virtual void processError(const String & query) const = 0;
129+
virtual void processError(std::string_view query) const = 0;
130130
virtual String getName() const = 0;
131131

132-
void processOrdinaryQuery(const String & query_to_execute, ASTPtr parsed_query);
133-
void processInsertQuery(const String & query_to_execute, ASTPtr parsed_query);
132+
void processOrdinaryQuery(String query, ASTPtr parsed_query);
133+
void processInsertQuery(String query, ASTPtr parsed_query);
134134

135-
void processParsedSingleQuery(const String & full_query, const String & query_to_execute,
136-
ASTPtr parsed_query, std::optional<bool> echo_query_ = {}, bool report_error = false);
135+
void processParsedSingleQuery(
136+
std::string_view query_,
137+
ASTPtr parsed_query,
138+
bool & is_async_insert_with_inlined_data,
139+
// to handle INSERT w/o async_insert
140+
size_t insert_query_without_data_length = 0);
137141

138142
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth, uint32_t max_parser_backtracks);
139143
virtual void setupSignalHandler() = 0;
@@ -143,7 +147,7 @@ class ClientBase
143147
bool executeMultiQuery(const String & all_queries_text);
144148
MultiQueryProcessingStage analyzeMultiQueryText(
145149
const char *& this_query_begin, const char *& this_query_end, const char * all_queries_end,
146-
String & query_to_execute, ASTPtr & parsed_query, const String & all_queries_text,
150+
ASTPtr & parsed_query,
147151
std::unique_ptr<Exception> & current_exception);
148152

149153
void clearTerminal();

0 commit comments

Comments
 (0)