Skip to content

Commit 754dc2c

Browse files
authored
Merge pull request #26822 from oleiman/manual-backport-26803-v24.2.x-185
2 parents 5bd05d9 + ede09af commit 754dc2c

File tree

4 files changed

+275
-28
lines changed

4 files changed

+275
-28
lines changed

src/v/archival/archival_metadata_stm.cc

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,6 +1330,21 @@ archival_metadata_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
13301330
0, snapshot_offset, std::move(snap_data));
13311331
}
13321332

1333+
model::offset archival_metadata_stm::cloud_recoverable_offset() {
1334+
auto lo = get_last_offset();
1335+
if (_manifest->size() == 0 && lo == model::offset{0}) {
1336+
lo = model::offset::min();
1337+
}
1338+
1339+
// Do not collect past the offset we last uploaded manifest for: this is
1340+
// needed for correctness because the remote manifest is used in
1341+
// handle_eviction() - it is what a remote node doing snapshot-driven
1342+
// raft recovery will use to start from.
1343+
lo = std::min(lo, _last_clean_at);
1344+
1345+
return lo;
1346+
}
1347+
13331348
model::offset archival_metadata_stm::max_collectible_offset() {
13341349
// From Redpanda 22.3 up, the ntp_config's impression of whether
13351350
// archival is enabled is authoritative.
@@ -1352,18 +1367,7 @@ model::offset archival_metadata_stm::max_collectible_offset() {
13521367
// need to interact with local retention.
13531368
return model::offset::max();
13541369
}
1355-
auto lo = get_last_offset();
1356-
if (_manifest->size() == 0 && lo == model::offset{0}) {
1357-
lo = model::offset::min();
1358-
}
1359-
1360-
// Do not collect past the offset we last uploaded manifest for: this is
1361-
// needed for correctness because the remote manifest is used in
1362-
// handle_eviction() - it is what a remote node doing snapshot-driven
1363-
// raft recovery will use to start from.
1364-
lo = std::min(lo, _last_clean_at);
1365-
1366-
return lo;
1370+
return cloud_recoverable_offset();
13671371
}
13681372

