Skip to content

Commit 61228a1

Browse files
Merge pull request ClickHouse#86874 from ClickHouse/backport/25.8/86615
Backport ClickHouse#86615 to 25.8: s3queue: fix logical error because of keeper session expired
2 parents 8950b79 + 28634d1 commit 61228a1

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed

src/Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
278278
processing_id = node_metadata.processing_id = getRandomASCIIString(10);
279279
auto processor_info = getProcessorInfo(processing_id.value());
280280

281-
const size_t max_num_tries = 1000;
281+
const size_t max_num_tries = 100;
282282
Coordination::Error code;
283283
for (size_t i = 0; i < max_num_tries; ++i)
284284
{
@@ -428,7 +428,7 @@ std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorag
428428
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", code);
429429

430430
/// most likely the processing node id path node was removed or created so let's try again
431-
LOG_TRACE(log, "Retrying setProcessing because processing node id path is unexpectedly missing or was created (error code: {})", code);
431+
LOG_DEBUG(log, "Retrying setProcessing because processing node id path is unexpectedly missing or was created (error code: {})", code);
432432
}
433433

434434
throw Exception(

src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,18 @@ ObjectStorageQueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t proc
593593
if (current_bucket_holder && current_bucket_holder->isZooKeeperSessionExpired())
594594
{
595595
LOG_TRACE(log, "ZooKeeper session expired, bucket {} not longer hold", current_bucket_holder->getBucket());
596+
597+
for (auto & [bucket, bucket_info] : listed_keys_cache)
598+
{
599+
/// Reset current processor for the keys
600+
/// to avoid the above error "Expected current processor {} to be equal to {} for bucket {}".
601+
if (bucket_info.processor.has_value() && bucket_info.processor.value() == current_processor)
602+
{
603+
LOG_DEBUG(log, "Resetting processor ({}) for bucket {}", current_processor, bucket);
604+
bucket_info.processor.reset();
605+
}
606+
}
607+
596608
current_bucket_holder = {};
597609
}
598610

0 commit comments

Comments
 (0)