Skip to content

Commit ede09af

Browse files
committed
controller: Skip FPM truncation if TS is not enabled
Fast partition movement (FPM) works by setting the learner's raft start offset to a safe prefix truncation of the local log, based on initial retention configs and some calculations to determine an offset below which all data are recoverable from cloud storage. Raft data above this offset are transferred directly, whereas data below this offset (assumed to be recoverable from cloud) are not.

Prior to this change, this computation could produce an unsafe initial start offset for the learner if archival_metadata_stm::max_removable_local_log_offset exceeds the last uploaded offset, which can occur if archival is disabled. Note that this difference between max_removable and last uploaded is by design, since automatic truncation is usually predicated on cleanup policy, and archival being switched off should not block cleanup in the common case. FPM is a special case in the sense that the "truncation" is not for cleanup as such, but rather an omission of backed-up data from an expensive bulk transfer. Which is to say, it can and does occur even if the source topic is compact-only.

The net result of this is that archival_metadata_stm reports that it is safe to remove up to offset::max and, as a result, FPM truncates the raft transfer up to whatever removable offset is reported by the other STMs. Any data produced between the last cloud storage upload and this offset are lost.

This commit introduces checks to:
- prevent FPM from adjusting the learner start offset above the start of the local log for any partition with tiered storage disabled or with uploads paused
- prevent FPM from operating on internal topics
- ensure that FPM won't truncate past the safe offset in general, as reported by the archival metadata STM. This is mostly a safeguard, as in most cases max_removable_local_log_offset should return the correct thing.
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>

(cherry picked from commit 3dc8d34)

Conflicts:
- max_collectible_offset was renamed to max_removable_local_log_offset
- local naming in controller_backend
- config::cloud_storage_enable_segment_uploads not yet implemented
- turn off upload pause ducktape test
1 parent de76e18 commit ede09af

File tree

2 files changed

+249
-16
lines changed

2 files changed

+249
-16
lines changed

