Skip to content

Commit 8a3179e

Browse files
authored
Merge branch 'antalya-25.8' into max_bytes_rows_per_file
2 parents 20376f2 + d4794d3 commit 8a3179e

File tree

12 files changed

+93
-86
lines changed

12 files changed

+93
-86
lines changed

.github/workflows/master.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4178,7 +4178,7 @@ jobs:
41784178
secrets: inherit
41794179
with:
41804180
runner_type: altinity-on-demand, altinity-regression-tester
4181-
commit: 00a50b5b8f12c9c603b9a3fa17dd2c5ea2012cac
4181+
commit: 8d2c6d2072771d450a47faca38966da7493821e1
41824182
arch: release
41834183
build_sha: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.sha || github.sha }}
41844184
timeout_minutes: 300
@@ -4190,7 +4190,7 @@ jobs:
41904190
secrets: inherit
41914191
with:
41924192
runner_type: altinity-on-demand, altinity-regression-tester-aarch64
4193-
commit: 00a50b5b8f12c9c603b9a3fa17dd2c5ea2012cac
4193+
commit: 8d2c6d2072771d450a47faca38966da7493821e1
41944194
arch: aarch64
41954195
build_sha: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.sha || github.sha }}
41964196
timeout_minutes: 300

.github/workflows/pull_request.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4134,7 +4134,7 @@ jobs:
41344134
secrets: inherit
41354135
with:
41364136
runner_type: altinity-on-demand, altinity-regression-tester
4137-
commit: 00a50b5b8f12c9c603b9a3fa17dd2c5ea2012cac
4137+
commit: 8d2c6d2072771d450a47faca38966da7493821e1
41384138
arch: release
41394139
build_sha: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.sha || github.sha }}
41404140
timeout_minutes: 300
@@ -4146,7 +4146,7 @@ jobs:
41464146
secrets: inherit
41474147
with:
41484148
runner_type: altinity-on-demand, altinity-regression-tester-aarch64
4149-
commit: 00a50b5b8f12c9c603b9a3fa17dd2c5ea2012cac
4149+
commit: 8d2c6d2072771d450a47faca38966da7493821e1
41504150
arch: aarch64
41514151
build_sha: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.sha || github.sha }}
41524152
timeout_minutes: 300

ci/praktika/yaml_additional_templates.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class AltinityWorkflowTemplates:
3535
echo "Workflow Run Report: [View Report]($REPORT_LINK)" >> $GITHUB_STEP_SUMMARY
3636
"""
3737
# Additional jobs
38-
REGRESSION_HASH = "00a50b5b8f12c9c603b9a3fa17dd2c5ea2012cac"
38+
REGRESSION_HASH = "8d2c6d2072771d450a47faca38966da7493821e1"
3939
ALTINITY_JOBS = {
4040
"GrypeScan": r"""
4141
GrypeScanServer:

