@@ -257,6 +257,7 @@ ss::future<checked<model::term_id, tx::errc>> rm_stm::begin_tx(
257257 std::chrono::milliseconds transaction_timeout_ms,
258258 model::partition_id tm) {
259259 auto state_lock = co_await _state_lock.hold_read_lock ();
260+ auto lso_lock_holder = co_await _lso_lock.hold_write_lock ();
260261 if (!co_await sync (_sync_timeout ())) {
261262 vlog (
262263 _ctx_log.trace ,
@@ -738,7 +739,7 @@ ss::future<tx::errc> rm_stm::do_abort_tx(
738739 // Or it may mean that a tx coordinator
739740 // - lost its state
740741 // - rolled back to previous op
741- // - the previous op happend to be an abort
742+ // - the previous op happened to be an abort
742743 // - the coordinator retried it
743744 //
744745 // In the first case the least impactful way to reject the request.
@@ -1264,6 +1265,9 @@ ss::future<result<kafka_result>> rm_stm::replicate_msg(
12641265}
12651266
12661267model::offset rm_stm::last_stable_offset () {
1268+ if (_as.abort_requested ()) [[unlikely]] {
1269+ return model::invalid_lso;
1270+ }
12671271 // There are two main scenarios we deal with here.
12681272 // 1. stm is still bootstrapping
12691273 // 2. stm is past bootstrapping.
@@ -1278,6 +1282,8 @@ model::offset rm_stm::last_stable_offset() {
12781282 // We optimize for the case where there are no inflight transactional
12791283 // batch to return the high water mark.
12801284 auto last_applied = last_applied_offset ();
1285+
1286+ // scenario 1: still bootstrapping
12811287 if (unlikely (
12821288 !_bootstrap_committed_offset
12831289 || last_applied < _bootstrap_committed_offset.value ())) {
@@ -1287,6 +1293,29 @@ model::offset rm_stm::last_stable_offset() {
12871293 return model::invalid_lso;
12881294 }
12891295
1296+ // scenario 2: past bootstrapping
1297+ auto read_units = _state_lock.try_hold_read_lock ();
1298+ if (!read_units) {
1299+ // A reset in progress means the stm may not be in a consistent state
1300+ // for LSO calculation. In this case we return the last known LSO to be
1301+ // conservative.
1302+ vlog (
1303+ _ctx_log.trace ,
1304+ " state machine is resetting, last_known_lso: {}, last_applied: {}" ,
1305+ _last_known_lso,
1306+ last_applied);
1307+ return _last_known_lso;
1308+ }
1309+ auto lso_read_units = _lso_lock.try_hold_read_lock ();
1310+ if (!lso_read_units) {
1311+ // LSO calculation is in progress, return last known LSO
1312+ vlog (
1313+ _ctx_log.trace ,
1314+ " lso update in progress, last_known_lso: {}, last_applied: {}" ,
1315+ _last_known_lso,
1316+ last_applied);
1317+ return _last_known_lso;
1318+ }
12901319 // Check for any in-flight transactions.
12911320 auto first_tx_start = model::offset::max ();
12921321 if (_is_tx_enabled && !_active_tx_producers.empty ()) {
@@ -1313,6 +1342,39 @@ model::offset rm_stm::last_stable_offset() {
13131342 // transactions.
13141343 lso = std::min (first_tx_start, next_to_apply);
13151344 } else if (synced_leader) {
1345+ // ////////////// WARNING ///////////
1346+ // there is a real bug lurking here that overestimates the LSO beyond
1347+ // an open transaction.
1348+ //
1349+
1350+ // The problem manifests when the LSO is requested after successful
1351+ // replication of begin_tx batch but before the stm has applied it.
1352+ // In this case the LSO may be advanced beyond the begin_tx batch offset
1353+ // because the leader doesn't yet 'know' about the begin_tx batch and
1354+ // may not consider it in LSO calculation.
1355+
1356+ // Another problem is we do not let lso move backwards once
1357+ // computed (see _last_known_lso update below), so even if the
1358+ // stm has applied the begin_tx later, we will not correct
1359+ // the LSO to reflect the begin_tx presence.
1360+
1361+ // There is a test that caught this issue in rm_stm_tests which
1362+ // is disabled for now until we can fix the underlying problem.
1363+
1364+ // The impact of this overestimation is that compaction may compact
1365+ // away open transaction begin marker as it relies on LSO. The
1366+ // chances are rare but not impossible :(. If at that point the replica
1367+ // restarts and there are no further updates in the transaction, the
1368+ // transaction has no record of ever beginning.
1369+
1370+ // We need a better way to track in-flight transactions for the purposes
1371+ // of LSO calculation.
1372+
1373+ // An obvious solution is to clamp LSO to next_to_apply in all cases
1374+ // but it was tried in the past and caused performance regressions
1375+ // in non transaction workloads like write_caching, acks=0/1. So that
1376+ // is not a viable solution.
1377+
13161378 // no inflight transactions in (last_applied, last_visible_index]
13171379 lso = model::next_offset (last_visible_index);
13181380 } else {
@@ -1834,14 +1896,15 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const {
18341896
18351897ss::future<raft::local_snapshot_applied>
18361898rm_stm::apply_local_snapshot (raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
1899+ auto data_buf = std::move (tx_ss_buf);
18371900 auto units = co_await _state_lock.hold_write_lock ();
18381901
18391902 vlog (
18401903 _ctx_log.trace ,
18411904 " applying snapshot with last included offset: {}" ,
18421905 hdr.offset );
18431906 tx_snapshot_v6 data;
1844- iobuf_parser data_parser (std::move (tx_ss_buf ));
1907+ iobuf_parser data_parser (std::move (data_buf ));
18451908 if (hdr.version == tx_snapshot_v4::version) {
18461909 tx_snapshot_v4 data_v4
18471910 = co_await reflection::async_adl<tx_snapshot_v4>{}.from (data_parser);
0 commit comments