We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent a229523 commit 0296faeCopy full SHA for 0296fae
be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -66,6 +66,13 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_st
66
local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes);
67
data_block->swap(block->_data_block);
68
}
69
+
70
+ std::unique_lock l(*_m[channel_id]);
71
+ // data_queue locked so that the size_approx is consistent with the actual queue size
72
+ if (_data_queue[channel_id].data_queue.size_approx() == 0) {
73
+ local_state->_dependency->block();
74
+ }
75
76
return true;
77
} else if (all_finished) {
78
*eos = true;
0 commit comments