Skip to content

Commit 8fe2a9c

Browse files
authored
Merge pull request #27977 from nvartolomei/nv/backport-27952
2 parents baf178e + 62ef162 commit 8fe2a9c

File tree

3 files changed

+108
-8
lines changed

3 files changed

+108
-8
lines changed

src/v/cloud_storage/partition_manifest.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -714,12 +714,19 @@ bool partition_manifest::advance_start_offset(model::offset new_start_offset) {
714714
subtract_from_cloud_log_size(it->size_bytes);
715715
}
716716

717-
// The new start offset has moved past the user-requested start offset,
718-
// so there's no point in tracking it further.
719-
if (
720-
highest_removed_offset != kafka::offset{}
721-
&& highest_removed_offset >= _start_kafka_offset_override) {
722-
_start_kafka_offset_override = kafka::offset{};
717+
if (_archive_start_offset == model::offset{}) {
718+
// This method is used by spillover (archive) mechanism to advance
719+
// the start offset of the manifest and by STM retention mechanism.
720+
// We should touch _start_kafka_offset_override only if the archive
721+
// is not used. Otherwise, the override should be managed by the
722+
// archive logic (apply_archive_retention, etc).
723+
if (
724+
highest_removed_offset != kafka::offset{}
725+
&& highest_removed_offset >= _start_kafka_offset_override) {
726+
// The new start offset has moved past the user-requested start
727+
// offset, so there's no point in tracking it further.
728+
_start_kafka_offset_override = kafka::offset{};
729+
}
723730
}
724731
return true;
725732
}

src/v/cloud_storage/tests/cloud_storage_e2e_test.cc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "cluster/health_monitor_frontend.h"
2020
#include "kafka/data/replicated_partition.h"
2121
#include "kafka/protocol/find_coordinator.h"
22+
#include "kafka/server/tests/delete_records_utils.h"
2223
#include "kafka/server/tests/list_offsets_utils.h"
2324
#include "kafka/server/tests/produce_consume_utils.h"
2425
#include "model/fundamental.h"
@@ -1267,6 +1268,95 @@ TEST_P(EndToEndFixture, TestConsumerOffsetsNoTieredStorage) {
12671268
ASSERT_FALSE(partition->cloud_data_available());
12681269
}
12691270

1271+
TEST_F(ManualFixture, TestSpilloverWithTruncationRetainsStartOffset) {
1272+
test_local_cfg.get("cloud_storage_disable_upload_loop_for_tests")
1273+
.set_value(true);
1274+
test_local_cfg.get("cloud_storage_spillover_manifest_max_segments")
1275+
.set_value(std::make_optional<size_t>(2));
1276+
test_local_cfg.get("cloud_storage_spillover_manifest_size")
1277+
.set_value(std::optional<size_t>{});
1278+
1279+
const model::topic topic_name("spillover_truncate_test");
1280+
model::ntp ntp(model::kafka_namespace, topic_name, 0);
1281+
1282+
cluster::topic_properties props;
1283+
props.shadow_indexing = model::shadow_indexing_mode::full;
1284+
props.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion;
1285+
props.retention_bytes = tristate<size_t>(disable_tristate_t{});
1286+
props.retention_duration = tristate<std::chrono::milliseconds>(
1287+
disable_tristate_t{});
1288+
add_topic({model::kafka_namespace, topic_name}, 1, props).get();
1289+
wait_for_leader(ntp).get();
1290+
1291+
auto partition = app.partition_manager.local().get(ntp);
1292+
auto& archiver = partition->archiver().value().get();
1293+
1294+
SCOPED_TRACE("Seeding partition data");
1295+
1296+
{
1297+
tests::remote_segment_generator gen(
1298+
make_kafka_client().get(), *partition);
1299+
auto first_batch = gen.num_segments(6)
1300+
.batches_per_segment(5)
1301+
.records_per_batch(1)
1302+
.produce()
1303+
.get();
1304+
ASSERT_GE(first_batch, 30);
1305+
}
1306+
1307+
// Sync and upload segments
1308+
ASSERT_TRUE(archiver.sync_for_tests().get());
1309+
1310+
// Step 2: Apply spillover
1311+
vlog(e2e_test_log.info, "Applying first spillover");
1312+
archiver.apply_spillover().get();
1313+
1314+
// Capture start offset before truncation
1315+
auto start_kafka_offset_before
1316+
= archiver.manifest().full_log_start_kafka_offset();
1317+
vlog(
1318+
e2e_test_log.info,
1319+
"Start kafka offset before truncation: {}",
1320+
start_kafka_offset_before);
1321+
1322+
SCOPED_TRACE("Truncating via DeleteRecords");
1323+
const auto timeout = 10s;
1324+
1325+
tests::kafka_delete_records_transport deleter(make_kafka_client().get());
1326+
deleter.start().get();
1327+
1328+
// Delete offset from second segment of the first spillover manifest.
1329+
deleter
1330+
.delete_records_from_partition(
1331+
topic_name, ntp.tp.partition, model::offset{8}, timeout)
1332+
.get();
1333+
1334+
ASSERT_EQ(
1335+
archiver.manifest().full_log_start_kafka_offset(), kafka::offset{0});
1336+
ASSERT_EQ(
1337+
archiver.manifest().get_start_kafka_offset_override(), kafka::offset{8});
1338+
1339+
// Additional segments to trigger another spillover round.
1340+
{
1341+
tests::remote_segment_generator gen(
1342+
make_kafka_client().get(), *partition);
1343+
gen.num_segments(6)
1344+
.batches_per_segment(5)
1345+
.records_per_batch(1)
1346+
.produce()
1347+
.get();
1348+
}
1349+
1350+
archiver.housekeeping().get();
1351+
1352+
// Due to implementation deficiencies truncation doesn't advance start
1353+
// offset but at least the override should not regress.
1354+
ASSERT_EQ(
1355+
archiver.manifest().full_log_start_kafka_offset(), kafka::offset{0});
1356+
ASSERT_EQ(
1357+
archiver.manifest().get_start_kafka_offset_override(), kafka::offset{8});
1358+
}
1359+
12701360
INSTANTIATE_TEST_SUITE_P(WithOverride, EndToEndFixture, ::testing::Bool());
12711361

12721362
INSTANTIATE_TEST_SUITE_P(

src/v/cluster/archival/tests/archival_metadata_stm_test.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -905,14 +905,17 @@ FIXTURE_TEST(
905905
kafka::offset(1200));
906906
BOOST_REQUIRE_EQUAL(archival_stm->get_start_offset(), model::offset(1000));
907907

908-
// Advancing the start offset past the override resets the override.
908+
// This does reset the override as archive is not truncated yet.
909909
archival_stm
910910
->truncate(
911911
model::offset(2000), ss::lowres_clock::now() + 10s, never_abort)
912912
.get();
913913
BOOST_REQUIRE_EQUAL(
914914
archival_stm->manifest().get_start_kafka_offset_override(),
915-
kafka::offset{});
915+
kafka::offset{1200});
916+
917+
// This is STM start offset. Previous truncate truncated as-if applied by
918+
// spillover.
916919
BOOST_REQUIRE_EQUAL(archival_stm->get_start_offset(), model::offset(2000));
917920
}
918921

0 commit comments

Comments
 (0)