Skip to content

Commit d5d5672

Browse files
authored
Merge branch 'antalya-25.8' into bugfix/antalya-25.8/system_drop_filesystem_cache_on_cluster
2 parents 7061fbc + cc82fa1 commit d5d5672

File tree

8 files changed

+30
-8
lines changed

8 files changed

+30
-8
lines changed

src/Client/MultiplexedConnections.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuid
232232
void MultiplexedConnections::sendClusterFunctionReadTaskResponse(const ClusterFunctionReadTaskResponse & response)
233233
{
234234
std::lock_guard lock(cancel_mutex);
235-
if (cancelled)
235+
if (cancelled || !current_connection || !current_connection->isConnected())
236236
return;
237237
current_connection->sendClusterFunctionReadTaskResponse(response);
238238
}
@@ -241,7 +241,7 @@ void MultiplexedConnections::sendClusterFunctionReadTaskResponse(const ClusterFu
241241
void MultiplexedConnections::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response)
242242
{
243243
std::lock_guard lock(cancel_mutex);
244-
if (cancelled)
244+
if (cancelled || !current_connection || !current_connection->isConnected())
245245
return;
246246
current_connection->sendMergeTreeReadTaskResponse(response);
247247
}
@@ -527,9 +527,12 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
527527

528528
void MultiplexedConnections::invalidateReplica(ReplicaState & state)
529529
{
530+
Connection * old_connection = state.connection;
530531
state.connection = nullptr;
531532
state.pool_entry = IConnectionPool::Entry();
532533
--active_connection_count;
534+
if (current_connection == old_connection)
535+
current_connection = nullptr;
533536
}
534537

535538
void MultiplexedConnections::setAsyncCallback(AsyncCallback async_callback)

src/Interpreters/ClusterFunctionReadTask.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ namespace ErrorCodes
1919
namespace Setting
2020
{
2121
extern const SettingsBool cluster_function_process_archive_on_multiple_nodes;
22+
extern const SettingsBool allow_experimental_iceberg_read_optimization;
2223
}
2324

2425
ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr object, const ContextPtr & context)
26+
: iceberg_read_optimization_enabled(context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization])
2527
{
2628
if (!object)
2729
throw Exception(ErrorCodes::LOGICAL_ERROR, "`object` cannot be null");
@@ -67,7 +69,8 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc
6769

6870
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
6971
{
70-
if (file_meta_info.has_value())
72+
/// This info is not used when optimization is disabled, so there is no need to send it.
73+
if (iceberg_read_optimization_enabled && file_meta_info.has_value())
7174
file_meta_info.value()->serialize(out);
7275
else
7376
DataFileMetaInfo().serialize(out);

src/Interpreters/ClusterFunctionReadTask.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ struct ClusterFunctionReadTaskResponse
2323
/// File's columns info
2424
std::optional<DataFileMetaInfoPtr> file_meta_info;
2525

26+
const bool iceberg_read_optimization_enabled = false;
27+
2628
/// Convert received response into ObjectInfo.
2729
ObjectInfoPtr getObjectInfo() const;
2830

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ namespace Setting
213213
extern const SettingsUInt64 min_bytes_to_use_direct_io;
214214
extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists;
215215
extern const SettingsBool output_format_parallel_formatting;
216+
extern const SettingsBool output_format_parquet_parallel_encoding;
216217
}
217218

218219
namespace MergeTreeSetting
@@ -6244,7 +6245,9 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
62446245
dest_storage->getStorageID(),
62456246
part,
62466247
query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists],
6247-
query_context->getSettingsRef()[Setting::output_format_parallel_formatting]);
6248+
query_context->getSettingsRef()[Setting::output_format_parallel_formatting],
6249+
query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding],
6250+
query_context->getSettingsRef()[Setting::max_threads]);
62486251

62496252
std::lock_guard lock(export_manifests_mutex);
62506253

@@ -6292,6 +6295,8 @@ void MergeTreeData::exportPartToTableImpl(
62926295
{
62936296
auto context_copy = Context::createCopy(local_context);
62946297
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
6298+
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
6299+
context_copy->setSetting("max_threads", manifest.max_threads);
62956300

62966301
sink = destination_storage->import(
62976302
manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(),

src/Storages/MergeTree/MergeTreeExportManifest.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,24 @@ struct MergeTreeExportManifest
1313
const StorageID & destination_storage_id_,
1414
const DataPartPtr & data_part_,
1515
bool overwrite_file_if_exists_,
16-
bool parallel_formatting_)
16+
bool parallel_formatting_,
17+
bool parallel_formatting_parquet_,
18+
std::size_t max_threads_)
1719
: destination_storage_id(destination_storage_id_),
1820
data_part(data_part_),
1921
overwrite_file_if_exists(overwrite_file_if_exists_),
2022
parallel_formatting(parallel_formatting_),
23+
parquet_parallel_encoding(parallel_formatting_parquet_),
24+
max_threads(max_threads_),
2125
create_time(time(nullptr)) {}
2226

2327
StorageID destination_storage_id;
2428
DataPartPtr data_part;
2529
bool overwrite_file_if_exists;
2630
bool parallel_formatting;
31+
/// parquet has a different setting for parallel formatting
32+
bool parquet_parallel_encoding;
33+
std::size_t max_threads;
2734

2835
time_t create_time;
2936
mutable bool in_progress = false;

src/Storages/MergeTree/MergeTreeSequentialSource.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
169169
addThrottler(read_settings.local_throttler, context->getMergesThrottler());
170170
break;
171171
case Export:
172-
read_settings.local_throttler = context->getExportsThrottler();
172+
addThrottler(read_settings.local_throttler, context->getExportsThrottler());
173+
addThrottler(read_settings.remote_throttler, context->getExportsThrottler());
173174
break;
174175
}
175176

src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ namespace DB
5555

5656
result += raw_path;
5757

58-
if (raw_path.back() != '/')
58+
if (!raw_path.empty() && raw_path.back() != '/')
5959
{
6060
result += "/";
6161
}

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <optional>
12
#include <thread>
23
#include <Core/ColumnWithTypeAndName.h>
34
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@@ -507,7 +508,7 @@ SinkToStoragePtr StorageObjectStorage::import(
507508
destination_file_path,
508509
object_storage,
509510
configuration,
510-
format_settings,
511+
std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context
511512
std::make_shared<const Block>(getInMemoryMetadataPtr()->getSampleBlock()),
512513
local_context);
513514
}

0 commit comments

Comments
 (0)