Skip to content

Commit e65fc37

Browse files
petrosaggjkosh44
authored andcommitted
storage: make sure the health capability is dropped (#31038)
This is a slight oversight in #30858 that switched from the async operator builder that drops capabilities when it exits to the timely rc operator builder where capabilities need to dropped manually. Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
1 parent 5883f64 commit e65fc37

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

src/storage/src/source/source_reader_pipeline.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,11 +368,17 @@ where
368368
.set(mz_persist_client::metrics::encode_ts_metric(&resume_upper));
369369

370370
builder.build(move |mut caps| {
371-
let health_cap = caps.remove(1);
372-
move |_frontiers| {
371+
let mut health_cap = Some(caps.remove(1));
372+
move |frontiers| {
373373
let mut statuses_by_idx = BTreeMap::new();
374374
let mut health_output = health_output.activate();
375375

376+
if frontiers.iter().all(|f| f.is_empty()) {
377+
health_cap = None;
378+
return;
379+
}
380+
let health_cap = health_cap.as_mut().unwrap();
381+
376382
for (id, input, output) in export_handles.iter_mut() {
377383
while let Some((cap, data)) = input.next() {
378384
for (message, _, _) in data.iter() {

0 commit comments

Comments
 (0)