Skip to content

Commit 56939d2

Browse files
authored
Merge pull request #29921 from vbotbuildovich/backport-pr-29831-v25.3.x-535
[v25.3.x] archival: repair misaligned archive offsets
2 parents 9de3a95 + c40f62b commit 56939d2

File tree

6 files changed

+369
-0
lines changed

6 files changed

+369
-0
lines changed

src/v/cloud_storage/partition_manifest.cc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,6 +1152,11 @@ void partition_manifest::spillover(const segment_meta& spillover_meta) {
11521152
}
11531153

11541154
segment_meta partition_manifest::make_manifest_metadata() const {
1155+
if (empty()) {
1156+
throw std::runtime_error(
1157+
"can't make manifest metadata for empty manifest");
1158+
}
1159+
11551160
return segment_meta{
11561161
.size_bytes = cloud_log_size(),
11571162
.base_offset = get_start_offset().value(),
@@ -2811,6 +2816,69 @@ void partition_manifest::process_anomalies(
28112816
_last_scrubbed_offset);
28122817
}
28132818

2819+
std::optional<partition_manifest> partition_manifest::repair_state() const {
2820+
auto repaired = do_repair_state();
2821+
2822+
// Guard against "infinite repair loop" scenario, where the manifest is
2823+
// repaired but the repaired manifest is still inconsistent and needs to be
2824+
// repaired again.
2825+
if (repaired.has_value() && repaired->do_repair_state().has_value()) {
2826+
throw std::runtime_error(
2827+
fmt::format(
2828+
"[{}] Manifest repair failed: repaired manifest is still "
2829+
"inconsistent, this should never happen",
2830+
display_name()));
2831+
}
2832+
2833+
return repaired;
2834+
}
2835+
2836+
std::optional<partition_manifest> partition_manifest::do_repair_state() const {
2837+
if (!get_spillover_map().empty()) {
2838+
const auto first_spill = get_spillover_map().begin();
2839+
if (
2840+
_archive_start_offset < first_spill->base_offset
2841+
|| _archive_clean_offset < first_spill->base_offset) {
2842+
auto cloned = clone();
2843+
2844+
// Old versions of redpanda (pre v25.3) could have a manifest with
2845+
// incorrect `archive_start_offset` and `archive_clean_offset`
2846+
// values—below the offset of the first spillover manifest. To avoid
2847+
// issues with such manifests, we need to detect this case and fix
2848+
// the offsets by bumping them up to the offset of the first
2849+
// spillover manifest.
2850+
2851+
if (_archive_start_offset < first_spill->base_offset) {
2852+
vlog(
2853+
cst_log.warn,
2854+
"[{}] archive_start_offset {} is below the first spillover "
2855+
"manifest offset {}, advancing it to align with the first "
2856+
"spillover manifest offset.",
2857+
display_name(),
2858+
_archive_start_offset,
2859+
first_spill->base_offset);
2860+
cloned.set_archive_start_offset(
2861+
first_spill->base_offset, first_spill->delta_offset);
2862+
}
2863+
if (_archive_clean_offset < first_spill->base_offset) {
2864+
vlog(
2865+
cst_log.warn,
2866+
"[{}] archive_clean_offset {} is below the first spillover "
2867+
"manifest offset {}, advancing it to align with the first "
2868+
"spillover manifest offset.",
2869+
display_name(),
2870+
_archive_clean_offset,
2871+
first_spill->base_offset);
2872+
cloned.set_archive_clean_offset(first_spill->base_offset, 0);
2873+
}
2874+
2875+
return cloned;
2876+
}
2877+
}
2878+
2879+
return std::nullopt;
2880+
}
2881+
28142882
std::ostream& operator<<(std::ostream& o, const partition_manifest& pm) {
28152883
o << "{manifest: ";
28162884
pm.serialize_json(o, false);

src/v/cloud_storage/partition_manifest.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,8 @@ class partition_manifest : public base_manifest {
381381
/// This mechanism is used by the 'spillover' mechanism. The 'segment_meta'
382382
/// instances that describe spillover manifests are stored using this
383383
/// format.
384+
///
385+
/// \pre The manifest must not be empty.
384386
segment_meta make_manifest_metadata() const;
385387

386388
/// Return 'true' if the spillover manifest can be added to
@@ -594,6 +596,10 @@ class partition_manifest : public base_manifest {
594596
scrub_status status,
595597
anomalies detected);
596598

599+
/// Returns a repaired copy of the manifest if the manifest has known
600+
/// defects. Returns `std::nullopt` otherwise.
601+
std::optional<partition_manifest> repair_state() const;
602+
597603
private:
598604
ss::sstring display_name() const;
599605
std::optional<kafka::offset> compute_start_kafka_offset_local() const;
@@ -643,6 +649,8 @@ class partition_manifest : public base_manifest {
643649
void serialize_removed_segment_meta(
644650
const lw_segment_meta& meta, serialization_cursor_ptr cursor) const;
645651

652+
std::optional<partition_manifest> do_repair_state() const;
653+
646654
model::ntp _ntp;
647655
model::initial_revision_id _rev;
648656

src/v/cloud_storage/tests/partition_manifest_test.cc

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2844,3 +2844,96 @@ SEASTAR_THREAD_TEST_CASE(
28442844
BOOST_REQUIRE(restored == m);
28452845
BOOST_REQUIRE(m.get_applied_offset() == model::offset{100});
28462846
}
2847+
2848+
SEASTAR_THREAD_TEST_CASE(test_repair_state_no_spillovers) {
2849+
partition_manifest m;
2850+
m.add(make_segment({10, 99}, 1234));
2851+
m.add(make_segment({100, 199}, 1000));
2852+
2853+
BOOST_REQUIRE(!m.repair_state().has_value());
2854+
}
2855+
2856+
SEASTAR_THREAD_TEST_CASE(test_repair_state_aligned) {
2857+
partition_manifest m;
2858+
m.add(make_segment({10, 199}, 2234, 0));
2859+
m.add(make_segment({200, 299}, 2000, 0));
2860+
m.add(make_segment({300, 399}, 500, 0));
2861+
{
2862+
auto spill_manifest = partition_manifest{
2863+
m.get_ntp(), m.get_revision_id()};
2864+
spill_manifest.add(*m.begin());
2865+
m.spillover(spill_manifest.make_manifest_metadata());
2866+
m.set_archive_start_offset(model::offset{10}, model::offset_delta{0});
2867+
m.set_archive_clean_offset(model::offset{10}, 0);
2868+
}
2869+
2870+
auto first_spill_base = m.get_spillover_map().begin()->base_offset;
2871+
BOOST_REQUIRE_EQUAL(first_spill_base, model::offset{10});
2872+
BOOST_REQUIRE(!m.repair_state().has_value());
2873+
}
2874+
2875+
SEASTAR_THREAD_TEST_CASE(test_repair_state_misaligned) {
2876+
partition_manifest m;
2877+
m.add(make_segment({10, 199}, 2234, 0));
2878+
m.add(make_segment({200, 299}, 2000, 0));
2879+
m.add(make_segment({300, 399}, 500, 0));
2880+
2881+
{
2882+
auto spill_manifest = partition_manifest{
2883+
m.get_ntp(), m.get_revision_id()};
2884+
spill_manifest.add(*m.begin());
2885+
m.spillover(spill_manifest.make_manifest_metadata());
2886+
m.set_archive_start_offset(model::offset{5}, model::offset_delta{0});
2887+
m.set_archive_clean_offset(model::offset{5}, 0);
2888+
}
2889+
2890+
auto first_spill = *m.get_spillover_map().begin();
2891+
BOOST_REQUIRE(m.get_archive_start_offset() < first_spill.base_offset);
2892+
2893+
auto repaired = m.repair_state();
2894+
BOOST_REQUIRE(repaired.has_value());
2895+
BOOST_REQUIRE_EQUAL(
2896+
repaired->get_archive_start_offset(), first_spill.base_offset);
2897+
BOOST_REQUIRE_EQUAL(
2898+
repaired->get_archive_clean_offset(), first_spill.base_offset);
2899+
2900+
// Original manifest is unchanged.
2901+
BOOST_REQUIRE_EQUAL(m.get_archive_start_offset(), model::offset{5});
2902+
BOOST_REQUIRE_EQUAL(m.get_archive_clean_offset(), model::offset{5});
2903+
2904+
// Repaired manifest no longer needs repair.
2905+
BOOST_REQUIRE(!repaired->repair_state().has_value());
2906+
}
2907+
2908+
SEASTAR_THREAD_TEST_CASE(test_repair_state_misaligned_clean_offset) {
2909+
partition_manifest m;
2910+
m.add(make_segment({10, 199}, 2234, 0));
2911+
m.add(make_segment({200, 299}, 2000, 0));
2912+
m.add(make_segment({300, 399}, 500, 0));
2913+
2914+
{
2915+
auto spill_manifest = partition_manifest{
2916+
m.get_ntp(), m.get_revision_id()};
2917+
spill_manifest.add(*m.begin());
2918+
m.spillover(spill_manifest.make_manifest_metadata());
2919+
m.set_archive_start_offset(model::offset{200}, model::offset_delta{0});
2920+
m.set_archive_clean_offset(model::offset{5}, 0);
2921+
}
2922+
2923+
auto first_spill = *m.get_spillover_map().begin();
2924+
BOOST_REQUIRE(m.get_archive_clean_offset() < first_spill.base_offset);
2925+
2926+
auto repaired = m.repair_state();
2927+
BOOST_REQUIRE(repaired.has_value());
2928+
BOOST_REQUIRE_EQUAL(
2929+
repaired->get_archive_start_offset(), model::offset{200});
2930+
BOOST_REQUIRE_EQUAL(
2931+
repaired->get_archive_clean_offset(), first_spill.base_offset);
2932+
2933+
// Original manifest is unchanged.
2934+
BOOST_REQUIRE_EQUAL(m.get_archive_start_offset(), model::offset{200});
2935+
BOOST_REQUIRE_EQUAL(m.get_archive_clean_offset(), model::offset{5});
2936+
2937+
// Repaired manifest no longer needs repair.
2938+
BOOST_REQUIRE(!repaired->repair_state().has_value());
2939+
}

src/v/cluster/archival/ntp_archiver_service.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,40 @@ archival_stm_fence ntp_archiver::emit_rw_fence() {
420420
};
421421
}
422422

423+
ss::future<std::error_code>
424+
ntp_archiver::maybe_repair_manifest(ss::lowres_clock::time_point deadline) {
425+
auto repaired_copy = _parent.archival_meta_stm()->manifest().repair_state();
426+
if (!repaired_copy) {
427+
co_return std::error_code{};
428+
}
429+
430+
vlog(
431+
_rtclog.warn, "Manifest repair created a new manifest. Replicating it.");
432+
auto batch = _parent.archival_meta_stm()->batch_start(deadline, _as);
433+
auto fence = emit_rw_fence();
434+
if (fence.emit_rw_fence_cmd) {
435+
vlog(
436+
_rtclog.debug,
437+
"replace manifest with repair, read-write fence: {}",
438+
fence.read_write_fence);
439+
batch.read_write_fence(fence.read_write_fence);
440+
}
441+
442+
batch.replace_manifest(repaired_copy->to_iobuf());
443+
auto ec = co_await batch.replicate();
444+
if (ec) {
445+
vlog(
446+
_rtclog.error,
447+
"Failed to replace manifest with repaired version: {}",
448+
ec);
449+
} else {
450+
vlog(
451+
_rtclog.debug, "Finished replacing manifest with repaired version");
452+
}
453+
454+
co_return ec;
455+
}
456+
423457
void ntp_archiver::log_collected_traces() noexcept {
424458
try {
425459
_rtclog.bypass_tracing([this] {
@@ -700,6 +734,13 @@ ss::future<> ntp_archiver::upload_until_abort() {
700734
}
701735
}
702736

737+
if (auto ec = co_await maybe_repair_manifest(
738+
ss::lowres_clock::now() + sync_timeout);
739+
ec) {
740+
vlog(_rtclog.warn, "Failed to repair manifest: {}, retrying", ec);
741+
continue;
742+
}
743+
703744
vlog(_rtclog.debug, "upload loop synced in term {}", _start_term);
704745
if (!may_begin_uploads()) {
705746
continue;

src/v/cluster/archival/ntp_archiver_service.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,9 @@ class ntp_archiver {
482482
/// Create a fence value for the next STM operation
483483
archival_stm_fence emit_rw_fence();
484484

485+
ss::future<std::error_code>
486+
maybe_repair_manifest(ss::lowres_clock::time_point deadline);
487+
485488
/// Delete objects, return true on success and false otherwise
486489
ss::future<bool>
487490
batch_delete(std::vector<cloud_storage_clients::object_key> paths);

0 commit comments

Comments
 (0)