Skip to content

Commit 17d8de0

Browse files
committed
Properly initialize replica info wherever we use task iterator
1 parent 28a237b commit 17d8de0

File tree

2 files changed

+17
-7
lines changed

2 files changed

+17
-7
lines changed

src/Storages/StorageDistributed.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,9 +1135,6 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
11351135
if (filter)
11361136
predicate = filter->getOutputs().at(0);
11371137

1138-
/// Select query is needed for pruining on virtual columns
1139-
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, 1 /* number_of_replicas */);
1140-
11411138
auto dst_cluster = getCluster();
11421139

11431140
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
@@ -1164,8 +1161,14 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
11641161
const auto & current_settings = query_context->getSettingsRef();
11651162
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
11661163

1167-
/// Here we take addresses from destination cluster and assume source table exists on these nodes
11681164
const auto cluster = getCluster();
1165+
1166+
/// Select query is needed for pruining on virtual columns
1167+
auto number_of_replicas = static_cast<UInt64>(cluster->getShardsInfo().size());
1168+
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas);
1169+
1170+
/// Here we take addresses from destination cluster and assume source table exists on these nodes
1171+
size_t replica_index = 0;
11691172
for (const auto & replicas : cluster->getShardsInfo())
11701173
{
11711174
/// Skip unavailable hosts if necessary
@@ -1174,6 +1177,8 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
11741177
/// There will be only one replica, because we consider each replica as a shard
11751178
for (const auto & try_result : try_results)
11761179
{
1180+
IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
1181+
11771182
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
11781183
std::vector<IConnectionPool::Entry>{try_result},
11791184
new_query_str,
@@ -1183,7 +1188,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
11831188
Scalars{},
11841189
Tables{},
11851190
QueryProcessingStage::Complete,
1186-
extension);
1191+
RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)});
11871192

11881193
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(
11891194
remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote]));

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5998,7 +5998,6 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
59985998
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context)
59995999
{
60006000
const auto & settings = local_context->getSettingsRef();
6001-
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, 1 /* number_of_replicas */);
60026001

60036002
/// Here we won't check that the cluster formed from table replicas is a subset of a cluster specified in s3Cluster/hdfsCluster table function
60046003
auto src_cluster = src_storage_cluster->getCluster(local_context);
@@ -6017,6 +6016,10 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
60176016
ContextMutablePtr query_context = Context::createCopy(local_context);
60186017
query_context->increaseDistributedDepth();
60196018

6019+
auto number_of_replicas = static_cast<UInt64>(src_cluster->getShardsAddresses().size());
6020+
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, number_of_replicas);
6021+
6022+
size_t replica_index = 0;
60206023
for (const auto & replicas : src_cluster->getShardsAddresses())
60216024
{
60226025
/// There will be only one replica, because we consider each replica as a shard
@@ -6032,6 +6035,8 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
60326035
node.bind_host
60336036
);
60346037

6038+
IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
6039+
60356040
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
60366041
connection,
60376042
query_str,
@@ -6041,7 +6046,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
60416046
Scalars{},
60426047
Tables{},
60436048
QueryProcessingStage::Complete,
6044-
extension);
6049+
RemoteQueryExecutor::Extension{.task_iterator = extension.task_iterator, .replica_info = std::move(replica_info)});
60456050

60466051
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(
60476052
remote_query_executor, false, settings[Setting::async_socket_for_remote], settings[Setting::async_query_sending_for_remote]));

0 commit comments

Comments (0)