Skip to content

Commit 0b1cf29

Browse files
Merge pull request #25946 from vbotbuildovich/backport-pr-25925-v25.1.x-321
[v25.1.x] [CORE-9906] datalake: fix translation double-stop
2 parents f696298 + ffc623d commit 0b1cf29

File tree

4 files changed

+20
-10
lines changed

4 files changed

+20
-10
lines changed

src/v/datalake/translation/scheduling.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ class translator {
231231
* matter what stat it is in (e.g. running, waiting, idle). The translator
232232
* is free to clear the request after taking action.
233233
*/
234-
virtual void set_finish_translation() {}
234+
virtual void set_finish_translation() = 0;
235235

236236
/**
237237
* Return true if the translator is still in the progress of satisfying the

src/v/datalake/translation/scheduling_policies.cc

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -384,19 +384,18 @@ ss::future<> fair_scheduling_policy::on_resource_exhaustion(
384384
> b.status().memory_bytes_reserved;
385385
});
386386

387-
auto num_running = executor.running.size();
388387
// pick the earliest scheduled translator and force a flush.
388+
auto& executable = *executor.running.begin();
389389
vlog(
390390
datalake_log.debug,
391391
"[{}] stopping translator due to memory exhaustion",
392-
*executor.running.begin());
393-
executor.stop_translation(
394-
*executor.running.begin(), translator::stop_reason::oom);
395-
396-
while (mem_tracker.memory_exhausted() && !executor.as.abort_requested()
397-
&& executor.running.size() == num_running) {
398-
co_await ss::sleep_abortable(polling_interval, executor.as);
399-
}
392+
executable);
393+
co_await finish_translator(
394+
executor,
395+
finish_choice_info(
396+
executable.translator_ptr()->id(),
397+
finish_choice_info::status::running,
398+
translator::stop_reason::oom));
400399
}
401400

402401
enum fair_scheduling_policy::finish_choice_info::status

src/v/datalake/translation/tests/scheduler_fixture.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include "ssx/future-util.h"
1616
#include "test_utils/randoms.h"
1717

18+
#include <seastar/util/defer.hh>
19+
1820
using namespace std::chrono_literals;
1921

2022
namespace datalake::translation::scheduling {
@@ -163,6 +165,8 @@ ss::future<> mock_translator::translation_loop() {
163165
{
164166
co_await _wait_for_scheduler_cb.wait(
165167
[this] { return _translation_state.has_value(); });
168+
auto clear_finish_request = ss::defer(
169+
[this] { _finish_translation_requested = false; });
166170
auto holder = _translation_state->gate.hold();
167171
auto deadline = _translation_state->translate_for;
168172
auto start_time = clock::now();

src/v/datalake/translation/tests/scheduler_fixture.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ class mock_translator : public translator {
4343
std::chrono::milliseconds current_lag_ms() const override {
4444
return std::chrono::milliseconds{0};
4545
}
46+
void set_finish_translation() final {
47+
_finish_translation_requested = true;
48+
}
49+
bool get_finish_translation() final {
50+
return _finish_translation_requested;
51+
}
4652

4753
private:
4854
// Mock of a single parquet file writer.
@@ -91,6 +97,7 @@ class mock_translator : public translator {
9197
ss::timer<clock> _translation_timer;
9298
ss::condition_variable _wait_for_scheduler_cb;
9399
clock::time_point _next_checkpoint;
100+
bool _finish_translation_requested{false};
94101
};
95102

96103
// A translator that overshoots deadline and requires explict force flushing

0 commit comments

Comments
 (0)