Skip to content

Commit 14f3f5d

Browse files
authored
[pegasus] fix operator blocking (#2347)
* fix operator blocking
1 parent a8bdeb8 commit 14f3f5d

File tree

1 file changed

+6
-5
lines changed
  • interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator

1 file changed

+6
-5
lines changed

interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/exchange.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl<D: Data> ExchangeByDataPush<D> {
176176

177177
fn update_end(
178178
&mut self, target: Option<usize>, end: &EndOfScope,
179-
) -> impl Iterator<Item = (u64, u64, DynPeers)> {
179+
) -> impl Iterator<Item=(u64, u64, DynPeers)> {
180180
let mut push_stat = Vec::with_capacity(self.pushes.len());
181181
for (index, p) in self.pushes.iter().enumerate() {
182182
let mut pushes = p.get_push_count(&end.tag).unwrap_or(0) as u64;
@@ -245,7 +245,7 @@ impl<D: Data> ExchangeByDataPush<D> {
245245
}
246246

247247
if has_block {
248-
if !batch.is_empty() {
248+
if !batch.is_empty() || batch.is_last() {
249249
trace_worker!(
250250
"output[{:?}] blocking on push batch(len={}) of {:?} ;",
251251
self.port,
@@ -534,9 +534,10 @@ impl<D: Data> BlockPush for ExchangeByDataPush<D> {
534534
.blocks
535535
.get_mut(tag)
536536
.expect("expect has block;");
537-
while let Some(x) = blocks.pop_front() {
538-
b.push_back(x);
537+
while let Some(x) = b.pop_back() {
538+
blocks.push_front(x);
539539
}
540+
*b = blocks;
540541
}
541542
Ok(false)
542543
} else {
@@ -590,7 +591,7 @@ impl<D: Data> ExchangeByBatchPush<D> {
590591

591592
fn update_end(
592593
&mut self, target: Option<usize>, end: &EndOfScope,
593-
) -> impl Iterator<Item = (u64, u64, DynPeers)> {
594+
) -> impl Iterator<Item=(u64, u64, DynPeers)> {
594595
let mut push_stat = Vec::with_capacity(self.pushes.len());
595596
for (index, p) in self.pushes.iter().enumerate() {
596597
let mut pushes = p.get_push_count(&end.tag).unwrap_or(0) as u64;

0 commit comments

Comments
 (0)