src/v/cluster/controller_backend.cc

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,12 +585,32 @@ controller_backend::calculate_learner_initial_offset(
585585
* Initial learner start offset only makes sense for partitions with cloud
586586
* storage data
587587
*/
588+
if (auto tp_cfg = p->get_topic_config();
589+
tp_cfg.has_value() && tp_cfg->get().is_internal()) {
590+
vlog(clusterlog.trace, "{} is part of an internal topic", p->ntp());
591+
return std::nullopt;
592+
}
593+
588594
if (!p->cloud_data_available()) {
589595
vlog(clusterlog.trace, "no cloud data available for: {}", p->ntp());
590596
return std::nullopt;
591597
}
592598

599+
if (p->get_cloud_storage_mode() != cluster::cloud_storage_mode::full) {
600+
vlog(
601+
clusterlog.trace,
602+
"cloud storage not fully enabled for: {}",
603+
p->ntp());
604+
return std::nullopt;
605+
}
606+
607+
if (p->archival_meta_stm() == nullptr) {
608+
vlog(clusterlog.trace, "no archival_meta_stm for {}", p->ntp());
609+
return std::nullopt;
610+
}
611+
593612
auto log = p->log();
613+
594614
/**
595615
* Calculate retention targets based on cluster and topic configuration
596616
*/
@@ -670,20 +690,39 @@ controller_backend::calculate_learner_initial_offset(
670690
return std::nullopt;
671691
}
672692

673-
auto const cloud_storage_safe_offset
693+
auto cloud_storage_safe_offset
674694
= p->archival_meta_stm()->max_collectible_offset();
695+
auto archival_safe_removable
696+
= p->archival_meta_stm()->cloud_recoverable_offset();
697+
675698
/**
676699
* Last offset uploaded to the cloud is target learner retention upper
677700
* bound. We can not start retention recover from the point which is not yet
678701
* uploaded to Cloud Storage.
702+
*
703+
* In general cloud_storage_safe_offset should not exceed
704+
* last_uploaded, but can if, for example, archival is disabled or paused.
679705
*/
706+
707+
if (cloud_storage_safe_offset > archival_safe_removable) {
708+
vlog(
709+
clusterlog.info,
710+
"[{}] cloud_storage_safe_offset {} exceeds last uploaded to "
711+
"cloud {}, clamping to {}",
712+
p->ntp(),
713+
cloud_storage_safe_offset,
714+
archival_safe_removable,
715+
archival_safe_removable);
716+
cloud_storage_safe_offset = archival_safe_removable;
717+
}
718+
680719
vlog(
681720
clusterlog.info,
682721
"[{}] calculated retention offset: {}, last uploaded to cloud: {}, "
683722
"manifest clean offset: {}, max_collectible_offset: {}",
684723
p->ntp(),
685724
*retention_offset,
686-
p->archival_meta_stm()->manifest().get_last_offset(),
725+
archival_safe_removable,
687726
p->archival_meta_stm()->get_last_clean_at(),
688727
cloud_storage_safe_offset);
689728

tests/rptest/tests/node_pool_migration_test.py

Lines changed: 208 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from concurrent.futures import ThreadPoolExecutor
1111
import random
1212
import re
13+
import json
1314

1415
import requests
1516
from rptest.clients.kafka_cat import KafkaCat
@@ -23,9 +24,13 @@
2324
from rptest.clients.types import TopicSpec
2425
from rptest.services.admin import Admin
2526
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, SISettings
27+
from rptest.util import expect_exception
2628
from rptest.utils.mode_checks import cleanup_on_early_exit
2729
from rptest.utils.node_operations import NodeDecommissionWaiter
30+
from rptest.utils.mode_checks import skip_debug_mode
31+
from rptest.tests.redpanda_test import RedpandaTest
2832
from enum import Enum
33+
import ducktape.errors
2934

3035
TS_LOG_ALLOW_LIST = [
3136
re.compile(
@@ -45,21 +50,10 @@ def has_tiered_storage(self):
4550
return self.value == self.TIRED_STORAGE or self.value == self.FAST_MOVES
4651

4752

48-
class NodePoolMigrationTest(PreallocNodesTest):
49-
"""
50-
Basic nodes decommissioning test.
51-
"""
52-
def __init__(self, test_context):
53+
class NodePoolMigrationTestBase(PreallocNodesTest):
54+
def __init__(self, *args, **kwargs):
5355
self._topic = None
54-
55-
super(NodePoolMigrationTest, self).__init__(
56-
test_context=test_context,
57-
num_brokers=10,
58-
node_prealloc_count=1,
59-
si_settings=SISettings(test_context,
60-
cloud_storage_enable_remote_read=True,
61-
cloud_storage_enable_remote_write=True,
62-
fast_uploads=True))
56+
super(NodePoolMigrationTestBase, self).__init__(*args, **kwargs)
6357

6458
def setup(self):
6559
# defer starting redpanda to test body
@@ -269,6 +263,27 @@ def _replicas_per_node(self):
269263

270264
return node_replicas
271265

266+
267+
class NodePoolMigrationTest(NodePoolMigrationTestBase):
268+
"""
269+
Basic nodes decommissioning test.
270+
"""
271+
def __init__(self, test_context):
272+
self._topic = None
273+
274+
super(NodePoolMigrationTest, self).__init__(
275+
test_context=test_context,
276+
num_brokers=10,
277+
node_prealloc_count=1,
278+
si_settings=SISettings(test_context,
279+
cloud_storage_enable_remote_read=True,
280+
cloud_storage_enable_remote_write=True,
281+
fast_uploads=True))
282+
283+
def setup(self):
284+
# defer starting redpanda to test body
285+
pass
286+
272287
@cluster(num_nodes=11,
273288
log_allow_list=RESTART_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
274289
@matrix(balancing_mode=["off", 'node_add'],
@@ -399,3 +414,182 @@ def _quiescent_state():
399414
self.redpanda.stop_node(n)
400415

401416
self.verify()
417+
418+
419+
class DisableTestMode(str, Enum):
420+
DISABLE = "disable tiered storage"
421+
PAUSE = "pause uploads"
422+
423+
def do_disable(self, test: RedpandaTest, topic_name: str):
424+
if self.value == self.DISABLE:
425+
test.client().alter_topic_config(topic_name,
426+
'redpanda.remote.read', 'false')
427+
test.client().alter_topic_config(topic_name,
428+
'redpanda.remote.write', 'false')
429+
elif self.value == self.PAUSE:
430+
test.client().alter_topic_config(topic_name,
431+
'redpanda.remote.allowgaps',
432+
'true')
433+
test.redpanda.set_cluster_config(
434+
{"cloud_storage_enable_segment_uploads": False})
435+
436+
437+
class DisableTieredStorageTest(NodePoolMigrationTestBase):
438+
def __init__(self, test_context):
439+
self._topic = None
440+
441+
super(DisableTieredStorageTest, self).__init__(
442+
test_context=test_context,
443+
num_brokers=3,
444+
node_prealloc_count=1,
445+
si_settings=SISettings(test_context,
446+
cloud_storage_enable_remote_read=True,
447+
cloud_storage_enable_remote_write=True,
448+
fast_uploads=True))
449+
450+
def setup(self):
451+
# defer starting redpanda to test body
452+
pass
453+
454+
@cluster(num_nodes=4,
455+
log_allow_list=RESTART_LOG_ALLOW_LIST + TS_LOG_ALLOW_LIST)
456+
@matrix(disable_mode=[
457+
DisableTestMode.DISABLE,
458+
# Removed in backport, not yet implemented
459+
# DisableTestMode.PAUSE,
460+
])
461+
def test_disable_tiered_storage(self, disable_mode: DisableTestMode):
462+
'''
463+
This test performs the following actions:
464+
- Create a tiered storage topic
465+
- Produce some data and wait for cloud storage upload
466+
- Disable tiered storage on the topic
467+
- Produce some more data (note no additional upload)
468+
- Decommission leader to force leadership transfer
469+
- Check that start offset and high watermark on the new leader reflect
470+
the full content of the original leader's raft log prior to decom.
471+
'''
472+
473+
self.redpanda.start()
474+
cfg = {"partition_autobalancing_mode": 'node_add'}
475+
cfg["cloud_storage_enable_remote_write"] = True
476+
cfg["cloud_storage_enable_remote_read"] = True
477+
# we want data to be actually deleted
478+
cfg["retention_local_strict"] = True
479+
480+
# we need to configure a small amount of initial local retention,
481+
# otherwise we get the hwm, batch boundary adjustment fails, and we
482+
# fall back to setting the learner to start at offset 0
483+
self.redpanda.set_cluster_config({
484+
"initial_retention_local_target_bytes_default":
485+
self.segment_size * 2
486+
})
487+
488+
self.admin.patch_cluster_config(upsert=cfg)
489+
490+
spec = TopicSpec(
491+
name=f"migration-test",
492+
partition_count=1,
493+
replication_factor=1,
494+
cleanup_policy='compact',
495+
segment_bytes=self.segment_size,
496+
)
497+
self.client().create_topic(spec)
498+
self._topic = spec.name
499+
rpk = RpkTool(self.redpanda)
500+
501+
def describe_topic():
502+
info = None
503+
while info is None:
504+
for i in rpk.describe_topic(spec.name):
505+
info = i
506+
self.logger.debug(f"{info}")
507+
return info
508+
509+
self.start_producer()
510+
self.producer.wait(timeout_sec=60)
511+
512+
info = describe_topic()
513+
514+
initial_start_offset = info.start_offset
515+
initial_hwm = info.high_watermark
516+
517+
def pm_last_offset():
518+
v = self.admin.get_partition_manifest(spec.name, 0)['last_offset']
519+
return v
520+
521+
self.logger.debug("Wait until most of the topic is uploaded")
522+
523+
wait_until(lambda: pm_last_offset() >= initial_hwm,
524+
timeout_sec=30,
525+
backoff_sec=2,
526+
err_msg="Partition never uploaded")
527+
528+
self.logger.debug(
529+
f"Now {disable_mode} and produce some more to put HWM well above the last uploaded offset"
530+
)
531+
disable_mode.do_disable(self, spec.name)
532+
533+
last_uploaded = pm_last_offset()
534+
535+
self.start_producer()
536+
self.producer.wait(timeout_sec=60)
537+
538+
info = describe_topic()
539+
second_start_offset = info.start_offset
540+
second_hwm = info.high_watermark
541+
542+
assert pm_last_offset() == last_uploaded, \
543+
f"Unexpectedly uploaded more data {pm_last_offset()} > {last_uploaded}"
544+
545+
self.logger.debug(
546+
"Decommission the partition's leader and wait for leadership transfer"
547+
)
548+
549+
leader_id = self.admin.get_partition_leader(namespace='kafka',
550+
topic=spec.name,
551+
partition=0)
552+
553+
self._decommission(leader_id, decommissioned_ids=[leader_id])
554+
555+
def new_leader_id():
556+
partition_info = self.admin.get_partitions(topic=spec.name,
557+
partition=0,
558+
namespace='kafka',
559+
node=None)
560+
self.logger.debug(f"{partition_info=}")
561+
new_id = self.admin.get_partition_leader(namespace='kafka',
562+
topic=spec.name,
563+
partition=0)
564+
self.logger.debug(f"{new_id=}")
565+
return new_id
566+
567+
wait_until(lambda: new_leader_id() not in [leader_id, -1],
568+
timeout_sec=60,
569+
backoff_sec=2,
570+
err_msg="Partition didn't move")
571+
572+
if disable_mode == DisableTestMode.DISABLE:
573+
self.logger.debug(
574+
"With tiered storage disabled, we should skip FPM truncation and transfer the whole log via raft"
575+
)
576+
elif disable_mode == DisableTestMode.PAUSE:
577+
self.logger.debug(
578+
"With uploads paused, FPM should truncate only up to the last uploaded offset to avoid introducing a gap in the log"
579+
)
580+
581+
with expect_exception(ducktape.errors.TimeoutError, lambda e: True):
582+
wait_until(
583+
lambda: describe_topic().start_offset > initial_start_offset,
584+
timeout_sec=30,
585+
backoff_sec=2,
586+
err_msg="Start offset never jumped")
587+
588+
final_start_offset = describe_topic().start_offset
589+
final_hwm = describe_topic().high_watermark
590+
591+
assert final_start_offset == initial_start_offset, \
592+
f"Expected final_start_offset == {initial_start_offset}, got {final_start_offset=}"
593+
594+
assert final_hwm == second_hwm, \
595+
f"Expected final_hwm == {second_hwm}, got {final_hwm=}"

0 commit comments

Comments
 (0)