Skip to content

Commit 1cf5e4e

Browse files
committed
[WIP] Add stable file to node mapping in StorageObjectStorageCluster - rename node -> replica
1 parent c79f556 commit 1cf5e4e

File tree

2 files changed

+37
-37
lines changed

2 files changed

+37
-37
lines changed

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
1515

1616
LOG_INFO(
1717
log,
18-
"Initialized StorageObjectStorageStableTaskDistributor to distribute files across {} unique nodes",
19-
total_nodes
18+
"Initialized StorageObjectStorageStableTaskDistributor to distribute files across {} unique replicas",
19+
total_replicas
2020
);
2121
}
2222

2323
void StorageObjectStorageStableTaskDistributor::initializeConnectionMapping(const ClusterPtr & cluster)
2424
{
25-
connection_key_to_node.clear();
26-
node_to_connection_key.clear();
25+
connection_key_to_replica.clear();
26+
replica_to_connection_key.clear();
2727

2828
const auto & addresses_with_failover = cluster->getShardsAddresses();
2929

@@ -35,52 +35,52 @@ void StorageObjectStorageStableTaskDistributor::initializeConnectionMapping(cons
3535
{
3636
String connection_key = address.host_name + ":" + std::to_string(address.port);
3737

38-
if (connection_key_to_node.contains(connection_key))
38+
if (connection_key_to_replica.contains(connection_key))
3939
continue;
4040

41-
Int32 node_idx = static_cast<Int32>(node_to_connection_key.size());
42-
connection_key_to_node[connection_key] = node_idx;
43-
node_to_connection_key.push_back(connection_key);
41+
Int32 replica_idx = static_cast<Int32>(replica_to_connection_key.size());
42+
connection_key_to_replica[connection_key] = replica_idx;
43+
replica_to_connection_key.push_back(connection_key);
4444

4545
LOG_TRACE(
4646
log,
47-
"Discovered shard {} replica with connection key {} (node_idx: {})",
47+
"Discovered shard {} replica with connection key {} (replica_idx: {})",
4848
shard_idx,
4949
connection_key,
50-
node_idx
50+
replica_idx
5151
);
5252
}
5353
}
5454

55-
total_nodes = static_cast<Int32>(node_to_connection_key.size());
55+
total_replicas = static_cast<Int32>(replica_to_connection_key.size());
5656

5757
LOG_INFO(
5858
log,
59-
"Mapping connections to {} unique nodes",
60-
total_nodes
59+
"Mapping connections to {} unique replicas",
60+
total_replicas
6161
);
6262
}
6363

6464
String StorageObjectStorageStableTaskDistributor::getNextTask(Connection * connection)
6565
{
6666
String connection_key = "default";
67-
Int32 node_idx = -1;
67+
Int32 replica_idx = -1;
6868

6969
if (connection)
7070
{
7171
connection_key = connection->getHost() + ":" + std::to_string(connection->getPort());
72-
auto it = connection_key_to_node.find(connection_key);
73-
if (it != connection_key_to_node.end())
72+
auto it = connection_key_to_replica.find(connection_key);
73+
if (it != connection_key_to_replica.end())
7474
{
75-
node_idx = it->second;
75+
replica_idx = it->second;
7676
}
7777
}
7878

7979
LOG_TRACE(
8080
log,
81-
"Received a new connection ({}, node_idx: {}) looking for a file",
81+
"Received a new connection ({}, replica_idx: {}) looking for a file",
8282
connection_key,
83-
node_idx
83+
replica_idx
8484
);
8585

8686
// 1. Check pre-queued files first
@@ -89,26 +89,26 @@ String StorageObjectStorageStableTaskDistributor::getNextTask(Connection * conne
8989
return file;
9090

9191
// 2. Try to find a matching file from the iterator
92-
file = getMatchingFileFromIterator(connection_key, node_idx);
92+
file = getMatchingFileFromIterator(connection_key, replica_idx);
9393
if (!file.empty())
9494
return file;
9595

9696
if (!unprocessed_files.empty()) {
97-
// Prevent initiator from stealing jobs from other nodes
97+
// Prevent initiator from stealing jobs from other replicas
9898
sleepForMilliseconds(50);
9999
}
100100

101101
// 3. Process unprocessed files if iterator is exhausted
102102
return getAnyUnprocessedFile(connection_key);
103103
}
104104

105-
Int32 StorageObjectStorageStableTaskDistributor::getNodeForFile(const String & file_path, Int32 total_nodes)
105+
Int32 StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path, Int32 total_replicas)
106106
{
107-
if (total_nodes <= 0)
107+
if (total_replicas <= 0)
108108
return 0;
109109

110110
UInt64 hash_value = sipHash64(file_path);
111-
return static_cast<Int32>(hash_value % total_nodes);
111+
return static_cast<Int32>(hash_value % total_replicas);
112112
}
113113

