Skip to content

Commit bbe007e

Browse files
committed
Fix file identifier in rescheduleTasksFromReplica
1 parent c71caec commit bbe007e

File tree

2 files changed

+25
-17
lines changed

2 files changed

+25
-17
lines changed

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb
7171
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
7272
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
7373
{ // It is possible that the replica was lost after the check at the beginning of the method
74-
auto file_identifier = file->getAbsolutePath().value_or(file->getPath());
74+
auto file_identifier = getFileIdentifier(file);
7575
auto file_replica_idx = getReplicaForFile(file_identifier);
7676
unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx));
7777
connection_to_files[file_replica_idx].push_back(file);
@@ -136,7 +136,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t
136136
auto next_file = files.back();
137137
files.pop_back();
138138

139-
auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier();
139+
auto file_identifier = getFileIdentifier(next_file);
140140
auto it = unprocessed_files.find(file_identifier);
141141
if (it == unprocessed_files.end())
142142
continue;
@@ -194,18 +194,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
194194
}
195195
}
196196

197-
String file_identifier;
198-
if (send_over_whole_archive && object_info->isArchive())
199-
{
200-
file_identifier = object_info->getPathOrPathToArchiveIfArchive();
201-
LOG_TEST(log, "Will send over the whole archive {} to replicas. "
202-
"This will be suboptimal, consider turning on "
203-
"cluster_function_process_archive_on_multiple_nodes setting", file_identifier);
204-
}
205-
else
206-
{
207-
file_identifier = object_info->getIdentifier();
208-
}
197+
String file_identifier = getFileIdentifier(object_info, true);
209198

210199
size_t file_replica_idx;
211200

@@ -269,7 +258,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
269258
auto next_file = it->second.first;
270259
unprocessed_files.erase(it);
271260

272-
auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
261+
auto file_path = getFileIdentifier(next_file);
273262
LOG_TRACE(
274263
log,
275264
"Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}",
@@ -333,10 +322,27 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
333322
replica_to_files_to_be_processed.erase(number_of_current_replica);
334323
for (const auto & file : files)
335324
{
336-
auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath()));
337-
unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx));
325+
auto file_identifier = getFileIdentifier(file);
326+
auto file_replica_idx = getReplicaForFile(file_identifier);
327+
unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx));
338328
connection_to_files[file_replica_idx].push_back(file);
339329
}
340330
}
341331

332+
/// Computes the identifier string for `file_object`.
///
/// When `send_over_whole_archive` is set and the object reports itself as an archive,
/// the identifier is the result of getPathOrPathToArchiveIfArchive(); in every other
/// case it is the result of getIdentifier().
///
/// @param file_object   Object whose identifier is requested; dereferenced unconditionally.
/// @param write_to_log  If true, the whole-archive branch also emits a LOG_TEST message
///                      naming the archive. It has no effect on the non-archive branch.
/// @return The identifier string selected as described above.
String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const
333+
{
334+
// Whole-archive mode: identify the object by the archive path.
if (send_over_whole_archive && file_object->isArchive())
335+
{
336+
auto file_identifier = file_object->getPathOrPathToArchiveIfArchive();
337+
// Logging is opt-in so that callers can request the identifier without emitting the message.
if (write_to_log)
338+
{
339+
LOG_TEST(log, "Will send over the whole archive {} to replicas. "
340+
"This will be suboptimal, consider turning on "
341+
"cluster_function_process_archive_on_multiple_nodes setting", file_identifier);
342+
}
343+
return file_identifier;
344+
}
345+
// Not an archive, or whole-archive mode is off: use the object's own identifier.
return file_object->getIdentifier();
346+
}
347+
342348
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class StorageObjectStorageStableTaskDistributor
4040

4141
void saveLastNodeActivity(size_t number_of_current_replica);
4242

43+
String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const;
44+
4345
const std::shared_ptr<IObjectIterator> iterator;
4446
const bool send_over_whole_archive;
4547

0 commit comments

Comments
 (0)