File tree Expand file tree Collapse file tree 1 file changed +16
-5
lines changed
src/Storages/ObjectStorage Expand file tree Collapse file tree 1 file changed +16
-5
lines changed Original file line number Diff line number Diff line change @@ -72,16 +72,27 @@ size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String
7272 return 0 ;
7373
7474 // / Rendezvous hashing
75- size_t best_id = 0 ;
76- UInt64 best_weight = sipHash64 (ids_of_nodes[0 ] + file_path);
77- for (size_t id = 1 ; id < nodes_count; ++id)
75+ auto replica = replica_to_files_to_be_processed.begin ();
76+ if (replica == replica_to_files_to_be_processed.end ())
77+ throw Exception (
78+ ErrorCodes::LOGICAL_ERROR,
79+ " No active replicas, can't find best replica for file {}" ,
80+ file_path
81+ );
82+
83+ size_t best_id = replica->first ;
84+ UInt64 best_weight = sipHash64 (ids_of_nodes[best_id] + file_path);
85+ ++replica;
86+ while (replica != replica_to_files_to_be_processed.end ())
7887 {
88+ size_t id = replica->first ;
7989 UInt64 weight = sipHash64 (ids_of_nodes[id] + file_path);
8090 if (weight > best_weight)
8191 {
8292 best_weight = weight;
8393 best_id = id;
8494 }
95+ ++replica;
8596 }
8697 return best_id;
8798}
@@ -264,9 +275,9 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
264275 " All replicas were marked as lost"
265276 );
266277
267- for (const auto & file_path : processed_file_list_ptr->second )
268- unprocessed_files[file_path] = number_of_current_replica;
269278 replica_to_files_to_be_processed.erase (number_of_current_replica);
279+ for (const auto & file_path : processed_file_list_ptr->second )
280+ unprocessed_files[file_path] = getReplicaForFile (file_path);
270281}
271282
272283}
You can’t perform that action at this time.
0 commit comments