Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb

saveLastNodeActivity(number_of_current_replica);

auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set task for it anymore",
number_of_current_replica
);
{
std::lock_guard lock(mutex);
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set task for it anymore",
number_of_current_replica
);
}

// 1. Check pre-queued files first
auto file = getPreQueuedFile(number_of_current_replica);
Expand All @@ -63,7 +66,19 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb
file = getAnyUnprocessedFile(number_of_current_replica);

if (file)
processed_file_list_ptr->second.push_back(file);
{
std::lock_guard lock(mutex);
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
{ // It is possible that the replica was lost after the check at the beginning of the method
auto file_identifier = file->getAbsolutePath().value_or(file->getPath());
auto file_replica_idx = getReplicaForFile(file_identifier);
unprocessed_files.emplace(file_identifier, std::make_pair(file, file_replica_idx));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve identifier semantics when re-queueing lost-replica task

When a replica disappears mid-request, this branch re-queues the task using getAbsolutePath().value_or(getPath()) as the map key, but getPreQueuedFile later looks up queued tasks by send_over_whole_archive ? getPathOrPathToArchiveIfArchive() : getIdentifier(). For bucketed objects (getIdentifier() includes bucket suffix) or archive mode, the key no longer matches, so pre-queued tasks are skipped, and emplace can collapse multiple buckets with the same path into one entry.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Requeue with canonical file identifier

When a replica is lost mid-getNextTask, this branch re-inserts the task using file->getAbsolutePath().value_or(file->getPath()), but the rest of the distributor now keys tasks via getFileIdentifier(...)/getIdentifier() (including bucket suffixes and archive normalization). In bucket-splitting mode, multiple buckets from the same path collapse to one unprocessed_files key here, and later getPreQueuedFile lookups by canonical identifier won't match this entry, so some tasks can be skipped or dropped after failover.

Useful? React with 👍 / 👎.

connection_to_files[file_replica_idx].push_back(file);
}
else
processed_file_list_ptr->second.push_back(file);
}

return file;
}
Expand Down Expand Up @@ -192,7 +207,13 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getMatchingFileFromIter
file_identifier = object_info->getIdentifier();
}

size_t file_replica_idx = getReplicaForFile(file_identifier);
size_t file_replica_idx;

{
std::lock_guard lock(mutex);
file_replica_idx = getReplicaForFile(file_identifier);
}

if (file_replica_idx == number_of_current_replica)
{
LOG_TRACE(
Expand Down Expand Up @@ -308,8 +329,9 @@ void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_
"All replicas were marked as lost"
);

auto files = std::move(processed_file_list_ptr->second);
replica_to_files_to_be_processed.erase(number_of_current_replica);
for (const auto & file : processed_file_list_ptr->second)
for (const auto & file : files)
{
auto file_replica_idx = getReplicaForFile(file->getAbsolutePath().value_or(file->getPath()));
unprocessed_files.emplace(file->getAbsolutePath().value_or(file->getPath()), std::make_pair(file, file_replica_idx));
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ def test_joins(started_cluster):
assert len(res) == 25


def _test_graceful_shutdown(started_cluster):
def test_graceful_shutdown(started_cluster):
node = started_cluster.instances["s0_0_0"]
node_to_shutdown = started_cluster.instances["s0_1_0"]

Expand Down
Loading