13691373
void archival_metadata_stm::maybe_notify_waiter(cluster::errc err) noexcept {

src/v/archival/archival_metadata_stm.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,16 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
273273

274274
model::offset get_last_clean_at() const { return _last_clean_at; };
275275

276+
/// Returns the maximum offset which is guaranteed to be recoverable from
277+
/// cloud storage.
278+
///
279+
/// This is the lesser of the last offset uploaded to cloud storage and the
280+
/// last offset we uploaded a manifest for.
281+
///
282+
/// If the manifest is empty or the last uploaded offset is 0, returns
283+
/// offset::min(), indicating that nothing is recoverable from cloud.
284+
model::offset cloud_recoverable_offset();
285+
276286
model::offset max_collectible_offset() override;
277287

278288
ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

src/v/cluster/controller_backend.cc

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,12 +585,32 @@ controller_backend::calculate_learner_initial_offset(
585585
* Initial learner start offset only makes sense for partitions with cloud
586586
* storage data
587587
*/
588+
if (auto tp_cfg = p->get_topic_config();
589+
tp_cfg.has_value() && tp_cfg->get().is_internal()) {
590+
vlog(clusterlog.trace, "{} is part of an internal topic", p->ntp());
591+
return std::nullopt;
592+
}
593+
588594
if (!p->cloud_data_available()) {
589595
vlog(clusterlog.trace, "no cloud data available for: {}", p->ntp());
590596
return std::nullopt;
591597
}
592598

599+
if (p->get_cloud_storage_mode() != cluster::cloud_storage_mode::full) {
600+
vlog(
601+
clusterlog.trace,
602+
"cloud storage not fully enabled for: {}",
603+
p->ntp());
604+
return std::nullopt;
605+
}
606+
607+
if (p->archival_meta_stm() == nullptr) {
608+
vlog(clusterlog.trace, "no archival_meta_stm for {}", p->ntp());
609+
return std::nullopt;
610+
}
611+
593612
auto log = p->log();
613+
594614
/**
595615
* Calculate retention targets based on cluster and topic configuration
596616
*/
@@ -670,20 +690,39 @@ controller_backend::calculate_learner_initial_offset(
670690
return std::nullopt;
671691
}
672692

673-
auto const cloud_storage_safe_offset
693+
auto cloud_storage_safe_offset
674694
= p->archival_meta_stm()->max_collectible_offset();
695+
auto archival_safe_removable
696+
= p->archival_meta_stm()->cloud_recoverable_offset();
697+
675698
/**
676699
* Last offset uploaded to the cloud is target learner retention upper
677700
* bound. We can not start retention recover from the point which is not yet
678701
* uploaded to Cloud Storage.
702+
*
703+
* In general cloud_storage_safe_offset should not exceed
704+
* last_uploaded, but can if, for example, archival is disabled or paused.
679705
*/
706+
707+
if (cloud_storage_safe_offset > archival_safe_removable) {
708+
vlog(
709+
clusterlog.info,
710+
"[{}] cloud_storage_safe_offset {} exceeds last uploaded to "
711+
"cloud {}, clamping to {}",
712+
p->ntp(),
713+
cloud_storage_safe_offset,
714+
archival_safe_removable,
715+
archival_safe_removable);
716+
cloud_storage_safe_offset = archival_safe_removable;
717+
}
718+
680719
vlog(
681720
clusterlog.info,
682721
"[{}] calculated retention offset: {}, last uploaded to cloud: {}, "
683722
"manifest clean offset: {}, max_collectible_offset: {}",
684723
p->ntp(),
685724
*retention_offset,
686-
p->archival_meta_stm()->manifest().get_last_offset(),
725+
archival_safe_removable,
687726
p->archival_meta_stm()->get_last_clean_at(),
688727
cloud_storage_safe_offset);
689728

tests/rptest/tests/node_pool_migration_test.py

Lines changed: 208 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from concurrent.futures import ThreadPoolExecutor
1111
import random
1212
import re
13+
import json
1314

1415
import requests
1516
from rptest.clients.kafka_cat import KafkaCat
@@ -23,9 +24,13 @@
2324
from rptest.clients.types import TopicSpec
2425
from rptest.services.admin import Admin
2526
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings
27+
from rptest.util import expect_exception
2628
from rptest.utils.mode_checks import cleanup_on_early_exit
2729
from rptest.utils.node_operations import NodeDecommissionWaiter
30+
from rptest.utils.mode_checks import skip_debug_mode
31+
from rptest.tests.redpanda_test import RedpandaTest
2832
from enum import Enum
33+
import ducktape.errors
2934

3035
TS_LOG_ALLOW_LIST = [
3136
re.compile(
@@ -45,21 +50,10 @@ def has_tiered_storage(self):
4550
return self.value == self.TIRED_STORAGE or self.value == self.FAST_MOVES
4651

4752

48-
class NodePoolMigrationTest(PreallocNodesTest):
49-
"""
50-
Basic nodes decommissioning test.
51-
"""
52-
def __init__(self, test_context):
53+
class NodePoolMigrationTestBase(PreallocNodesTest):
54+
def __init__(self, *args, **kwargs):
5355
self._topic = None
54-
55-
super(NodePoolMigrationTest, self).__init__(
56-
test_context=test_context,
57-
num_brokers=10,
58-
node_prealloc_count=1,
59-
si_settings=SISettings(test_context,
60-
cloud_storage_enable_remote_read=True,
61-
cloud_storage_enable_remote_write=True,
62-
fast_uploads=True))
56+
super(NodePoolMigrationTestBase, self).__init__(*args, **kwargs)
6357

6458
def setup(self):
6559
# defer starting redpanda to test body
@@ -269,6 +263,27 @@ def _replicas_per_node(self):
269263

270264
return node_replicas
271265

266+
267+
class NodePoolMigrationTest(NodePoolMigrationTestBase):
268+
"""
269+
Basic nodes decommissioning test.
270+
"""
271+
def __init__(self, test_context):
272+
self._topic = None
273+
274+
super(NodePoolMigrationTest, self).__init__(
275+
test_context=test_context,
276+
num_brokers=10,
277+
node_prealloc_count=1,
278+
si_settings=SISettings(test_context,
279+
cloud_storage_enable_remote_read=True,
280+
cloud_storage_enable_remote_write=True,
281+
fast_uploads=True))
282+
283+
def setup(self):
284+
# defer starting redpanda to test body
285+
pass
286+
272287
@cluster(num_nodes=11,
273288
log_allow_list=RESTART_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
274289
@matrix(balancing_mode=["off", 'node_add'],
@@ -399,3 +414,182 @@ def _quiescent_state():
399414
self.redpanda.stop_node(n)
400415

401416
self.verify()
417+
418+
419+
class DisableTestMode(str, Enum):
420+
DISABLE = "disable tiered storage"
421+
PAUSE = "pause uploads"
422+
423+
def do_disable(self, test: RedpandaTest, topic_name: str):
424+
if self.value == self.DISABLE:
425+
test.client().alter_topic_config(topic_name,
426+
'redpanda.remote.read', 'false')
427+
test.client().alter_topic_config(topic_name,
428+
'redpanda.remote.write', 'false')
429+
elif self.value == self.PAUSE:
430+
test.client().alter_topic_config(topic_name,
431+
'redpanda.remote.allowgaps',
432+
'true')
433+
test.redpanda.set_cluster_config(
434+
{"cloud_storage_enable_segment_uploads": False})
435+
436+
437+
class DisableTieredStorageTest(NodePoolMigrationTestBase):
438+
def __init__(self, test_context):
439+
self._topic = None
440+
441+
super(DisableTieredStorageTest, self).__init__(
442+
test_context=test_context,
443+
num_brokers=3,
444+
node_prealloc_count=1,
445+
si_settings=SISettings(test_context,
446+
cloud_storage_enable_remote_read=True,
447+
cloud_storage_enable_remote_write=True,
448+
fast_uploads=True))
449+
450+
def setup(self):
451+
# defer starting redpanda to test body
452+
pass
453+
454+
@cluster(num_nodes=4,
455+
log_allow_list=RESTART_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
456+
@matrix(disable_mode=[
457+
DisableTestMode.DISABLE,
458+
# Removed in backport, not yet implemented
459+
# DisableTestMode.PAUSE,
460+
])
461+
def test_disable_tiered_storage(self, disable_mode: DisableTestMode):
462+
'''
463+
This test performs the following actions:
464+
- Create a tiered storage topic
465+
- Produce some data and wait for cloud storage upload
466+
- Disable tiered storage on the topic
467+
- Produce some more data (note no additional upload)
468+
- Decommission leader to force leadership transfer
469+
- Check that start offset and high watermark on the new leader reflect
470+
the full content of the original leader's raft log prior to decom.
471+
'''
472+
473+
self.redpanda.start()
474+
cfg = {"partition_autobalancing_mode": 'node_add'}
475+
cfg["cloud_storage_enable_remote_write"] = True
476+
cfg["cloud_storage_enable_remote_read"] = True
477+
# we want data to be actually deleted
478+
cfg["retention_local_strict"] = True
479+
480+
# we need to configure a small amount of initial local retention,
481+
# otherwise we get the hwm, batch boundary adjustment fails, and we
482+
# fall back to setting the learner to start at offset 0
483+
self.redpanda.set_cluster_config({
484+
"initial_retention_local_target_bytes_default":
485+
self.segment_size * 2
486+
})
487+
488+
self.admin.patch_cluster_config(upsert=cfg)
489+
490+
spec = TopicSpec(
491+
name=f"migration-test",
492+
partition_count=1,
493+
replication_factor=1,
494+
cleanup_policy='compact',
495+
segment_bytes=self.segment_size,
496+
)
497+
self.client().create_topic(spec)
498+
self._topic = spec.name
499+
rpk = RpkTool(self.redpanda)
500+
501+
def describe_topic():
502+
info = None
503+
while info is None:
504+
for i in rpk.describe_topic(spec.name):
505+
info = i
506+
self.logger.debug(f"{info}")
507+
return info
508+
509+
self.start_producer()
510+
self.producer.wait(timeout_sec=60)
511+
512+
info = describe_topic()
513+
514+
initial_start_offset = info.start_offset
515+
initial_hwm = info.high_watermark
516+
517+
def pm_last_offset():
518+
v = self.admin.get_partition_manifest(spec.name, 0)['last_offset']
519+
return v
520+
521+
self.logger.debug("Wait until most of the topic is uploaded")
522+
523+
wait_until(lambda: pm_last_offset() >= initial_hwm,
524+
timeout_sec=30,
525+
backoff_sec=2,
526+
err_msg="Partition never uploaded")
527+
528+
self.logger.debug(
529+
f"Now {disable_mode} and produce some more to put HWM well above the last uploaded offset"
530+
)
531+
disable_mode.do_disable(self, spec.name)
532+
533+
last_uploaded = pm_last_offset()
534+
535+
self.start_producer()
536+
self.producer.wait(timeout_sec=60)
537+
538+
info = describe_topic()
539+
second_start_offset = info.start_offset
540+
second_hwm = info.high_watermark
541+
542+
assert pm_last_offset() == last_uploaded, \
543+
f"Unexpectedly uploaded more data {pm_last_offset()} > {last_uploaded}"
544+
545+
self.logger.debug(
546+
"Decommission the partition's leader and wait for leadership transfer"
547+
)
548+
549+
leader_id = self.admin.get_partition_leader(namespace='kafka',
550+
topic=spec.name,
551+
partition=0)
552+
553+
self._decommission(leader_id, decommissioned_ids=[leader_id])
554+
555+
def new_leader_id():
556+
partition_info = self.admin.get_partitions(topic=spec.name,
557+
partition=0,
558+
namespace='kafka',
559+
node=None)
560+
self.logger.debug(f"{partition_info=}")
561+
new_id = self.admin.get_partition_leader(namespace='kafka',
562+
topic=spec.name,
563+
partition=0)
564+
self.logger.debug(f"{new_id=}")
565+
return new_id
566+
567+
wait_until(lambda: new_leader_id() not in [leader_id, -1],
568+
timeout_sec=60,
569+
backoff_sec=2,
570+
err_msg="Partition didn't move")
571+
572+
if disable_mode == DisableTestMode.DISABLE:
573+
self.logger.debug(
574+
"With tiered storage disabled, we should skip FPM truncation and transfer the whole log via raft"
575+
)
576+
elif disable_mode == DisableTestMode.PAUSE:
577+
self.logger.debug(
578+
"With uploads paused, FPM should truncate only up to the last uploaded offset to avoid introducing a gap in the log"
579+
)
580+
581+
with expect_exception(ducktape.errors.TimeoutError, lambda e: True):
582+
wait_until(
583+
lambda: describe_topic().start_offset > initial_start_offset,
584+
timeout_sec=30,
585+
backoff_sec=2,
586+
err_msg="Start offset never jumped")
587+
588+
final_start_offset = describe_topic().start_offset
589+
final_hwm = describe_topic().high_watermark
590+
591+
assert final_start_offset == initial_start_offset, \
592+
f"Expected final_start_offset == {initial_start_offset}, got {final_start_offset=}"
593+
594+
assert final_hwm == second_hwm, \
595+
f"Expected final_hwm == {second_hwm}, got {final_hwm=}"

0 commit comments

Comments
 (0)