Skip to content

Commit 4ed44a6

Browse files
authored
Merge pull request #27880 from vbotbuildovich/backport-pr-27865-v25.2.x-350
[v25.2.x] c/tx: fix begin of transactions without a fence batch
2 parents 145e58c + c3fb87f commit 4ed44a6

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

src/v/cluster/producer_state.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ void producer_state::apply_data(
414414
}
415415
_requests.stm_apply(bid, header.ctx.term, offset);
416416
if (bid.is_transactional) {
417-
if (!_transaction_state) {
417+
if (!_transaction_state || !_transaction_state->is_in_progress()) {
418418
// possible if begin batch got truncated.
419419
_transaction_state
420420
= std::make_unique<producer_partition_transaction_state>(

src/v/cluster/tests/producer_state_tests.cc

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,3 +479,82 @@ FIXTURE_TEST(test_negative_sequence_number, test_fixture) {
479479
result.error(), cluster::errc::sequence_out_of_order);
480480
}
481481
}
482+
483+
FIXTURE_TEST(test_transaction_start_without_fence, test_fixture) {
484+
create_producer_state_manager(1, 1);
485+
auto producer = new_producer();
486+
auto defer = ss::defer(
487+
[&] { manager().deregister_producer(*producer, std::nullopt); });
488+
validate_producer_count(1);
489+
490+
// begin a transaction with fence batch
491+
auto fence = make_fence_batch(
492+
producer->id(),
493+
model::tx_seq{0},
494+
std::chrono::milliseconds{10000},
495+
model::partition_id{0});
496+
497+
auto begin_header = fence.header();
498+
producer->apply_transaction_begin(
499+
begin_header, read_fence_batch(std::move(fence)));
500+
BOOST_REQUIRE(producer->has_transaction_in_progress());
501+
502+
auto num_records = random_generators::get_int(1, 10);
503+
// add data to transaction
504+
auto batch = model::test::make_random_batch(
505+
{.offset = model::offset{1},
506+
.allow_compression = true,
507+
.count = num_records,
508+
.bt = model::record_batch_type::raft_data,
509+
.enable_idempotence = true,
510+
.producer_id = producer->id().id,
511+
.producer_epoch = producer->id().epoch,
512+
.base_sequence = 0,
513+
.is_transactional = true});
514+
auto last_offset = batch.last_offset();
515+
auto bid = model::batch_identity::from(batch.header());
516+
auto request = producer->try_emplace_request(bid, model::term_id{1}, true);
517+
518+
BOOST_REQUIRE(!request.has_error());
519+
// producer has an inflight request
520+
BOOST_REQUIRE(!producer->can_evict());
521+
producer->apply_data(batch.header(), kafka::offset{10});
522+
523+
// commit the transaction.
524+
producer->apply_transaction_end(model::control_record_type::tx_commit);
525+
526+
// verify the begin and end of transaction
527+
const auto& tx_state = producer->transaction_state();
528+
BOOST_REQUIRE(bool(tx_state));
529+
BOOST_REQUIRE_EQUAL(
530+
tx_state->status, partition_transaction_status::committed);
531+
BOOST_REQUIRE_EQUAL(tx_state->first, model::offset{0}); // fence
532+
BOOST_REQUIRE_EQUAL(tx_state->last, last_offset); // last offset of batch
533+
534+
// Now start another transaction without a fence batch
535+
// This simulates a prefix truncated fence batch
536+
// it should still be possible to start a transaction
537+
auto another_batch = model::test::make_random_batch(
538+
{.offset = model::offset{last_offset + model::offset{10}},
539+
.allow_compression = true,
540+
.count = num_records,
541+
.bt = model::record_batch_type::raft_data,
542+
.enable_idempotence = true,
543+
.producer_id = producer->id().id,
544+
.producer_epoch = producer->id().epoch,
545+
.base_sequence = 10,
546+
.is_transactional = true});
547+
548+
auto another_bid = model::batch_identity::from(another_batch.header());
549+
auto another_request = producer->try_emplace_request(
550+
another_bid, model::term_id{1}, true);
551+
BOOST_REQUIRE(!another_request.has_error());
552+
producer->apply_data(another_batch.header(), kafka::offset{10});
553+
BOOST_REQUIRE(producer->has_transaction_in_progress());
554+
BOOST_REQUIRE(!producer->can_evict());
555+
const auto& another_tx_state = producer->transaction_state();
556+
BOOST_REQUIRE(bool(another_tx_state));
557+
BOOST_REQUIRE_EQUAL(
558+
another_tx_state->status, partition_transaction_status::ongoing);
559+
BOOST_REQUIRE_EQUAL(another_tx_state->first, another_batch.base_offset());
560+
}

0 commit comments

Comments
 (0)