Skip to content

Commit c4c56e2

Browse files
committed
Prewarm gandiva cache on startup. Make cache size configurable
1 parent 5598bc6 commit c4c56e2

File tree

8 files changed

+271
-11
lines changed

8 files changed

+271
-11
lines changed

cpp/cmake_modules/ThirdpartyToolchain.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,8 @@ else()
276276
BOOST_SOURCE_URL
277277
# These are trimmed boost bundles we maintain.
278278
# See cpp/build_support/trim-boost.sh
279-
"https://dl.bintray.com/ursalabs/arrow-boost/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
280279
"https://dl.bintray.com/boostorg/release/${ARROW_BOOST_BUILD_VERSION}/source/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
280+
"https://dl.bintray.com/ursalabs/arrow-boost/boost_${ARROW_BOOST_BUILD_VERSION_UNDERSCORES}.tar.gz"
281281
"https://github.com/boostorg/boost/archive/boost-${ARROW_BOOST_BUILD_VERSION}.tar.gz"
282282
# FIXME(ARROW-6407) automate uploading this archive to ensure it reflects
283283
# our currently used packages and doesn't fall out of sync with

cpp/src/gandiva/cache.h

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#pragma once
1919

20+
#include <cstdlib>
21+
#include <iostream>
2022
#include <mutex>
2123

