Skip to content

Commit 591db49

Browse files
authored
feat(flow-control): do flow control during evaluation too (#710)
1 parent 4b3b9a2 commit 591db49

File tree

3 files changed

+9
-9
lines changed

3 files changed

+9
-9
lines changed

src/execution/dumper.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ impl<'a> Dumper<'a> {
136136
key: value::KeyValue,
137137
file_path: PathBuf,
138138
) -> Result<()> {
139+
let _permit = import_op
140+
.concurrency_controller
141+
.acquire(concur_control::BYTES_UNKNOWN_YET)
142+
.await?;
139143
let mut collected_values_buffer = Vec::new();
140144
let (exports, error) = match self
141145
.evaluate_source_entry(import_op_idx, import_op, &key, &mut collected_values_buffer)

src/execution/evaluator.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,11 @@ pub async fn evaluate_source_entry(
464464
source_value: value::FieldValues,
465465
memory: &EvaluationMemory,
466466
) -> Result<EvaluateSourceEntryOutput> {
467+
let _permit = src_eval_ctx
468+
.import_op
469+
.concurrency_controller
470+
.acquire_bytes_with_reservation(|| source_value.estimated_byte_size())
471+
.await?;
467472
let root_schema = &src_eval_ctx.schema.schema;
468473
let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len());
469474
let root_scope_entry = ScopeEntry::new(

src/execution/source_indexer.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -163,15 +163,6 @@ impl SourceIndexingContext {
163163

164164
{
165165
let _processing_permit = processing_sem.acquire().await?;
166-
let _concur_permit = match &source_data.value {
167-
interface::SourceValue::Existence(value) => {
168-
import_op
169-
.concurrency_controller
170-
.acquire_bytes_with_reservation(|| value.estimated_byte_size())
171-
.await?
172-
}
173-
interface::SourceValue::NonExistence => None,
174-
};
175166
let result = row_indexer::update_source_row(
176167
&SourceRowEvaluationContext {
177168
plan: &plan,

0 commit comments

Comments
 (0)