Skip to content

Commit 79df6e8

Browse files
committed
sss
1 parent 6a51bc1 commit 79df6e8

File tree

10 files changed

+625
-4
lines changed

10 files changed

+625
-4
lines changed

presto-native-execution/presto_cpp/main/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ target_link_libraries(
6363
presto_function_metadata
6464
presto_connectors
6565
presto_http
66+
presto_thrift_server
6667
presto_operators
6768
presto_session_properties
6869
presto_velox_plan_conversion

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include "presto_cpp/main/operators/ShuffleRead.h"
4444
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
4545
#include "presto_cpp/main/types/VeloxPlanConversion.h"
46+
#include "thrift/server/ThriftServer.h"
4647
#include "velox/common/base/Counters.h"
4748
#include "velox/common/base/StatsReporter.h"
4849
#include "velox/common/caching/CacheTTLController.h"
@@ -619,6 +620,8 @@ void PrestoServer::run() {
619620
}
620621
};
621622

623+
startThriftServer(bindToNodeInternalAddressOnly, certPath, keyPath, ciphers);
624+
622625
// Start everything. After the return from the following call we are shutting
623626
// down.
624627
httpServer_->start(getHttpServerFilters(), [&](proxygen::HTTPServer* server) {
@@ -679,6 +682,7 @@ void PrestoServer::run() {
679682
taskManager_.reset();
680683
PRESTO_SHUTDOWN_LOG(INFO) << "Destroying HTTP Server";
681684
httpServer_.reset();
685+
thriftServer_.reset();
682686

683687
unregisterFileReadersAndWriters();
684688
unregisterFileSystems();
@@ -1070,6 +1074,7 @@ void PrestoServer::stop() {
10701074
httpServer_->stop();
10711075
PRESTO_SHUTDOWN_LOG(INFO) << "HTTP Server stopped.";
10721076
}
1077+
shutdownThriftServer();
10731078
}
10741079

10751080
size_t PrestoServer::numDriverThreads() const {
@@ -1492,7 +1497,7 @@ void PrestoServer::enableWorkerStatsReporting() {
14921497

14931498
void PrestoServer::initVeloxPlanValidator() {
14941499
VELOX_CHECK_NULL(planValidator_);
1495-
planValidator_ = std::make_unique<VeloxPlanValidator>();
1500+
planValidator_ = std::make_shared<VeloxPlanValidator>();
14961501
}
14971502

14981503
VeloxPlanValidator* PrestoServer::getVeloxPlanValidator() {
@@ -1793,6 +1798,75 @@ void PrestoServer::createTaskManager() {
17931798
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());
17941799
}
17951800

1801+
void PrestoServer::startThriftServer(
1802+
bool bindToNodeInternalAddressOnly,
1803+
const std::string& certPath,
1804+
const std::string& keyPath,
1805+
const std::string& ciphers) {
1806+
auto* systemConfig = SystemConfig::instance();
1807+
bool thriftServerEnabled = systemConfig
1808+
->optionalProperty<bool>(std::string_view(
1809+
"presto.thrift-server.enabled"))
1810+
.value_or(true);
1811+
1812+
if (thriftServerEnabled) {
1813+
std::unique_ptr<thrift::ThriftConfig> thriftConfig;
1814+
folly::SocketAddress thriftAddress;
1815+
int thriftPort = systemConfig
1816+
->optionalProperty<int>(
1817+
std::string_view("presto.thrift-server.port"))
1818+
.value_or(9090);
1819+
if (bindToNodeInternalAddressOnly) {
1820+
thriftAddress.setFromHostPort(address_, thriftPort);
1821+
} else {
1822+
thriftAddress.setFromLocalPort(thriftPort);
1823+
}
1824+
thriftConfig = std::make_unique<thrift::ThriftConfig>(
1825+
thriftAddress, certPath, keyPath, ciphers);
1826+
thriftServer_ = std::make_unique<thrift::ThriftServer>(
1827+
std::move(thriftConfig),
1828+
pool_,
1829+
planValidator_,
1830+
taskManager_);
1831+
1832+
thriftServerFuture_ =
1833+
folly::via(folly::getGlobalCPUExecutor().get())
1834+
.thenTry([this](folly::Try<folly::Unit>) {
1835+
try {
1836+
PRESTO_STARTUP_LOG(INFO)
1837+
<< "Starting Thrift server asynchronously...";
1838+
thriftServer_->start();
1839+
PRESTO_STARTUP_LOG(INFO)
1840+
<< "Thrift server started successfully";
1841+
} catch (const std::exception& e) {
1842+
PRESTO_STARTUP_LOG(ERROR)
1843+
<< "Thrift server failed to start: " << e.what();
1844+
throw;
1845+
}
1846+
});
1847+
}
1848+
}
1849+
1850+
void PrestoServer::shutdownThriftServer() {
1851+
if (thriftServer_) {
1852+
PRESTO_SHUTDOWN_LOG(INFO) << "Stopping Thrift server";
1853+
thriftServer_->stop();
1854+
1855+
// Wait for Thrift server thread to complete with timeout
1856+
try {
1857+
std::move(thriftServerFuture_)
1858+
.within(std::chrono::seconds(5)) // 5-second timeout
1859+
.get();
1860+
PRESTO_SHUTDOWN_LOG(INFO) << "Thrift server stopped gracefully";
1861+
} catch (const std::exception& e) {
1862+
PRESTO_SHUTDOWN_LOG(WARNING)
1863+
<< "Thrift server shutdown timeout or error: " << e.what();
1864+
}
1865+
1866+
thriftServer_.reset();
1867+
}
1868+
}
1869+
17961870
void PrestoServer::reportNodeStats(proxygen::ResponseHandler* downstream) {
17971871
protocol::NodeStats nodeStats;
17981872

presto-native-execution/presto_cpp/main/PrestoServer.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ namespace facebook::presto::http {
4848
class HttpServer;
4949
}
5050

51+
namespace facebook::presto::thrift {
52+
class ThriftServer;
53+
}
54+
5155
namespace proxygen {
5256
class ResponseHandler;
5357
} // namespace proxygen
@@ -231,6 +235,16 @@ class PrestoServer {
231235

232236
virtual void createTaskManager();
233237

238+
/// Utility method to start the Thrift server if enabled
239+
void startThriftServer(
240+
bool bindToNodeInternalAddressOnly,
241+
const std::string& certPath,
242+
const std::string& keyPath,
243+
const std::string& ciphers);
244+
245+
/// Utility method to safely shutdown the Thrift server if running
246+
void shutdownThriftServer();
247+
234248
const std::string configDirectoryPath_;
235249

236250
std::shared_ptr<CoordinatorDiscoverer> coordinatorDiscoverer_;
@@ -269,20 +283,22 @@ class PrestoServer {
269283
// Executor for spilling.
270284
std::unique_ptr<folly::CPUThreadPoolExecutor> spillerExecutor_;
271285

272-
std::unique_ptr<VeloxPlanValidator> planValidator_;
286+
std::shared_ptr<VeloxPlanValidator> planValidator_;
273287

274288
std::unique_ptr<http::HttpClientConnectionPool> exchangeSourceConnectionPool_;
275289

276290
// If not null, the instance of AsyncDataCache used for in-memory file cache.
277291
std::shared_ptr<velox::cache::AsyncDataCache> cache_;
278292

279293
std::unique_ptr<http::HttpServer> httpServer_;
294+
std::unique_ptr<thrift::ThriftServer> thriftServer_;
295+
folly::Future<folly::Unit> thriftServerFuture_{folly::makeFuture()};
280296
std::unique_ptr<SignalHandler> signalHandler_;
281297
std::unique_ptr<Announcer> announcer_;
282298
std::unique_ptr<PeriodicHeartbeatManager> heartbeatManager_;
283299
std::shared_ptr<velox::memory::MemoryPool> pool_;
284300
std::shared_ptr<velox::memory::MemoryPool> nativeWorkerPool_;
285-
std::unique_ptr<TaskManager> taskManager_;
301+
std::shared_ptr<TaskManager> taskManager_;
286302
std::unique_ptr<TaskResource> taskResource_;
287303
std::atomic<NodeState> nodeState_{NodeState::kActive};
288304
folly::Synchronized<bool> shuttingDown_{false};

presto-native-execution/presto_cpp/main/common/tests/ConfigTest.cpp

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,88 @@ TEST_F(ConfigTest, optionalSystemConfigs) {
191191
ASSERT_EQ(config.discoveryUri(), "my uri");
192192
}
193193

194+
TEST_F(ConfigTest, thriftServerConfigs) {
195+
SystemConfig config;
196+
197+
// Test default values (when no thrift server configs are provided)
198+
init(config, {});
199+
200+
// Test with thrift server enabled
201+
init(config, {{"presto.thrift-server.enabled", "true"}});
202+
ASSERT_EQ(
203+
config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(false),
204+
true);
205+
206+
// Test thrift server port configuration
207+
init(config, {{"presto.thrift-server.port", "9090"}});
208+
ASSERT_EQ(
209+
config.optionalProperty<int>(std::string_view("presto.thrift-server.port")).value_or(9090),
210+
9090);
211+
212+
// Test thrift server max connections
213+
init(config, {{"presto.thrift-server.max-connections", "5000"}});
214+
ASSERT_EQ(
215+
config.optionalProperty<int>(std::string_view("presto.thrift-server.max-connections")).value_or(10000),
216+
5000);
217+
218+
// Test thrift server max requests
219+
init(config, {{"presto.thrift-server.max-requests", "5000"}});
220+
ASSERT_EQ(
221+
config.optionalProperty<int>(std::string_view("presto.thrift-server.max-requests")).value_or(10000),
222+
5000);
223+
224+
// Test thrift server idle timeout
225+
init(config, {{"presto.thrift-server.idle-timeout", "600000"}});
226+
ASSERT_EQ(
227+
config.optionalProperty<int>(std::string_view("presto.thrift-server.idle-timeout")).value_or(300000),
228+
600000);
229+
230+
// Test thrift server task expire time
231+
init(config, {{"presto.thrift-server.task-expire-time-ms", "600000"}});
232+
ASSERT_EQ(
233+
config.optionalProperty<int>(std::string_view("presto.thrift-server.task-expire-time-ms")).value_or(300000),
234+
600000);
235+
236+
// Test thrift server stream expire time
237+
init(config, {{"presto.thrift-server.stream-expire-time", "600000"}});
238+
ASSERT_EQ(
239+
config.optionalProperty<int>(std::string_view("presto.thrift-server.stream-expire-time")).value_or(300000),
240+
600000);
241+
242+
// Test multiple thrift server configs together
243+
init(config, {
244+
{"presto.thrift-server.enabled", "true"},
245+
{"presto.thrift-server.port", "9091"},
246+
{"presto.thrift-server.max-connections", "8000"},
247+
{"presto.thrift-server.max-requests", "8000"},
248+
{"presto.thrift-server.idle-timeout", "900000"},
249+
{"presto.thrift-server.task-expire-time-ms", "900000"},
250+
{"presto.thrift-server.stream-expire-time", "900000"}
251+
});
252+
253+
ASSERT_EQ(
254+
config.optionalProperty<bool>(std::string_view("presto.thrift-server.enabled")).value_or(false),
255+
true);
256+
ASSERT_EQ(
257+
config.optionalProperty<int>(std::string_view("presto.thrift-server.port")).value_or(9090),
258+
9091);
259+
ASSERT_EQ(
260+
config.optionalProperty<int>(std::string_view("presto.thrift-server.max-connections")).value_or(10000),
261+
8000);
262+
ASSERT_EQ(
263+
config.optionalProperty<int>(std::string_view("presto.thrift-server.max-requests")).value_or(10000),
264+
8000);
265+
ASSERT_EQ(
266+
config.optionalProperty<int>(std::string_view("presto.thrift-server.idle-timeout")).value_or(300000),
267+
900000);
268+
ASSERT_EQ(
269+
config.optionalProperty<int>(std::string_view("presto.thrift-server.task-expire-time-ms")).value_or(300000),
270+
900000);
271+
ASSERT_EQ(
272+
config.optionalProperty<int>(std::string_view("presto.thrift-server.stream-expire-time")).value_or(300000),
273+
900000);
274+
}
275+
194276
TEST_F(ConfigTest, optionalNodeConfigs) {
195277
NodeConfig config;
196278
init(config, {});

presto-native-execution/presto_cpp/main/thrift/CMakeLists.txt

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,32 @@ add_dependencies(presto_thrift_extra presto_thrift-cpp2)
7070
if(PRESTO_ENABLE_TESTING)
7171
add_subdirectory(tests)
7272
endif()
73+
74+
add_library(
75+
presto_thrift_server
76+
server/ThriftServer.cpp
77+
server/PrestoThriftServiceHandler.cpp
78+
)
79+
80+
target_include_directories(
81+
presto_thrift_server PUBLIC
82+
${presto_thrift_INCLUDES}
83+
${THRIFT_INCLUDES}
84+
${GLOG_INCLUDE_DIR}
85+
# Include main directory for PrestoThriftServiceHandler.h
86+
${CMAKE_SOURCE_DIR}/presto_cpp/main
87+
)
88+
89+
target_link_libraries(
90+
presto_thrift_server
91+
presto_thrift
92+
${presto_thrift_LIBRARIES}
93+
${THRIFT_LIBRARIES}
94+
${GLOG_LIBRARY}
95+
presto_cpp_main_common
96+
presto_cpp_main_types
97+
presto_task_lib
98+
${THRIFT_TRANSPORT}
99+
xsimd
100+
)
101+

presto-native-execution/presto_cpp/main/thrift/presto_thrift.thrift

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,45 @@ struct TaskUpdateRequest {
705705
6: optional TableWriteInfo tableWriteInfo;
706706
}
707707

708+
struct TaskResult {
709+
1: i64 sequence;
710+
2: i64 nextSequence;
711+
3: optional IOBufPtr data;
712+
4: bool complete;
713+
5: optional list<i64> remainingBytes;
714+
}
715+
708716
service PrestoThrift {
709-
void fake();
717+
/**
718+
* Get task results - corresponds to /v1/task/{taskId}/results/{bufferId}/{token}
719+
* @param taskId The ID of the task to get results for
720+
* @param bufferId The buffer ID to get results from
721+
* @param token Continuation token for paging
722+
* @param maxSizeBytes Maximum number of bytes to return
723+
* @param maxWaitMicros Maximum time to wait in microseconds
724+
* @param getDataSize Two phase protocol: if true, return the size of the data in the first phrase
725+
* @return TaskResult containing the data and metadata
726+
*/
727+
TaskResult getTaskResults(
728+
1: string taskId,
729+
2: i64 bufferId,
730+
3: i64 token,
731+
4: i64 maxSizeBytes,
732+
5: i64 maxWaitMicros,
733+
6: bool getDataSize,
734+
);
735+
736+
/**
737+
* Acknowledge task results - corresponds to /v1/task/{taskId}/results/{bufferId}/{token}/acknowledge
738+
* @param taskId The ID of the task to acknowledge results for
739+
* @param bufferId The buffer ID to acknowledge results for
740+
* @param token The token to acknowledge up to
741+
*/
742+
void acknowledgeTaskResults(1: string taskId, 2: i64 bufferId, 3: i64 token);
743+
744+
/**
745+
* Abort task results - corresponds to /v1/task/{taskId}/results
746+
* @param taskId The ID of the task to abort results for
747+
*/
748+
void abortTaskResults(1: string taskId, 2: i64 destination);
710749
}

0 commit comments

Comments
 (0)