Skip to content

Commit 4c9dce0

Browse files
committed
impl. yield for other operators
1 parent 9f89347 commit 4c9dce0

File tree

12 files changed

+336
-221
lines changed

12 files changed

+336
-221
lines changed

src/jogasaki/executor/process/impl/ops/aggregate_group.cpp

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -161,21 +161,25 @@ operation_status aggregate_group::operator()(
161161
if (ctx.aborted()) {
162162
return operation_status_kind::aborted;
163163
}
164-
for(std::size_t i=0, n=arguments_.size(); i < n; ++i) {
165-
// append value store the values
166-
auto& store = ctx.stores_[i];
167-
auto& arg = arguments_[i];
168-
auto src = ctx.input_variables().store().ref();
169-
copy_value(
170-
src,
171-
arg.offset_,
172-
arg.nullity_offset_,
173-
arg.nullable_,
174-
store
175-
);
176-
}
164+
if (ctx.state() != context_state::calling_child) {
165+
for(std::size_t i=0, n=arguments_.size(); i < n; ++i) {
166+
// append value store the values
167+
auto& store = ctx.stores_[i];
168+
auto& arg = arguments_[i];
169+
auto src = ctx.input_variables().store().ref();
170+
copy_value(
171+
src,
172+
arg.offset_,
173+
arg.nullity_offset_,
174+
arg.nullable_,
175+
store
176+
);
177+
}
178+
179+
if (! last_member) {
180+
return operation_status_kind::ok;
181+
}
177182

178-
if (last_member) {
179183
// do aggregation from value store and create column values
180184
for(std::size_t i=0, n=columns_.size(); i < n; ++i) {
181185
auto& c = columns_[i];
@@ -191,19 +195,24 @@ operation_status aggregate_group::operator()(
191195
ctx.function_arg_stores_[i]
192196
);
193197
}
194-
195-
if (downstream_) {
196-
if(auto st = unsafe_downcast<record_operator>(downstream_.get())->process_record(context); !st) {
197-
ctx.abort();
198-
return operation_status_kind::aborted;
199-
}
198+
}
199+
if (downstream_) {
200+
ctx.state(context_state::calling_child);
201+
auto st = unsafe_downcast<record_operator>(downstream_.get())->process_record(context);
202+
if (st.kind() == operation_status_kind::yield) {
203+
return operation_status_kind::yield;
200204
}
201-
// reset
202-
for(std::size_t i=0, n=columns_.size(); i < n; ++i) {
203-
ctx.stores_[i].reset();
204-
ctx.resources_[i]->deallocate_after(memory::lifo_paged_memory_resource::initial_checkpoint);
205-
ctx.nulls_resources_[i]->deallocate_after(memory::lifo_paged_memory_resource::initial_checkpoint);
205+
if (! st) {
206+
ctx.abort();
207+
return operation_status_kind::aborted;
206208
}
209+
ctx.state(context_state::running_operator_body);
210+
}
211+
// reset
212+
for(std::size_t i=0, n=columns_.size(); i < n; ++i) {
213+
ctx.stores_[i].reset();
214+
ctx.resources_[i]->deallocate_after(memory::lifo_paged_memory_resource::initial_checkpoint);
215+
ctx.nulls_resources_[i]->deallocate_after(memory::lifo_paged_memory_resource::initial_checkpoint);
207216
}
208217
return operation_status_kind::ok;
209218
}

src/jogasaki/executor/process/impl/ops/find.cpp

Lines changed: 73 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,24 @@ operation_status find::call_downstream(
145145
context_base::memory_resource* resource,
146146
abstract::task_context* context
147147
) {
148-
auto& tx = ctx.strand() != nullptr ? *ctx.strand() : *ctx.tx_->object();
149-
if(auto res = field_mapper_.process(k, v, target, *ctx.stg_, tx, resource, *ctx.req_context());
150-
res != status::ok) {
151-
return error_abort(ctx, res);
148+
if (ctx.state() != context_state::calling_child) {
149+
auto& tx = ctx.strand() != nullptr ? *ctx.strand() : *ctx.tx_->object();
150+
if(auto res = field_mapper_.process(k, v, target, *ctx.stg_, tx, resource, *ctx.req_context());
151+
res != status::ok) {
152+
return error_abort(ctx, res);
153+
}
152154
}
153155
if (downstream_) {
154-
if(auto st = unsafe_downcast<record_operator>(downstream_.get())->process_record(context); !st) {
156+
ctx.state(context_state::calling_child);
157+
auto st = unsafe_downcast<record_operator>(downstream_.get())->process_record(context);
158+
if (st.kind() == operation_status_kind::yield) {
159+
return operation_status_kind::yield;
160+
}
161+
if (! st) {
155162
ctx.abort();
156163
return operation_status_kind::aborted;
157164
}
165+
ctx.state(context_state::running_operator_body);
158166
}
159167
return operation_status_kind::ok;
160168
}
@@ -207,57 +215,82 @@ operation_status find::operator()(class find_context& ctx, abstract::task_contex
207215
std::string_view k{static_cast<char*>(ctx.key_.data()), len};
208216
auto& tx = ctx.strand() != nullptr ? *ctx.strand() : *ctx.tx_->object();
209217
if (! use_secondary_) {
210-
auto& stg = *ctx.stg_;
211-
if(auto res = stg.content_get(tx, k, v); res != status::ok) {
212-
finish(context);
213-
utils::modify_concurrent_operation_status(*ctx.tx_->object(), res, false);
214-
if (res == status::not_found) {
215-
return operation_status_kind::ok;
218+
if (ctx.state() != context_state::calling_child) {
219+
auto& stg = *ctx.stg_;
220+
if(auto res = stg.content_get(tx, k, v); res != status::ok) {
221+
finish(context);
222+
utils::modify_concurrent_operation_status(*ctx.tx_->object(), res, false);
223+
if (res == status::not_found) {
224+
return operation_status_kind::ok;
225+
}
226+
handle_kvs_errors(*ctx.req_context(), res);
227+
return error_abort(ctx, res);
216228
}
217-
handle_kvs_errors(*ctx.req_context(), res);
218-
return error_abort(ctx, res);
219229
}
220230
auto ret = call_downstream(ctx, k, v, target, resource, context);
231+
if (ret.kind() == operation_status_kind::yield) {
232+
return operation_status_kind::yield;
233+
}
221234
finish(context);
222235
return ret;
223236
}
224-
auto& stg = *ctx.secondary_stg_;
225-
std::unique_ptr<kvs::iterator> it{};
226-
if(auto res = stg.content_scan(tx,
227-
k, kvs::end_point_kind::prefixed_inclusive,
228-
k, kvs::end_point_kind::prefixed_inclusive,
229-
it
230-
); res != status::ok) {
231-
finish(context);
232-
handle_kvs_errors(*ctx.req_context(), res);
233-
return error_abort(ctx, res);
234-
}
235-
while(true) {
236-
if(auto res = it->next(); res != status::ok) {
237+
auto& secondary_stg = *ctx.secondary_stg_;
238+
if(ctx.state() != context_state::calling_child) {
239+
if(auto res = secondary_stg.content_scan(tx,
240+
k, kvs::end_point_kind::prefixed_inclusive,
241+
k, kvs::end_point_kind::prefixed_inclusive,
242+
ctx.it_
243+
); res != status::ok) {
237244
finish(context);
238-
if (res == status::not_found) {
239-
return operation_status_kind::ok;
240-
}
241245
handle_kvs_errors(*ctx.req_context(), res);
242246
return error_abort(ctx, res);
243247
}
244-
if(auto res = it->read_key(k); res != status::ok) {
245-
utils::modify_concurrent_operation_status(*ctx.tx_->object(), res, true); //FIXME tx_
246-
// shirakami returns error here even if next() above returns ok
247-
// (e.g. not_found for concurrently deleted entry or concurrent_operation for concurrently inserted)
248-
// skip the record and continue to next
249-
if (res == status::not_found) {
250-
continue;
248+
}
249+
while(true) {
250+
if(ctx.state() != context_state::calling_child) {
251+
if(auto res = ctx.it_->next(); res != status::ok) {
252+
ctx.it_.reset();
253+
finish(context);
254+
if (res == status::not_found) {
255+
return operation_status_kind::ok;
256+
}
257+
handle_kvs_errors(*ctx.req_context(), res);
258+
return error_abort(ctx, res);
251259
}
252-
finish(context);
253-
handle_kvs_errors(*ctx.req_context(), res);
254-
return error_abort(ctx, res);
260+
if(auto res = ctx.it_->read_key(k); res != status::ok) {
261+
utils::modify_concurrent_operation_status(*ctx.tx_->object(), res, true); //FIXME tx_
262+
// shirakami returns error here even if next() above returns ok
263+
// (e.g. not_found for concurrently deleted entry or concurrent_operation for concurrently inserted)
264+
// skip the record and continue to next
265+
if (res == status::not_found) {
266+
continue;
267+
}
268+
ctx.it_.reset();
269+
finish(context);
270+
handle_kvs_errors(*ctx.req_context(), res);
271+
return error_abort(ctx, res);
272+
}
273+
} else {
274+
// resume after yield: re-read key from current iterator position (iterator not advanced)
275+
if(auto res = ctx.it_->read_key(k); res != status::ok) {
276+
ctx.it_.reset();
277+
finish(context);
278+
handle_kvs_errors(*ctx.req_context(), res);
279+
return error_abort(ctx, res);
280+
}
281+
}
282+
auto ret = call_downstream(ctx, k, v, target, resource, context);
283+
if (ret.kind() == operation_status_kind::yield) {
284+
// ctx.it_ is preserved in context for resume
285+
return operation_status_kind::yield;
255286
}
256-
if(auto ret = call_downstream(ctx, k, v, target, resource, context); ! ret) {
287+
if (! ret) {
288+
ctx.it_.reset();
257289
finish(context);
258290
return ret;
259291
}
260292
}
293+
ctx.it_.reset();
261294
finish(context);
262295
return operation_status_kind::ok;
263296
}

src/jogasaki/executor/process/impl/ops/find_context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class find_context : public context_base {
7575
transaction_context* tx_{};
7676
data::aligned_buffer key_{};
7777
kvs::transaction* strand_{};
78+
std::unique_ptr<kvs::iterator> it_{};
7879
};
7980

8081
}

src/jogasaki/executor/process/impl/ops/flatten.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,16 @@ operation_status flatten::operator()(flatten_context& ctx, abstract::task_contex
6262
return operation_status_kind::aborted;
6363
}
6464
if (downstream_) {
65-
if(auto st = unsafe_downcast<record_operator>(downstream_.get())->process_record(context); !st) {
65+
ctx.state(context_state::calling_child);
66+
auto st = unsafe_downcast<record_operator>(downstream_.get())->process_record(context);
67+
if (st.kind() == operation_status_kind::yield) {
68+
return operation_status_kind::yield;
69+
}
70+
if (! st) {
6671
ctx.abort();
6772
return operation_status_kind::aborted;
6873
}
74+
ctx.state(context_state::running_operator_body);
6975
}
7076
return operation_status_kind::ok;
7177
}

src/jogasaki/executor/process/impl/ops/index_join.h

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -329,55 +329,71 @@ class index_join : public record_operator {
329329
return operation_status_kind::aborted;
330330
}
331331
auto resource = ctx.varlen_resource();
332-
nullify_output_variables(ctx.output_variables().store().ref());
333332
auto& tx = ctx.strand_ != nullptr ? *ctx.strand_ : *ctx.tx_->object();
334333
context_helper helper{ctx.task_context()};
335334
expr::evaluator_context ectx{
336335
resource,
337336
ctx.req_context() ? ctx.req_context()->transaction().get() : nullptr
338337
};
339338
ectx.blob_session(std::addressof(helper.blob_session_container()));
340-
bool matched = ctx.matcher_->template process<MatchInfo>(
341-
ectx,
342-
*ctx.req_context(),
343-
ctx.input_variables(),
344-
ctx.output_variables(),
345-
*ctx.primary_stg_,
346-
ctx.secondary_stg_.get(),
347-
tx,
348-
resource
349-
);
350-
if(matched || join_kind_ == join_kind::left_outer) {
351-
do {
352-
if (condition_) {
353-
// newly create c instead of re-using ectx since errors in ectx might have bad impact //TODO resolve this
354-
expr::evaluator_context c{
355-
resource,
356-
ctx.req_context() ? ctx.req_context()->transaction().get() : nullptr
357-
};
358-
c.blob_session(std::addressof(helper.blob_session_container()));
359-
auto r = evaluate_bool(c, evaluator_, ctx.input_variables(), resource);
360-
if (r.error()) {
361-
return handle_expression_error(ctx, r, c);
362-
}
363-
if(! r.template to<bool>()) {
364-
if(join_kind_ != join_kind::left_outer) {
365-
// inner join: skip record
366-
continue;
339+
if (ctx.state() != context_state::calling_child) {
340+
nullify_output_variables(ctx.output_variables().store().ref());
341+
ctx.matched_ = ctx.matcher_->template process<MatchInfo>(
342+
ectx,
343+
*ctx.req_context(),
344+
ctx.input_variables(),
345+
ctx.output_variables(),
346+
*ctx.primary_stg_,
347+
ctx.secondary_stg_.get(),
348+
tx,
349+
resource
350+
);
351+
}
352+
if(ctx.matched_ || join_kind_ == join_kind::left_outer) {
353+
while(true) {
354+
bool should_call_downstream = true;
355+
if (ctx.state() != context_state::calling_child) {
356+
if (condition_) {
357+
// newly create c instead of re-using ectx since errors in ectx might have bad impact //TODO resolve this
358+
expr::evaluator_context c{
359+
resource,
360+
ctx.req_context() ? ctx.req_context()->transaction().get() : nullptr
361+
};
362+
c.blob_session(std::addressof(helper.blob_session_container()));
363+
auto r = evaluate_bool(c, evaluator_, ctx.input_variables(), resource);
364+
if (r.error()) {
365+
return handle_expression_error(ctx, r, c);
366+
}
367+
if(! r.template to<bool>()) {
368+
if(join_kind_ != join_kind::left_outer) {
369+
// inner join: skip record, advance to next
370+
should_call_downstream = false;
371+
} else {
372+
// left outer join: nullify output variables and send record downstream
373+
nullify_output_variables(ctx.output_variables().store().ref());
374+
}
367375
}
368-
// left outer join: nullify output variables and send record downstream
369-
nullify_output_variables(ctx.output_variables().store().ref());
370376
}
371377
}
372-
if (downstream_) {
373-
if(auto st = unsafe_downcast<record_operator>(downstream_.get())->process_record(context); !st) {
378+
if (should_call_downstream && downstream_) {
379+
ctx.state(context_state::calling_child);
380+
auto st = unsafe_downcast<record_operator>(downstream_.get())->process_record(context);
381+
if (st.kind() == operation_status_kind::yield) {
382+
return operation_status_kind::yield;
383+
}
384+
if (! st) {
374385
ctx.abort();
375386
return operation_status_kind::aborted;
376387
}
388+
ctx.state(context_state::running_operator_body);
389+
// clean output variables for next record just in case
390+
nullify_output_variables(ctx.output_variables().store().ref());
377391
}
378-
// clean output variables for next record just in case
379-
nullify_output_variables(ctx.output_variables().store().ref());
380-
} while (matched && ctx.matcher_->next(*ctx.req_context()));
392+
// advance to next matching record; exit loop if no more
393+
if(! (ctx.matched_ && ctx.matcher_->next(*ctx.req_context()))) {
394+
break;
395+
}
396+
}
381397
}
382398
// normally `res` is not_found here indicating there are no more records to process
383399
if(auto res = ctx.matcher_->result(); res != status::ok && res != status::not_found) {

src/jogasaki/executor/process/impl/ops/index_join_context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class index_join_context : public context_base {
100100
transaction_context* tx_{};
101101
std::unique_ptr<details::matcher<MatchInfo>> matcher_{};
102102
kvs::transaction* strand_{};
103+
bool matched_{false};
103104
};
104105

105106
} // namespace jogasaki::executor::process::impl::ops

0 commit comments

Comments
 (0)