src/Core/Settings.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7111,9 +7111,6 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri
71117111
)", EXPERIMENTAL) \
71127112
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
71137113
Allow Iceberg read optimization based on Iceberg metadata.
7114-
)", EXPERIMENTAL) \
7115-
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
7116-
Allow retries in cluster request, when one node goes offline
71177114
)", EXPERIMENTAL) \
71187115
DECLARE(Bool, object_storage_remote_initiator, false, R"(
71197116
Execute request to object storage as remote on one of object_storage_cluster nodes.
@@ -7236,6 +7233,7 @@ Sets the evaluation time to be used with promql dialect. 'auto' means the curren
72367233
MAKE_OBSOLETE(M, Bool, allow_experimental_shared_set_join, true) \
72377234
MAKE_OBSOLETE(M, UInt64, min_external_sort_block_bytes, 100_MiB) \
72387235
MAKE_OBSOLETE(M, UInt64, distributed_cache_read_alignment, 0) \
7236+
MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \
72397237
/** The section above is for obsolete settings. Do not add anything there. */
72407238
#endif /// __CLION_IDE__
72417239

src/Databases/DataLake/RestCatalog.cpp

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
#include <Poco/URI.h>
2929
#include <Poco/JSON/Array.h>
3030
#include <Poco/JSON/Parser.h>
31+
#include <Poco/Net/HTTPClientSession.h>
32+
#include <Poco/Net/HTTPResponse.h>
33+
#include <Poco/Net/HTTPSClientSession.h>
34+
#include <Poco/Net/SSLManager.h>
35+
#include <Poco/StreamCopier.h>
3136

3237

3338
namespace DB::ErrorCodes
@@ -203,12 +208,11 @@ std::string RestCatalog::retrieveAccessToken() const
203208
/// 1. support oauth2-server-uri
204209
/// https://github.com/apache/iceberg/blob/918f81f3c3f498f46afcea17c1ac9cdc6913cb5c/open-api/rest-catalog-open-api.yaml#L183C82-L183C99
205210

206-
DB::HTTPHeaderEntries headers;
207-
headers.emplace_back("Content-Type", "application/x-www-form-urlencoded");
208-
headers.emplace_back("Accepts", "application/json; charset=UTF-8");
209-
210211
Poco::URI url;
211212
DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback;
213+
size_t body_size = 0;
214+
String body;
215+
212216
if (oauth_server_uri.empty() && !oauth_server_use_request_body)
213217
{
214218
url = Poco::URI(base_url / oauth_tokens_endpoint);
@@ -223,11 +227,20 @@ std::string RestCatalog::retrieveAccessToken() const
223227
}
224228
else
225229
{
230+
String encoded_auth_scope;
231+
String encoded_client_id;
232+
String encoded_client_secret;
233+
Poco::URI::encode(auth_scope, auth_scope, encoded_auth_scope);
234+
Poco::URI::encode(client_id, client_id, encoded_client_id);
235+
Poco::URI::encode(client_secret, client_secret, encoded_client_secret);
236+
237+
body = fmt::format(
238+
"grant_type=client_credentials&scope={}&client_id={}&client_secret={}",
239+
encoded_auth_scope, encoded_client_id, encoded_client_secret);
240+
body_size = body.size();
226241
out_stream_callback = [&](std::ostream & os)
227242
{
228-
os << fmt::format(
229-
"grant_type=client_credentials&scope={}&client_id={}&client_secret={}",
230-
auth_scope, client_id, client_secret);
243+
os << body;
231244
};
232245

233246
if (oauth_server_uri.empty())
@@ -237,19 +250,23 @@ std::string RestCatalog::retrieveAccessToken() const
237250
}
238251

239252
const auto & context = getContext();
240-
auto wb = DB::BuilderRWBufferFromHTTP(url)
241-
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
242-
.withMethod(Poco::Net::HTTPRequest::HTTP_POST)
243-
.withSettings(context->getReadSettings())
244-
.withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
245-
.withHostFilter(&context->getRemoteHostFilter())
246-
.withOutCallback(std::move(out_stream_callback))
247-
.withSkipNotFound(false)
248-
.withHeaders(headers)
249-
.create(credentials);
253+
auto timeouts = DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings());
254+
auto session = makeHTTPSession(DB::HTTPConnectionGroupType::HTTP, url, timeouts, {});
255+
256+
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, url.getPathAndQuery(),
257+
Poco::Net::HTTPMessage::HTTP_1_1);
258+
request.setContentType("application/x-www-form-urlencoded");
259+
request.setContentLength(body_size);
260+
request.set("Accept", "application/json");
261+
262+
std::ostream & os = session->sendRequest(request);
263+
out_stream_callback(os);
264+
265+
Poco::Net::HTTPResponse response;
266+
std::istream & rs = session->receiveResponse(response);
250267

251268
std::string json_str;
252-
readJSONObjectPossiblyInvalid(json_str, *wb);
269+
Poco::StreamCopier::copyToString(rs, json_str);
253270

254271
Poco::JSON::Parser parser;
255272
Poco::Dynamic::Var res_json = parser.parse(json_str);

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ namespace Setting
5252
extern const SettingsBool use_hedged_requests;
5353
extern const SettingsBool push_external_roles_in_interserver_queries;
5454
extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
55-
extern const SettingsBool allow_retries_in_cluster_requests;
5655
}
5756

5857
namespace ErrorCodes
@@ -83,7 +82,6 @@ RemoteQueryExecutor::RemoteQueryExecutor(
8382
, extension(extension_)
8483
, priority_func(priority_func_)
8584
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
86-
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
8785
{
8886
if (stage == QueryProcessingStage::QueryPlan && !query_plan)
8987
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
@@ -469,8 +467,7 @@ int RemoteQueryExecutor::sendQueryAsync()
469467
read_context = std::make_unique<ReadContext>(
470468
*this,
471469
/*suspend_when_query_sent*/ true,
472-
read_packet_type_separately,
473-
allow_retries_in_cluster_requests);
470+
read_packet_type_separately);
474471

475472
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
476473
/// because we can still be in process of sending scalars or external tables.
@@ -543,8 +540,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
543540
read_context = std::make_unique<ReadContext>(
544541
*this,
545542
/*suspend_when_query_sent*/ false,
546-
read_packet_type_separately,
547-
allow_retries_in_cluster_requests);
543+
read_packet_type_separately);
548544
recreate_read_context = false;
549545
}
550546

@@ -735,16 +731,13 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
735731
break;
736732

