Skip to content

Commit 116eab7

Browse files
authored
Merge pull request #10734 from RinChanNOWWW/fix-duplicate
fix: fix duplicate processor.
2 parents f599894 + 8756c7c commit 116eab7

File tree

2 files changed

+24
-1
lines changed

2 files changed

+24
-1
lines changed

src/query/pipeline/core/src/processors/duplicate_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl Processor for DuplicateProcessor {
8080
return Ok(Event::Finished);
8181
}
8282

83-
if !can_push1 || !can_push2 {
83+
if (!is_finished1 && !can_push1) || (!is_finished2 && !can_push2) {
8484
return Ok(Event::NeedConsume);
8585
}
8686

src/query/pipeline/core/tests/it/pipelines/processors/duplicate.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,29 @@ async fn test_duplicate_output_finish() -> Result<()> {
7575
assert!(input.is_finished());
7676
}
7777

78+
// One output finished, one output no finished and can push.
79+
{
80+
let input = InputPort::create();
81+
let output1 = OutputPort::create();
82+
let output2 = OutputPort::create();
83+
let mut processor =
84+
DuplicateProcessor::create(input.clone(), output1.clone(), output2.clone(), false);
85+
86+
let upstream_output = OutputPort::create();
87+
let downstream_input1 = InputPort::create();
88+
let downstream_input2 = InputPort::create();
89+
90+
unsafe {
91+
connect(&input, &upstream_output);
92+
connect(&downstream_input1, &output1);
93+
connect(&downstream_input2, &output2);
94+
}
95+
96+
downstream_input1.finish();
97+
downstream_input2.set_need_data();
98+
assert!(matches!(processor.event()?, Event::NeedData));
99+
}
100+
78101
Ok(())
79102
}
80103

0 commit comments

Comments
 (0)