Skip to content
This repository was archived by the owner on Apr 16, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions cpp/src/JniHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
#include "velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.h"
#endif

using namespace facebook::velox;

namespace io::trino::bridge {

const std::string JniHandle::kGlutenTrinoFunctionPrefix("trino.bridge.");
Expand All @@ -64,24 +62,25 @@ JniHandle::JniHandle(const NativeSqlTaskExecutionManagerPtr& javaManager)

void JniHandle::initializeVelox() {
// Setup and register.
filesystems::registerLocalFileSystem();
facebook::velox::filesystems::registerLocalFileSystem();

velox::parquet::registerParquetReaderFactory();
facebook::velox::parquet::registerParquetReaderFactory();

velox::filesystems::registerHdfsFileSystem();
facebook::velox::filesystems::registerHdfsFileSystem();

#ifdef ENABLE_GLUTEN_TRINO_S3
velox::filesystems::registerS3FileSystem();
facebook::velox::filesystems::registerS3FileSystem();
#endif

velox::dwrf::registerDwrfReaderFactory();
facebook::velox::dwrf::registerDwrfReaderFactory();
// Register Velox functions
static const std::string kPrestoDefaultPrefix{"presto.default."};
velox::functions::prestosql::registerAllScalarFunctions(kPrestoDefaultPrefix);
velox::aggregate::prestosql::registerAllAggregateFunctions(kPrestoDefaultPrefix);
velox::window::prestosql::registerAllWindowFunctions(kPrestoDefaultPrefix);
facebook::velox::functions::prestosql::registerAllScalarFunctions(kPrestoDefaultPrefix);
facebook::velox::aggregate::prestosql::registerAllAggregateFunctions(
kPrestoDefaultPrefix);
facebook::velox::window::prestosql::registerAllWindowFunctions(kPrestoDefaultPrefix);

velox::parse::registerTypeResolver();
facebook::velox::parse::registerTypeResolver();

TrinoVectorSerde::registerVectorSerde();

Expand All @@ -99,15 +98,15 @@ void JniHandle::initializeVeloxMemory() {
LOG(INFO) << "Starting with node memory " << (memoryBytes >> 30) << "GB";

if (config.getUseMmapAllocator()) {
memory::MmapAllocator::Options options;
facebook::velox::memory::MmapAllocator::Options options;
options.capacity = memoryBytes;
options.useMmapArena = config.getUseMmapArena();
options.mmapArenaCapacityRatio = config.getMmapArenaCapacityRatio();
allocator_ = std::make_shared<memory::MmapAllocator>(options);
allocator_ = std::make_shared<facebook::velox::memory::MmapAllocator>(options);
} else {
allocator_ = memory::MemoryAllocator::createDefaultInstance();
allocator_ = facebook::velox::memory::MemoryAllocator::createDefaultInstance();
}
memory::MemoryAllocator::setDefaultInstance(allocator_.get());
facebook::velox::memory::MemoryAllocator::setDefaultInstance(allocator_.get());

if (config.getAsyncDataCacheEnabled()) {
std::unique_ptr<cache::SsdCache> ssd;
Expand All @@ -132,7 +131,7 @@ void JniHandle::initializeVeloxMemory() {
}

// Set up velox memory manager.
memory::MemoryManagerOptions options;
facebook::velox::memory::MemoryManagerOptions options;
options.capacity = memoryBytes;
options.checkUsageLeak = config.getEnableMemoryLeakCheck();
if (config.getEnableMemoryArbitration()) {
Expand All @@ -142,11 +141,11 @@ void JniHandle::initializeVeloxMemory() {
options.memoryPoolInitCapacity = config.getInitMemoryPoolCapacity();
options.memoryPoolTransferCapacity = config.getMinMemoryPoolTransferCapacity();
}
const auto& manager = memory::MemoryManager::getInstance(options);
const auto& manager = facebook::velox::memory::MemoryManager::getInstance(options);
LOG(INFO) << "Memory manager has been setup: " << manager.toString();
}

TaskHandlePtr JniHandle::createTaskHandle(const io::trino::TrinoTaskId& id,
TaskHandlePtr JniHandle::createTaskHandle(const TrinoTaskId& id,
const protocol::PlanFragment& plan) {
return withWLock([&id, &plan, this]() {
if (auto iter = taskMap_.find(id.fullId()); iter != taskMap_.end()) {
Expand All @@ -163,10 +162,9 @@ TaskHandlePtr JniHandle::createTaskHandle(const io::trino::TrinoTaskId& id,
}

bool isBroadcast = false;
if (auto handle =
std::dynamic_pointer_cast<io::trino::protocol::SystemPartitioningHandle>(
plan.partitioningScheme.partitioning.handle.connectorHandle)) {
if (handle->function == io::trino::protocol::SystemPartitionFunction::BROADCAST) {
if (auto handle = std::dynamic_pointer_cast<protocol::SystemPartitioningHandle>(
plan.partitioningScheme.partitioning.handle.connectorHandle)) {
if (handle->function == protocol::SystemPartitionFunction::BROADCAST) {
LOG(INFO) << fmt::format("Task {} contains broadcast output buffer.",
id.fullId());
isBroadcast = true;
Expand All @@ -178,19 +176,21 @@ TaskHandlePtr JniHandle::createTaskHandle(const io::trino::TrinoTaskId& id,
numPartitions);

auto& config = NativeConfigs::instance();
auto queryCtx = std::make_shared<core::QueryCtx>(
auto queryCtx = std::make_shared<facebook::velox::core::QueryCtx>(
driverExecutor_.get(), std::move(config.getQueryConfigs()),
std::move(config.getConnectorConfigs()), cache::AsyncDataCache::getInstance(),
memory::defaultMemoryManager().addRootPool(id.fullId(),
config.getQueryMaxMemoryPerNode()));

VeloxInteractiveQueryPlanConverter convertor(getPlanConvertorMemPool().get());
core::PlanFragment fragment = convertor.toVeloxQueryPlan(plan, nullptr, id.fullId());
facebook::velox::core::PlanFragment fragment =
convertor.toVeloxQueryPlan(plan, nullptr, id.fullId());

LOG(INFO) << fmt::format("Task {},\n PlanFragment: {}", id.fullId(),
fragment.planNode->toString(true, true));

auto task = exec::Task::create(id.fullId(), std::move(fragment), id.id(), queryCtx);
auto task = facebook::velox::exec::Task::create(id.fullId(), std::move(fragment),
id.id(), queryCtx);
std::string parentPath = config.getSpillDir();
if (!parentPath.empty()) {
std::string fullPath = parentPath + "/spill-" + id.fullId();
Expand All @@ -206,7 +206,7 @@ TaskHandlePtr JniHandle::createTaskHandle(const io::trino::TrinoTaskId& id,
});
}

TaskHandlePtr JniHandle::getTaskHandle(const io::trino::TrinoTaskId& id) {
TaskHandlePtr JniHandle::getTaskHandle(const TrinoTaskId& id) {
return withRLock([&id, this]() -> TaskHandle* {
if (auto iter = taskMap_.find(id.fullId()); iter != taskMap_.end()) {
return iter->second.get();
Expand All @@ -216,7 +216,7 @@ TaskHandlePtr JniHandle::getTaskHandle(const io::trino::TrinoTaskId& id) {
});
}

bool JniHandle::removeTask(const io::trino::TrinoTaskId& id) {
bool JniHandle::removeTask(const TrinoTaskId& id) {
return withWLock([this, &id]() {
if (auto taskIter = taskMap_.find(id.fullId()); taskIter != taskMap_.end()) {
auto&& task = taskIter->second->getTask();
Expand All @@ -231,7 +231,8 @@ bool JniHandle::removeTask(const io::trino::TrinoTaskId& id) {
});
}

void JniHandle::terminateTask(const io::trino::TrinoTaskId& id, exec::TaskState state) {
void JniHandle::terminateTask(const TrinoTaskId& id,
facebook::velox::exec::TaskState state) {
TaskHandlePtr taskHandle;
withRLock([this, &id, &taskHandle]() {
if (auto taskIter = taskMap_.find(id.fullId()); taskIter != taskMap_.end()) {
Expand All @@ -255,14 +256,15 @@ void JniHandle::terminateTask(const io::trino::TrinoTaskId& id, exec::TaskState
}
}

std::shared_ptr<memory::MemoryPool> JniHandle::getPlanConvertorMemPool() {
static std::shared_ptr<memory::MemoryPool> pool =
velox::memory::addDefaultLeafMemoryPool("PlanConvertor");
std::shared_ptr<facebook::velox::memory::MemoryPool>
JniHandle::getPlanConvertorMemPool() {
static std::shared_ptr<facebook::velox::memory::MemoryPool> pool =
facebook::velox::memory::addDefaultLeafMemoryPool("PlanConvertor");
return pool;
}

void JniHandle::printTaskStatus(const io::trino::TrinoTaskId& id,
const std::shared_ptr<exec::Task>& task) {
void JniHandle::printTaskStatus(
const TrinoTaskId& id, const std::shared_ptr<facebook::velox::exec::Task>& task) {
std::stringstream ss;
ss << fmt::format("Task {} status:\n", id.fullId());

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/NativeConfigs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ void NativeConfigs::initialize(const std::string& configJsonString) {
GET_KEY_FROM_JSON(httpMaxAllocateBytes, configJson);
GET_KEY_FROM_JSON(httpsClientCertAndKeyPath, configJson);
GET_KEY_FROM_JSON(httpsSupportedCiphers, configJson);
GET_KEY_FROM_JSON(exchangeMaxErrorDuration, configJson);
GET_KEY_FROM_JSON(exchangeRequestTimeout, configJson);
GET_KEY_FROM_JSON(exchangeImmediateBufferTransfer, configJson);

if (configJson.contains("logVerboseModules")) {
std::string _logVerboseModules;
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/NativeConfigs.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unordered_map>

#include "velox/common/base/BitUtil.h"
#include "velox/core/QueryConfig.h"

namespace facebook::velox {
class Config;
Expand Down Expand Up @@ -108,6 +109,15 @@ class NativeConfigs : public boost::noncopyable {
inline const std::string& getHttpsSupportedCiphers() const {
return httpsSupportedCiphers;
}
inline const std::chrono::duration<double> getExchangeMaxErrorDuration() const {
return facebook::velox::core::toDuration(exchangeMaxErrorDuration);
}
inline const std::chrono::duration<double> getExchangeRequestTimeout() const {
return facebook::velox::core::toDuration(exchangeRequestTimeout);
}
inline const bool& getExchangeImmediateBufferTransfer() const {
return exchangeImmediateBufferTransfer;
}

private:
std::atomic_bool initialized{false};
Expand Down Expand Up @@ -158,6 +168,9 @@ class NativeConfigs : public boost::noncopyable {
uint64_t httpMaxAllocateBytes = 64 << 10;
std::string httpsClientCertAndKeyPath = "";
std::string httpsSupportedCiphers = "ECDHE-ECDSA-AES256-GCM-SHA384,AES256-GCM-SHA384";
std::string exchangeMaxErrorDuration = "30s";
std::string exchangeRequestTimeout = "10s";
bool exchangeImmediateBufferTransfer = false;
};

} // namespace io::trino::bridge
5 changes: 2 additions & 3 deletions cpp/src/TrinoBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ JNIEXPORT void JNICALL Java_io_trino_jni_TrinoBridge_registerOutputPartitionList
return;
}
if (taskHandle) {
auto manager = exec::PartitionedOutputBufferManager::getInstance().lock();
auto manager = exec::OutputBufferManager::getInstance().lock();
int destination = jPartitionId;

auto& output = taskHandle->getPartitionOutputData(destination);
Expand Down Expand Up @@ -305,8 +305,7 @@ JNIEXPORT jint JNICALL Java_io_trino_jni_TrinoBridge_getBufferStep1(
size_t data_num = output->withLock(
[&taskId, &destination, &taskHandle](PartitionOutputData& data) {
if (data.noMoreData() && data.getOutputDataNum() == 0) {
auto manager =
exec::PartitionedOutputBufferManager::getInstance().lock();
auto manager = exec::OutputBufferManager::getInstance().lock();
manager->deleteResults(taskId.fullId(), destination);
}
return data.getOutputDataNum();
Expand Down
Loading