Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb

saveLastNodeActivity(number_of_current_replica);

auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set task for it anymore",
number_of_current_replica
);
{
std::lock_guard lock(mutex);
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set task for it anymore",
number_of_current_replica
);
}

// 1. Check pre-queued files first
auto file = getPreQueuedFile(number_of_current_replica);
Expand All @@ -63,7 +66,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb
file = getAnyUnprocessedFile(number_of_current_replica);

if (file)
processed_file_list_ptr->second.push_back(file);
{
std::lock_guard lock(mutex);
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
{ // It is possible that replica was lost after check in the beginning of the method
auto file_identifier = getFileIdentifier(file);
auto file_replica_idx = getReplicaForFile(file_identifier);
unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Stop returning task after requeueing lost replica work

When getNextTask detects that the current replica was removed, it re-enqueues the selected file into unprocessed_files/connection_to_files but still returns the same file at the end of the function. In the race where rescheduleTasksFromReplica() runs between task selection and this check, the same object becomes available to another replica while this caller can still process it, which can lead to duplicate processing/results if the caller proceeds with the returned task.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically it can't happen.
getNextTask is called from processReadTaskRequest, which is called from processPacket, as is rescheduleTasksFromReplica. So rescheduleTasksFromReplica can't be called for the same replica simultaneously with getNextTask.
Comment was written incorrectly, code fixed

connection_to_files[file_replica_idx].push_back(file);
}
else
processed_file_list_ptr->second.push_back(file);
}

return file;
}
Expand Down Expand Up @@ -121,7 +136,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t
auto next_file = files.back();
files.pop_back();

auto file_identifier = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getIdentifier();
auto file_identifier = getFileIdentifier(next_file);
auto it = unprocessed_files.find(file_identifier);
if (it == unprocessed_files.end())
continue;
Expand Down Expand Up @@ -179,20 +194,15 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
}
}

String file_identifier;
if (send_over_whole_archive && object_info->isArchive())
{
file_identifier = object_info->getPathOrPathToArchiveIfArchive();
LOG_TEST(log, "Will send over the whole archive {} to replicas. "
"This will be suboptimal, consider turning on "
"cluster_function_process_archive_on_multiple_nodes setting", file_identifier);
}
else
String file_identifier = getFileIdentifier(object_info, true);

size_t file_replica_idx;

{
file_identifier = object_info->getIdentifier();
std::lock_guard lock(mutex);
file_replica_idx = getReplicaForFile(file_identifier);
}

size_t file_replica_idx = getReplicaForFile(file_identifier);
if (file_replica_idx == number_of_current_replica)
{
LOG_TRACE(
Expand Down Expand Up @@ -248,7 +258,7 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
auto next_file = it->second.first;
unprocessed_files.erase(it);

auto file_path = send_over_whole_archive ? next_file->getPathOrPathToArchiveIfArchive() : next_file->getAbsolutePath().value_or(next_file->getPath());
auto file_path = getFileIdentifier(next_file);
LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {} from matched replica {}",
Expand Down Expand Up @@ -308,13 +318,31 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
"All replicas were marked as lost"
);

auto files = std::move(processed_file_list_ptr->second);
replica_to_files_to_be_processed.erase(number_of_current_replica);
for (const auto & file : processed_file_list_ptr->second)
for (const auto & file : files)
{
auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath()));
unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx));
auto file_identifier = getFileIdentifier(file);
auto file_replica_idx = getReplicaForFile(file_identifier);
unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx));
connection_to_files[file_replica_idx].push_back(file);
}
}

String StorageObjectStorageStableTaskDistributor::getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log) const
{
    /// Returns the key used to track this object in unprocessed_files / connection_to_files.
    /// When whole archives are sent to a single replica, all entries of one archive must
    /// share an identifier, so the archive path is used instead of the per-entry identifier.
    const bool treat_as_whole_archive = send_over_whole_archive && file_object->isArchive();
    if (!treat_as_whole_archive)
        return file_object->getIdentifier();

    auto archive_identifier = file_object->getPathOrPathToArchiveIfArchive();
    if (write_to_log)
    {
        LOG_TEST(log, "Will send over the whole archive {} to replicas. "
            "This will be suboptimal, consider turning on "
            "cluster_function_process_archive_on_multiple_nodes setting", archive_identifier);
    }
    return archive_identifier;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class StorageObjectStorageStableTaskDistributor

void saveLastNodeActivity(size_t number_of_current_replica);

String getFileIdentifier(ObjectInfoPtr file_object, bool write_to_log = false) const;

const std::shared_ptr<IObjectIterator> iterator;
const bool send_over_whole_archive;

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ def test_joins(started_cluster):
assert len(res) == 25


def _test_graceful_shutdown(started_cluster):
def test_graceful_shutdown(started_cluster):
node = started_cluster.instances["s0_0_0"]
node_to_shutdown = started_cluster.instances["s0_1_0"]

Expand Down
Loading