2224
#include "gandiva/lru_cache.h"
@@ -26,7 +28,12 @@ namespace gandiva {
2628
template <class KeyType, typename ValueType>
2729
class Cache {
2830
public:
29-
explicit Cache(size_t capacity = CACHE_SIZE) : cache_(capacity) {}
31+
explicit Cache(size_t capacity) : cache_(capacity) {
32+
std::cout << "Creating gandiva cache with capacity: " << capacity << std::endl;
33+
}
34+
35+
Cache() : Cache(GetCapacity()) {}
36+
3037
ValueType GetModule(KeyType cache_key) {
3138
arrow::util::optional<ValueType> result;
3239
mtx_.lock();
@@ -42,8 +49,24 @@ class Cache {
4249
}
4350

4451
private:
52+
static int GetCapacity() {
53+
int capacity;
54+
const char* env_cache_size = std::getenv("GANDIVA_CACHE_SIZE");
55+
if (env_cache_size != nullptr) {
56+
capacity = std::atoi(env_cache_size);
57+
if (capacity <= 0) {
58+
std::cout << "Invalid cache size provided. Using default cache size."
59+
<< std::endl;
60+
capacity = DEFAULT_CACHE_SIZE;
61+
}
62+
} else {
63+
capacity = DEFAULT_CACHE_SIZE;
64+
}
65+
return capacity;
66+
}
67+
4568
LruCache<KeyType, ValueType> cache_;
46-
static const int CACHE_SIZE = 250;
69+
static const int DEFAULT_CACHE_SIZE = 500;
4770
std::mutex mtx_;
4871
};
4972
} // namespace gandiva

cpp/src/gandiva/jni/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ set(PROTO_HDRS "${PROTO_OUTPUT_DIR}/Types.pb.h")
5454
set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/java")
5555
add_subdirectory(../../../../java/gandiva ./java/gandiva)
5656

57-
set(GANDIVA_LINK_LIBS ${ARROW_PROTOBUF_LIBPROTOBUF})
57+
set(GANDIVA_LINK_LIBS ${ARROW_PROTOBUF_LIBPROTOBUF} ${BOOST_FILESYSTEM_LIBRARY})
5858
if(ARROW_BUILD_STATIC)
5959
list(APPEND GANDIVA_LINK_LIBS gandiva_static)
6060
else()

cpp/src/gandiva/jni/jni_common.cc

Lines changed: 213 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,17 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include <arrow/builder.h>
19+
#include <arrow/record_batch.h>
20+
#include <arrow/type.h>
1821
#include <google/protobuf/io/coded_stream.h>
1922

23+
#include <boost/filesystem.hpp>
24+
#include <boost/uuid/uuid.hpp>
25+
#include <boost/uuid/uuid_generators.hpp>
26+
#include <boost/uuid/uuid_io.hpp>
27+
#include <fstream>
28+
#include <iostream>
2029
#include <map>
2130
#include <memory>
2231
#include <mutex>
@@ -25,10 +34,6 @@
2534
#include <utility>
2635
#include <vector>
2736

28-
#include <arrow/builder.h>
29-
#include <arrow/record_batch.h>
30-
#include <arrow/type.h>
31-
3237
#include "Types.pb.h"
3338
#include "gandiva/configuration.h"
3439
#include "gandiva/filter.h"
@@ -62,11 +67,15 @@ using gandiva::ConfigurationBuilder;
6267
using gandiva::FilterHolder;
6368
using gandiva::ProjectorHolder;
6469

70+
namespace fs = boost::filesystem;
71+
6572
// forward declarations
6673
NodePtr ProtoTypeToNode(const types::TreeNode& node);
6774

6875
static jint JNI_VERSION = JNI_VERSION_1_6;
6976

77+
static const char* kEnvPrewarmCacheDir = "GANDIVA_PREWARM_CACHE_DIR";
78+
7079
// extern refs - initialized for other modules.
7180
jclass configuration_builder_class_;
7281

@@ -82,6 +91,8 @@ static jfieldID vector_expander_ret_capacity_;
8291
gandiva::IdToModuleMap<std::shared_ptr<ProjectorHolder>> projector_modules_;
8392
gandiva::IdToModuleMap<std::shared_ptr<FilterHolder>> filter_modules_;
8493

94+
void PrewarmCache();
95+
8596
jint JNI_OnLoad(JavaVM* vm, void* reserved) {
8697
JNIEnv* env;
8798
if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
@@ -117,6 +128,9 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
117128
env->GetFieldID(vector_expander_ret_class_, "address", "J");
118129
vector_expander_ret_capacity_ =
119130
env->GetFieldID(vector_expander_ret_class_, "capacity", "J");
131+
132+
PrewarmCache();
133+
120134
return JNI_VERSION;
121135
}
122136

@@ -572,6 +586,192 @@ void releaseProjectorInput(jbyteArray schema_arr, jbyte* schema_bytes,
572586
env->ReleaseByteArrayElements(exprs_arr, exprs_bytes, JNI_ABORT);
573587
}
574588

589+
void PrewarmCache() {
590+
try {
591+
auto prewarm_cache_dir = std::getenv(kEnvPrewarmCacheDir);
592+
if (prewarm_cache_dir == nullptr) {
593+
std::cout << "[Gandiva Cache Prewarm] No cache directory env variable is set. "
594+
"Skipping prewarming"
595+
<< std::endl;
596+
return;
597+
}
598+
599+
fs::path path(prewarm_cache_dir);
600+
std::cout << path.string() << "\n";
601+
if (!fs::is_directory(path)) {
602+
std::cerr << "[Gandiva Cache Prewarm] Prewarm cache dir env variable set is not a "
603+
"directory"
604+
<< std::endl;
605+
return;
606+
}
607+
608+
fs::directory_iterator end_iter;
609+
for (fs::directory_iterator dir_iter(path); dir_iter != end_iter; ++dir_iter) {
610+
try {
611+
if (fs::is_regular_file(dir_iter->status())) {
612+
std::ifstream fin;
613+
fin.open(dir_iter->path().string(), std::ios::binary);
614+
if (!fin.is_open()) {
615+
std::cerr << "[Gandiva Cache Prewarm] Failure opening warmup cache file"
616+
<< std::endl;
617+
continue;
618+
}
619+
620+
fin.seekg(0, std::ios::end);
621+
size_t remaining = fin.tellg();
622+
fin.seekg(0, std::ios::beg);
623+
624+
int32_t schema_len;
625+
if (remaining < sizeof(schema_len)) {
626+
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
627+
continue;
628+
}
629+
fin.read(reinterpret_cast<char*>(&schema_len), sizeof schema_len);
630+
remaining -= sizeof(schema_len);
631+
632+
if (remaining < (size_t)schema_len) {
633+
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
634+
continue;
635+
}
636+
std::vector<char> schema_vec(schema_len);
637+
fin.read(schema_vec.data(), schema_len);
638+
remaining -= (size_t)schema_len;
639+
640+
int32_t exprs_len;
641+
if (remaining < sizeof(exprs_len)) {
642+
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
643+
continue;
644+
}
645+
fin.read(reinterpret_cast<char*>(&exprs_len), sizeof exprs_len);
646+
remaining -= sizeof(exprs_len);
647+
648+
if (remaining != (size_t)exprs_len) {
649+
std::cerr << "[Gandiva Cache Prewarm] Invalid file." << std::endl;
650+
continue;
651+
}
652+
std::vector<char> exprs_vec(exprs_len);
653+
fin.read(exprs_vec.data(), exprs_len);
654+
655+
fin.close();
656+
657+
uint8_t* schema_bytes = reinterpret_cast<uint8_t*>(schema_vec.data());
658+
uint8_t* exprs_bytes = reinterpret_cast<uint8_t*>(exprs_vec.data());
659+
660+
std::shared_ptr<Projector> projector;
661+
types::Schema schema;
662+
types::ExpressionList exprs;
663+
ExpressionVector expr_vector;
664+
SchemaPtr schema_ptr;
665+
FieldVector ret_types;
666+
gandiva::Status status;
667+
auto mode = gandiva::SelectionVector::MODE_NONE;
668+
669+
ConfigurationBuilder configuration_builder;
670+
std::shared_ptr<Configuration> config = configuration_builder.build();
671+
672+
if (!ParseProtobuf(schema_bytes, schema_len, &schema)) {
673+
std::cerr << "[Gandiva Cache Prewarm] Failed to parse protobuf for schema"
674+
<< std::endl;
675+
continue;
676+
}
677+
678+
if (!ParseProtobuf(exprs_bytes, exprs_len, &exprs)) {
679+
std::cerr << "[Gandiva Cache Prewarm] Failed to parse protobuf for expr list"
680+
<< std::endl;
681+
continue;
682+
}
683+
684+
// convert types::Schema to arrow::Schema
685+
schema_ptr = ProtoTypeToSchema(schema);
686+
if (schema_ptr == nullptr) {
687+
std::cerr << "[Gandiva Cache Prewarm] Failed to convert protobuf schema to "
688+
"arrow type"
689+
<< std::endl;
690+
continue;
691+
}
692+
693+
// create Expression out of the list of exprs
694+
for (int i = 0; i < exprs.exprs_size(); i++) {
695+
ExpressionPtr root = ProtoTypeToExpression(exprs.exprs(i));
696+
697+
if (root == nullptr) {
698+
std::cerr << "[Gandiva Cache Prewarm] Failed to convert protobuf expr to "
699+
"arrow type"
700+
<< std::endl;
701+
continue;
702+
}
703+
704+
expr_vector.push_back(root);
705+
ret_types.push_back(root->result());
706+
}
707+
708+
status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector);
709+
710+
if (!status.ok()) {
711+
std::cerr << "[Gandiva Cache Prewarm] Failed to create a projector module";
712+
continue;
713+
}
714+
std::cout << "[Gandiva Cache Prewarm] Built and cached a projector from the "
715+
"expression and schema in the file"
716+
<< std::endl;
717+
}
718+
} catch (const std::exception& ex) {
719+
std::cerr << "[Gandiva Cache Prewarm] " << dir_iter->path().filename() << " "
720+
<< ex.what() << std::endl;
721+
}
722+
}
723+
724+
} catch (const std::exception& ex) {
725+
std::cerr << "[Gandiva Cache Prewarm] Failed prewarming the cache: " << ex.what()
726+
<< std::endl;
727+
}
728+
}
729+
730+
void CacheExpressionAndSchemaForPrewarmOnStartup(char* schema_bytes, int32_t schema_len,
731+
char* exprs_bytes, int32_t exprs_len) {
732+
try {
733+
auto env_path = std::getenv(kEnvPrewarmCacheDir);
734+
if (env_path == nullptr) {
735+
return;
736+
}
737+
738+
fs::path path(env_path);
739+
if (!fs::is_directory(path) && !fs::create_directories(path)) {
740+
std::cerr << "[CacheExpression] Failed to create directory to save the schema and "
741+
"expressions"
742+
<< std::endl;
743+
return;
744+
}
745+
746+
boost::uuids::uuid uuid = boost::uuids::random_generator()();
747+
748+
path /= boost::uuids::to_string(uuid);
749+
750+
std::ofstream fout;
751+
fout.open(path.string(), std::ios::binary | std::ios::out);
752+
if (!fout.is_open()) {
753+
std::cerr
754+
<< "[CacheExpression] Failed to open file to write the schema and expression"
755+
<< std::endl;
756+
return;
757+
}
758+
759+
fout.write(reinterpret_cast<char*>(&schema_len), sizeof(schema_len));
760+
fout.write(schema_bytes, schema_len);
761+
762+
fout.write(reinterpret_cast<char*>(&exprs_len), sizeof(exprs_len));
763+
fout.write(exprs_bytes, exprs_len);
764+
765+
fout.close();
766+
767+
std::cout << "[CacheExpression] Cached schema and expression bytes to a file"
768+
<< std::endl;
769+
} catch (const std::exception& ex) {
770+
std::cerr << "[CacheExpression] Failed to cache the expression to file " << ex.what()
771+
<< std::endl;
772+
}
773+
}
774+
575775
JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_buildProjector(
576776
JNIEnv* env, jobject obj, jbyteArray schema_arr, jbyteArray exprs_arr,
577777
jint selection_vector_type, jlong configuration_id) {
@@ -648,7 +848,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build
648848
break;
649849
}
650850
// good to invoke the evaluator now
651-
status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector);
851+
bool cache_hit;
852+
status = Projector::Make(schema_ptr, expr_vector, mode, config, &projector, &cache_hit);
652853

