Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 8f22a84

Browse files
committed
Use a dedicated thread for Calcite/Java queries
1 parent f4553f1 commit 8f22a84

File tree

10 files changed

+235
-91
lines changed

10 files changed

+235
-91
lines changed

omniscidb/Calcite/CalciteJNI.cpp

Lines changed: 130 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class JVM {
8686
return instance_;
8787
}
8888

89+
static void destroyInstance() { instance_ = nullptr; }
90+
8991
// Get JNI environment for the current thread.
9092
// You souldn't pass this obect between threads. It should be deallocated
9193
// in the same thread it was requrested in. It shouldn't outlive JVM object.
@@ -167,13 +169,9 @@ std::shared_ptr<JVM> JVM::instance_;
167169

168170
} // namespace
169171

170-
class CalciteJNI::Impl {
172+
class CalciteJNI {
171173
public:
172-
Impl(SchemaProviderPtr schema_provider,
173-
ConfigPtr config,
174-
const std::string& udf_filename,
175-
size_t calcite_max_mem_mb)
176-
: schema_provider_(schema_provider), config_(config) {
174+
CalciteJNI(const std::string& udf_filename, size_t calcite_max_mem_mb) {
177175
// Initialize JVM.
178176
jvm_ = JVM::getInstance(calcite_max_mem_mb);
179177
auto env = jvm_->getEnv();
@@ -192,15 +190,19 @@ class CalciteJNI::Impl {
192190
findHashMap(env.get());
193191
}
194192

195-
~Impl() {
193+
~CalciteJNI() {
196194
auto env = jvm_->getEnv();
197195
for (auto obj : global_refs_) {
198196
env->DeleteGlobalRef(obj);
199197
}
198+
199+
JVM::destroyInstance();
200200
}
201201

202202
std::string process(const std::string& db_name,
203203
const std::string& sql_string,
204+
SchemaProviderPtr schema_provider,
205+
ConfigPtr config,
204206
const std::vector<FilterPushDownInfo>& filter_push_down_info,
205207
const bool legacy_syntax,
206208
const bool is_explain,
@@ -226,10 +228,10 @@ class CalciteJNI::Impl {
226228
env->NewObject(optimization_opts_cls_,
227229
optimization_opts_ctor_,
228230
(jboolean)is_view_optimize,
229-
(jboolean)config_->exec.watchdog.enable,
231+
(jboolean)config->exec.watchdog.enable,
230232
arg_filter_push_down_info);
231233
jobject arg_restriction = nullptr;
232-
auto schema_json = schema_to_json(schema_provider_);
234+
auto schema_json = schema_to_json(schema_provider);
233235
jstring arg_schema = env->NewStringUTF(schema_json.c_str());
234236

235237
jobject java_res = env->CallObjectMethod(handler_obj_,
@@ -516,9 +518,6 @@ class CalciteJNI::Impl {
516518
extension_fn_cls_, extension_fn_udf_ctor_, name_arg, args_arg, ret_arg);
517519
}
518520

519-
SchemaProviderPtr schema_provider_;
520-
ConfigPtr config_;
521-
522521
// com.mapd.parser.server.CalciteServerHandler instance and methods.
523522
jobject handler_obj_;
524523
jmethodID handler_process_;
@@ -564,41 +563,137 @@ class CalciteJNI::Impl {
564563
std::shared_ptr<JVM> jvm_;
565564
};
566565

567-
CalciteJNI::CalciteJNI(SchemaProviderPtr schema_provider,
568-
ConfigPtr config,
569-
const std::string& udf_filename,
570-
size_t calcite_max_mem_mb) {
571-
impl_ =
572-
std::make_unique<Impl>(schema_provider, config, udf_filename, calcite_max_mem_mb);
566+
CalciteMgr::~CalciteMgr() {
567+
{
568+
std::lock_guard<decltype(queue_mutex_)> lock(queue_mutex_);
569+
should_exit_ = true;
570+
}
571+
worker_cv_.notify_all();
572+
worker_.join();
573573
}
574574

575-
CalciteJNI::~CalciteJNI() {}
576-
577-
std::string CalciteJNI::process(
575+
std::string CalciteMgr::process(
578576
const std::string& db_name,
579577
const std::string& sql_string,
578+
SchemaProviderPtr schema_provider,
579+
ConfigPtr config,
580580
const std::vector<FilterPushDownInfo>& filter_push_down_info,
581581
const bool legacy_syntax,
582582
const bool is_explain,
583583
const bool is_view_optimize) {
584-
return impl_->process(db_name,
585-
sql_string,
586-
filter_push_down_info,
587-
legacy_syntax,
588-
is_explain,
589-
is_view_optimize);
584+
auto task = Task([&db_name,
585+
&sql_string,
586+
&filter_push_down_info,
587+
schema_provider,
588+
config,
589+
legacy_syntax,
590+
is_explain,
591+
is_view_optimize](CalciteJNI* calcite_jni) {
592+
CHECK(calcite_jni);
593+
return calcite_jni->process(db_name,
594+
sql_string,
595+
schema_provider,
596+
config,
597+
filter_push_down_info,
598+
legacy_syntax,
599+
is_explain,
600+
is_view_optimize);
601+
});
602+
auto result = task.get_future();
603+
submitTaskToQueue(std::move(task));
604+
605+
result.wait();
606+
return result.get();
590607
}
591608

592-
std::string CalciteJNI::getExtensionFunctionWhitelist() {
593-
return impl_->getExtensionFunctionWhitelist();
609+
std::string CalciteMgr::getExtensionFunctionWhitelist() {
610+
auto task = Task([](CalciteJNI* calcite_jni) {
611+
CHECK(calcite_jni);
612+
return calcite_jni->getExtensionFunctionWhitelist();
613+
});
614+
615+
auto result = task.get_future();
616+
submitTaskToQueue(std::move(task));
617+
618+
result.wait();
619+
return result.get();
594620
}
595-
std::string CalciteJNI::getUserDefinedFunctionWhitelist() {
596-
return impl_->getUserDefinedFunctionWhitelist();
621+
622+
std::string CalciteMgr::getUserDefinedFunctionWhitelist() {
623+
auto task = Task([](CalciteJNI* calcite_jni) {
624+
CHECK(calcite_jni);
625+
return calcite_jni->getUserDefinedFunctionWhitelist();
626+
});
627+
628+
auto result = task.get_future();
629+
submitTaskToQueue(std::move(task));
630+
631+
result.wait();
632+
return result.get();
597633
}
598-
std::string CalciteJNI::getRuntimeExtensionFunctionWhitelist() {
599-
return impl_->getRuntimeExtensionFunctionWhitelist();
634+
635+
std::string CalciteMgr::getRuntimeExtensionFunctionWhitelist() {
636+
auto task = Task([](CalciteJNI* calcite_jni) {
637+
CHECK(calcite_jni);
638+
return calcite_jni->getRuntimeExtensionFunctionWhitelist();
639+
});
640+
641+
auto result = task.get_future();
642+
submitTaskToQueue(std::move(task));
643+
644+
result.wait();
645+
return result.get();
600646
}
601-
void CalciteJNI::setRuntimeExtensionFunctions(const std::vector<ExtensionFunction>& udfs,
647+
648+
void CalciteMgr::setRuntimeExtensionFunctions(const std::vector<ExtensionFunction>& udfs,
602649
bool is_runtime) {
603-
return impl_->setRuntimeExtensionFunctions(udfs, is_runtime);
650+
auto task = Task([&udfs, is_runtime](CalciteJNI* calcite_jni) {
651+
CHECK(calcite_jni);
652+
calcite_jni->setRuntimeExtensionFunctions(udfs, is_runtime);
653+
return ""; // all tasks return strings
654+
});
655+
656+
auto result = task.get_future();
657+
submitTaskToQueue(std::move(task));
658+
659+
result.wait();
604660
}
661+
662+
CalciteMgr::CalciteMgr(const std::string& udf_filename, size_t calcite_max_mem_mb) {
663+
// todo: should register an exit handler for ctrl + c
664+
worker_ = std::thread(&CalciteMgr::worker, this, udf_filename, calcite_max_mem_mb);
665+
}
666+
667+
void CalciteMgr::worker(const std::string& udf_filename, size_t calcite_max_mem_mb) {
668+
auto calcite_jni = std::make_unique<CalciteJNI>(udf_filename, calcite_max_mem_mb);
669+
670+
std::unique_lock<std::mutex> lock(queue_mutex_);
671+
while (true) {
672+
worker_cv_.wait(lock, [this] { return !queue_.empty() || should_exit_; });
673+
if (should_exit_) {
674+
return;
675+
}
676+
677+
if (!queue_.empty()) {
678+
auto task = std::move(queue_.front());
679+
queue_.pop();
680+
681+
lock.unlock();
682+
task(calcite_jni.get());
683+
684+
lock.lock();
685+
}
686+
}
687+
}
688+
689+
void CalciteMgr::submitTaskToQueue(Task&& task) {
690+
std::unique_lock<decltype(queue_mutex_)> lock(queue_mutex_);
691+
692+
queue_.push(std::move(task));
693+
694+
lock.unlock();
695+
worker_cv_.notify_all();
696+
}
697+
698+
std::once_flag CalciteMgr::instance_init_flag_;
699+
std::unique_ptr<CalciteMgr> CalciteMgr::instance_;

omniscidb/Calcite/CalciteJNI.h

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
#pragma once
1818

19+
#include <future>
20+
#include <queue>
21+
1922
#include "QueryEngine/ExtensionFunctionsWhitelist.h"
2023
#include "SchemaMgr/SchemaProvider.h"
2124
#include "Shared/Config.h"
@@ -26,28 +29,57 @@ struct FilterPushDownInfo {
2629
int input_next;
2730
};
2831

29-
class CalciteJNI {
32+
class CalciteJNI;
33+
34+
/**
35+
* Run CalciteJNI on a single thread.
36+
*/
37+
class CalciteMgr {
3038
public:
31-
CalciteJNI(SchemaProviderPtr schema_provider,
32-
ConfigPtr config,
33-
const std::string& udf_filename = "",
34-
size_t calcite_max_mem_mb = 1024);
35-
~CalciteJNI();
39+
using Task = std::packaged_task<std::string(CalciteJNI* calcite_jni)>;
40+
41+
CalciteMgr(const CalciteMgr&) = delete;
42+
43+
~CalciteMgr();
44+
45+
static CalciteMgr* get(const std::string& udf_filename = "",
46+
size_t calcite_max_mem_mb = 1024) {
47+
std::call_once(instance_init_flag_, [=] {
48+
instance_ =
49+
std::unique_ptr<CalciteMgr>(new CalciteMgr(udf_filename, calcite_max_mem_mb));
50+
});
51+
return instance_.get();
52+
}
3653

3754
std::string process(const std::string& db_name,
3855
const std::string& sql_string,
56+
SchemaProviderPtr schema_provider,
57+
ConfigPtr config,
3958
const std::vector<FilterPushDownInfo>& filter_push_down_info = {},
4059
const bool legacy_syntax = false,
4160
const bool is_explain = false,
4261
const bool is_view_optimize = false);
43-
4462
std::string getExtensionFunctionWhitelist();
4563
std::string getUserDefinedFunctionWhitelist();
4664
std::string getRuntimeExtensionFunctionWhitelist();
65+
4766
void setRuntimeExtensionFunctions(const std::vector<ExtensionFunction>& udfs,
4867
bool is_runtime = true);
4968

5069
private:
51-
class Impl;
52-
std::unique_ptr<Impl> impl_;
70+
explicit CalciteMgr(const std::string& udf_filename, size_t calcite_max_mem_mb);
71+
72+
void worker(const std::string& udf_filename, size_t calcite_max_mem_mb);
73+
74+
void submitTaskToQueue(Task&& task);
75+
76+
std::mutex queue_mutex_;
77+
std::condition_variable worker_cv_;
78+
std::thread worker_;
79+
80+
std::queue<Task> queue_;
81+
82+
bool should_exit_{false};
83+
static std::once_flag instance_init_flag_;
84+
static std::unique_ptr<CalciteMgr> instance_;
5385
};

omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -298,14 +298,13 @@ class ArrowSQLRunnerImpl {
298298

299299
Executor* getExecutor() { return executor_.get(); }
300300

301-
std::shared_ptr<CalciteJNI> getCalcite() { return calcite_; }
301+
CalciteMgr* getCalcite() { return calcite_; }
302302

303303
~ArrowSQLRunnerImpl() {
304304
executor_.reset();
305305
storage_.reset();
306306
rs_registry_.reset();
307307
schema_mgr_.reset();
308-
calcite_.reset();
309308
rel_alg_cache_.reset();
310309

311310
Executor::resetCodeCache(); // flush caches before tearing down buffer mgrs
@@ -334,7 +333,7 @@ class ArrowSQLRunnerImpl {
334333
executor_->setSchemaProvider(schema_mgr_);
335334

336335
if (config_->debug.use_ra_cache.empty() || !config_->debug.build_ra_cache.empty()) {
337-
calcite_ = std::make_shared<CalciteJNI>(schema_mgr_, config_, udf_filename, 1024);
336+
calcite_ = CalciteMgr::get(udf_filename, 1024);
338337

339338
if (config_->debug.use_ra_cache.empty()) {
340339
ExtensionFunctionsWhitelist::add(calcite_->getExtensionFunctionWhitelist());
@@ -356,7 +355,7 @@ class ArrowSQLRunnerImpl {
356355
std::shared_ptr<ArrowStorage> storage_;
357356
std::shared_ptr<hdk::ResultSetRegistry> rs_registry_;
358357
std::shared_ptr<SchemaMgr> schema_mgr_;
359-
std::shared_ptr<CalciteJNI> calcite_;
358+
CalciteMgr* calcite_;
360359
std::shared_ptr<RelAlgCache> rel_alg_cache_;
361360

362361
SQLiteComparator sqlite_comparator_;
@@ -520,7 +519,7 @@ Executor* getExecutor() {
520519
return ArrowSQLRunnerImpl::get()->getExecutor();
521520
}
522521

523-
std::shared_ptr<CalciteJNI> getCalcite() {
522+
CalciteMgr* getCalcite() {
524523
return ArrowSQLRunnerImpl::get()->getCalcite();
525524
}
526525

omniscidb/Tests/ArrowSQLRunner/ArrowSQLRunner.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
#include "BufferPoolStats.h"
2828

29-
class CalciteJNI;
29+
class CalciteMgr;
3030
class Executor;
3131
class RelAlgExecutor;
3232

@@ -124,7 +124,7 @@ DataMgr* getDataMgr();
124124

125125
Executor* getExecutor();
126126

127-
std::shared_ptr<CalciteJNI> getCalcite();
127+
CalciteMgr* getCalcite();
128128

129129
std::unique_ptr<RelAlgExecutor> makeRelAlgExecutor(const std::string& query_str);
130130

omniscidb/Tests/ArrowSQLRunner/RelAlgCache.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
#include <fstream>
2929

30-
RelAlgCache::RelAlgCache(std::shared_ptr<CalciteJNI> calcite,
30+
RelAlgCache::RelAlgCache(CalciteMgr* calcite,
3131
SchemaProviderPtr schema_provider,
3232
ConfigPtr config)
3333
: calcite_(calcite), schema_provider_(schema_provider), config_(config) {
@@ -89,6 +89,8 @@ std::string RelAlgCache::process(
8989

9090
auto ra = calcite_->process(db_name,
9191
sql_string,
92+
schema_provider_,
93+
config_,
9294
filter_push_down_info,
9395
legacy_syntax,
9496
is_explain,

0 commit comments

Comments
 (0)