737733
case Protocol::Server::ConnectionLost:
738-
if (allow_retries_in_cluster_requests)
734+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
739735
{
740-
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
736+
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
741737
{
742-
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
743-
{
744-
finished = true;
745-
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
746-
return ReadResult(Block{});
747-
}
738+
finished = true;
739+
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
740+
return ReadResult(Block{});
748741
}
749742
}
750743
packet.exception->rethrow();

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,6 @@ class RemoteQueryExecutor
339339

340340
const bool read_packet_type_separately = false;
341341

342-
const bool allow_retries_in_cluster_requests = false;
343-
344342
std::unordered_set<size_t> replica_has_processed_data;
345343

346344
/// Send all scalars to remote servers

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@ namespace ErrorCodes
2222
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
2323
RemoteQueryExecutor & executor_,
2424
bool suspend_when_query_sent_,
25-
bool read_packet_type_separately_,
26-
bool allow_retries_in_cluster_requests_)
25+
bool read_packet_type_separately_)
2726
: AsyncTaskExecutor(std::make_unique<Task>(*this))
2827
, executor(executor_)
2928
, suspend_when_query_sent(suspend_when_query_sent_)
3029
, read_packet_type_separately(read_packet_type_separately_)
31-
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
3230
{
3331
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
3432
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -63,46 +61,38 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
6361
{
6462
while (true)
6563
{
66-
try
67-
{
68-
read_context.has_read_packet_part = PacketPart::None;
69-
70-
if (read_context.read_packet_type_separately)
71-
{
72-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
73-
read_context.has_read_packet_part = PacketPart::Type;
74-
suspend_callback();
75-
}
76-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
77-
read_context.has_read_packet_part = PacketPart::Body;
78-
if (read_context.packet.type == Protocol::Server::Data)
79-
read_context.has_data_packets = true;
80-
}
81-
catch (const Exception & e)
64+
read_context.has_read_packet_part = PacketPart::None;
65+
66+
if (read_context.read_packet_type_separately)
8267
{
83-
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
84-
/// If initiator did not process any data packets before, this fact can be ignored.
85-
/// Unprocessed tasks will be executed on other nodes.
86-
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
87-
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
88-
{
89-
read_context.has_read_packet_part = PacketPart::None;
90-
}
91-
else
92-
throw;
68+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
69+
read_context.has_read_packet_part = PacketPart::Type;
70+
suspend_callback();
9371
}
72+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
73+
read_context.has_read_packet_part = PacketPart::Body;
74+
if (read_context.packet.type == Protocol::Server::Data)
75+
read_context.has_data_packets = true;
9476

9577
suspend_callback();
9678
}
9779
}
98-
catch (const Exception &)
80+
catch (const Exception & e)
9981
{
100-
if (!read_context.allow_retries_in_cluster_requests)
82+
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
83+
/// If initiator did not process any data packets before, this fact can be ignored.
84+
/// Unprocessed tasks will be executed on other nodes.
85+
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
86+
&& !read_context.has_data_packets.load()
87+
&& read_context.executor.skipUnavailableShards())
88+
{
89+
read_context.packet.type = Protocol::Server::ConnectionLost;
90+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
91+
read_context.has_read_packet_part = PacketPart::Body;
92+
suspend_callback();
93+
}
94+
else
10195
throw;
102-
read_context.packet.type = Protocol::Server::ConnectionLost;
103-
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
104-
read_context.has_read_packet_part = PacketPart::Body;
105-
suspend_callback();
10696
}
10797
}
10898

src/QueryPipeline/RemoteQueryExecutorReadContext.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
2828
explicit RemoteQueryExecutorReadContext(
2929
RemoteQueryExecutor & executor_,
3030
bool suspend_when_query_sent_,
31-
bool read_packet_type_separately_,
32-
bool allow_retries_in_cluster_requests_);
31+
bool read_packet_type_separately_);
3332

3433
~RemoteQueryExecutorReadContext() override;
3534

@@ -112,7 +111,6 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
112111
bool suspend_when_query_sent = false;
113112
bool is_query_sent = false;
114113
const bool read_packet_type_separately = false;
115-
const bool allow_retries_in_cluster_requests = false;
116114
};
117115

118116
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,15 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
396396
}
397397

398398
ASTPtr object_storage_type_arg;
399-
configuration->extractDynamicStorageType(args, context, &object_storage_type_arg);
399+
if (cluster_name_in_settings)
400+
configuration->extractDynamicStorageType(args, context, &object_storage_type_arg);
401+
else
402+
{
403+
auto args_copy = args;
404+
// Remove cluster name from args to avoid confusing cluster name and named collection name
405+
args_copy.erase(args_copy.begin());
406+
configuration->extractDynamicStorageType(args_copy, context, &object_storage_type_arg);
407+
}
400408
ASTPtr settings_temporary_storage = nullptr;
401409
for (auto * it = args.begin(); it != args.end(); ++it)
402410
{

0 commit comments

Comments
 (0)