114114
String StorageObjectStorageStableTaskDistributor::getPreQueuedFile(const String & connection_key)
@@ -143,7 +143,7 @@ String StorageObjectStorageStableTaskDistributor::getPreQueuedFile(const String
143143
return "";
144144
}
145145

146-
String StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(const String & connection_key, Int32 node_idx)
146+
String StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(const String & connection_key, Int32 replica_idx)
147147
{
148148
while (!iterator_exhausted)
149149
{
@@ -161,9 +161,9 @@ String StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(co
161161
}
162162

163163
String file_path = object_info->getPath();
164-
Int32 file_node_idx = getNodeForFile(file_path, total_nodes);
164+
Int32 file_replica_idx = getReplicaForFile(file_path, total_replicas);
165165

166-
if (file_node_idx == node_idx)
166+
if (file_replica_idx == replica_idx)
167167
{
168168
std::lock_guard lock(mutex);
169169
total_files_processed++;
@@ -179,14 +179,14 @@ String StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(co
179179
return file_path;
180180
}
181181

182-
// Queue file for its assigned node
182+
// Queue file for its assigned replica
183183
{
184184
std::lock_guard lock(mutex);
185185

186186
unprocessed_files.insert(file_path);
187-
if (file_node_idx < total_nodes)
187+
if (file_replica_idx < total_replicas)
188188
{
189-
String target_connection_key = node_to_connection_key[file_node_idx];
189+
String target_connection_key = replica_to_connection_key[file_replica_idx];
190190
connection_to_files[target_connection_key].push_back(file_path);
191191
}
192192
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ namespace DB
1616
class StorageObjectStorageStableTaskDistributor
1717
{
1818
public:
19-
using ConnectionKeyToNodeMap = std::unordered_map<String, Int32>;
20-
using NodeToConnectionKeyMap = std::vector<String>;
19+
using ConnectionKeyToReplicaMap = std::unordered_map<String, Int32>;
20+
using ReplicaToConnectionKeyMap = std::vector<String>;
2121

2222
StorageObjectStorageStableTaskDistributor(
2323
std::shared_ptr<IObjectIterator> iterator_,
@@ -28,15 +28,15 @@ class StorageObjectStorageStableTaskDistributor
2828
private:
2929
void initializeConnectionMapping(const ClusterPtr & cluster);
3030

31-
static Int32 getNodeForFile(const String & file_path, Int32 total_nodes);
31+
static Int32 getReplicaForFile(const String & file_path, Int32 total_replicas);
3232
String getPreQueuedFile(const String & connection_key);
33-
String getMatchingFileFromIterator(const String & connection_key, Int32 node_idx);
33+
String getMatchingFileFromIterator(const String & connection_key, Int32 replica_idx);
3434
String getAnyUnprocessedFile(const String & connection_key);
3535

3636
std::shared_ptr<IObjectIterator> iterator;
37-
ConnectionKeyToNodeMap connection_key_to_node;
38-
NodeToConnectionKeyMap node_to_connection_key;
39-
Int32 total_nodes;
37+
ConnectionKeyToReplicaMap connection_key_to_replica;
38+
ReplicaToConnectionKeyMap replica_to_connection_key;
39+
Int32 total_replicas;
4040

4141
std::mutex mutex;
4242
std::unordered_map<String, std::vector<String>> connection_to_files;

0 commit comments

Comments
 (0)