653854
if (!status.ok()) {
654855
ss << "Failed to make LLVM module due to " << status.message() << "\n";
@@ -660,6 +861,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build
660861
holder = std::shared_ptr<ProjectorHolder>(
661862
new ProjectorHolder(schema_ptr, ret_types, std::move(projector)));
662863
module_id = projector_modules_.Insert(holder);
864+
865+
if (!cache_hit) {
866+
CacheExpressionAndSchemaForPrewarmOnStartup(
867+
reinterpret_cast<char*>(schema_bytes), schema_len,
868+
reinterpret_cast<char*>(exprs_bytes), exprs_len);
869+
}
870+
663871
releaseProjectorInput(schema_arr, schema_bytes, exprs_arr, exprs_bytes, env);
664872
return module_id;
665873

cpp/src/gandiva/lru_cache.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#pragma once
1919

20+
#include <iostream>
2021
#include <list>
2122
#include <unordered_map>
2223
#include <utility>
@@ -107,6 +108,7 @@ class LruCache {
107108

108109
private:
109110
void evict() {
111+
std::cout << "Evicted a cache item from the lru cache" << std::endl;
110112
// evict item from the end of most recently used list
111113
typename list_type::iterator i = --lru_list_.end();
112114
map_.erase(*i);

0 commit comments

Comments
 (0)