Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions omniscidb/QueryEngine/CompilationOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct ExecutionOptions {
std::vector<size_t> outer_fragment_indices{};
bool multifrag_result = false;
bool preserve_order = false;
int64_t override_scan_limit = -1;

static ExecutionOptions fromConfig(const Config& config) {
auto eo = ExecutionOptions();
Expand All @@ -134,6 +135,7 @@ struct ExecutionOptions {

eo.multifrag_result = config.exec.enable_multifrag_rs;
eo.preserve_order = false;
eo.override_scan_limit = config.debug.override_scan_limit;

return eo;
}
Expand Down
6 changes: 6 additions & 0 deletions omniscidb/QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3248,6 +3248,11 @@ int32_t Executor::executePlan(const RelAlgExecutionUnit& ra_exe_unit,
if (interrupted_.load()) {
throw QueryExecutionError(ERR_INTERRUPTED);
}
// if (config_->debug.override_scan_limit > 0) {
// scan_limit = config_->debug.override_scan_limit;
// LOG(DEBUG2) << "scan_limit is overriden by the config. New value is " <<
// scan_limit;
// }

VLOG(2) << "bool(ra_exe_unit.union_all)=" << bool(ra_exe_unit.union_all)
<< " ra_exe_unit.input_descs="
Expand Down Expand Up @@ -3446,6 +3451,7 @@ int32_t Executor::executePlan(const RelAlgExecutionUnit& ra_exe_unit,
if (device_type == ExecutorDeviceType::CPU) {
const int32_t scan_limit_for_query =
ra_exe_unit_copy.union_all ? ra_exe_unit_copy.scan_limit : scan_limit;
LOG(DEBUG4) << "scan_limit_for_query=" << scan_limit_for_query;
const int32_t max_matched = scan_limit_for_query == 0
? query_exe_context->query_mem_desc_.getEntryCount()
: scan_limit_for_query;
Expand Down
30 changes: 8 additions & 22 deletions omniscidb/QueryEngine/NativeCodegen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1834,24 +1834,11 @@ void Executor::insertErrorCodeChecker(llvm::Function* query_func,
llvm::Value* err_lv = &*inst_it;
auto error_check_bb =
bb_it->splitBasicBlock(llvm::BasicBlock::iterator(br_instr), ".error_check");
llvm::Value* error_code_arg = nullptr;
auto arg_cnt = 0;
for (auto arg_it = query_func->arg_begin(); arg_it != query_func->arg_end();
arg_it++, ++arg_cnt) {
// since multi_frag_* func has anonymous arguments so we use arg_offset
// explicitly to capture "error_code" argument in the func's argument list
if (hoist_literals) {
if (arg_cnt == 9) {
error_code_arg = &*arg_it;
break;
}
} else {
if (arg_cnt == 8) {
error_code_arg = &*arg_it;
break;
}
}
}

// since multi_frag_* func has anonymous arguments so we use arg_offset
// explicitly to capture "error_code" argument in the func's argument list
unsigned arg_cnt = hoist_literals ? 9 : 8;
llvm::Value* error_code_arg = query_func->getArg(arg_cnt);
CHECK(error_code_arg);
llvm::Value* err_code = nullptr;
if (allow_runtime_query_interrupt) {
Expand All @@ -1869,6 +1856,9 @@ void Executor::insertErrorCodeChecker(llvm::Function* query_func,
detected_interrupt,
cgen_state_->llInt(Executor::ERR_INTERRUPTED),
detected_error);
interrupt_checker_ir_builder.CreateCall(
cgen_state_->module_->getFunction("record_error_code"),
std::vector<llvm::Value*>{err_code, error_code_arg});
interrupt_checker_ir_builder.CreateBr(error_check_bb);
llvm::ReplaceInstWithInst(&check_interrupt_br_instr,
llvm::BranchInst::Create(interrupt_check_bb));
Expand All @@ -1884,10 +1874,6 @@ void Executor::insertErrorCodeChecker(llvm::Function* query_func,
llvm::ICmpInst::ICMP_NE, err_code, cgen_state_->llInt(0));
auto error_bb = llvm::BasicBlock::Create(
cgen_state_->context_, ".error_exit", query_func, new_bb);
llvm::CallInst::Create(cgen_state_->module_->getFunction("record_error_code"),
std::vector<llvm::Value*>{err_code, error_code_arg},
"",
error_bb);
llvm::ReturnInst::Create(cgen_state_->context_, error_bb);
llvm::ReplaceInstWithInst(&br_instr,
llvm::BranchInst::Create(error_bb, new_bb, err_lv));
Expand Down
5 changes: 5 additions & 0 deletions omniscidb/QueryEngine/RelAlgExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,11 @@ ExecutionResult RelAlgExecutor::executeWorkUnit(
}
}
}
if (eo.override_scan_limit > 0) {
LOG(DEBUG2) << "scan_limit (=" << eo.override_scan_limit
<< ") is overriden by the config.";
ra_exe_unit.scan_limit = eo.override_scan_limit;
}

if (g_columnar_large_projections) {
const auto prefer_columnar = should_output_columnar(ra_exe_unit);
Expand Down
10 changes: 2 additions & 8 deletions omniscidb/QueryEngine/RuntimeFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1744,20 +1744,14 @@ extern "C" RUNTIME_EXPORT void multifrag_query(
}

extern "C" RUNTIME_EXPORT ALWAYS_INLINE DEVICE bool check_interrupt() {
if (check_interrupt_init(static_cast<unsigned>(INT_CHECK))) {
return true;
}
return false;
return check_interrupt_init(static_cast<unsigned>(INT_CHECK));
}

extern "C" RUNTIME_EXPORT bool check_interrupt_init(unsigned command) {
static std::atomic_bool runtime_interrupt_flag{false};

if (command == static_cast<unsigned>(INT_CHECK)) {
if (runtime_interrupt_flag.load()) {
return true;
}
return false;
return runtime_interrupt_flag.load();
}
if (command == static_cast<unsigned>(INT_ABORT)) {
runtime_interrupt_flag.store(true);
Expand Down
1 change: 1 addition & 0 deletions omniscidb/Shared/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ struct DebugConfig {
std::string build_ra_cache = "";
std::string use_ra_cache = "";
bool enable_automatic_ir_metadata = true;
int64_t override_scan_limit = -1;
};

struct StorageConfig {
Expand Down
13 changes: 13 additions & 0 deletions omniscidb/Tests/ArrowBasedExecuteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17874,6 +17874,14 @@ TEST_F(Select, RowIdJoin) {
}
}

TEST_F(Select, ScanLimit) {
auto tmp = config().debug.override_scan_limit;
config().debug.override_scan_limit = 1;
c("SELECT AVG(CAST(x AS FLOAT)) FROM test WHERE x > 0 GROUP BY y;",
ExecutorDeviceType::CPU);
config().debug.override_scan_limit = tmp;
}

class SubqueryTestEnv : public ExecuteTestBase, public ::testing::Test {
protected:
void SetUp() override {
Expand Down Expand Up @@ -18053,6 +18061,11 @@ int main(int argc, char** argv) {
->default_value(config->exec.materialize_inner_join_tables)
->implicit_value(true),
"Materialize all inner tables for joins.");
desc.add_options()("cpu-only",
po::value<bool>(&config->exec.cpu_only)
->default_value(config->exec.cpu_only)
->implicit_value(true),
"Run on CPU only, even if GPUs are available.");
desc.add_options()(
"test-help",
"Print all ExecuteTest specific options (for gtest options use `--help`).");
Expand Down