1919#include " presto_cpp/main/thrift/ThriftIO.h"
2020#include " presto_cpp/main/thrift/gen-cpp2/PrestoThrift.h"
2121#include " presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
22+ #include " presto_cpp/main/thrift/ThriftUtils.h"
2223
2324namespace facebook ::presto {
2425
@@ -210,25 +211,35 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
210211 const protocol::TaskId& taskId,
211212 const std::string& updateJson,
212213 const bool summarize,
213- long startProcessCpuTime)>& createOrUpdateFunc) {
214+ long startProcessCpuTime,
215+ bool receiveThrift)>& createOrUpdateFunc) {
214216 protocol::TaskId taskId = pathMatch[1 ];
215217 bool summarize = message->hasQueryParam (" summarize" );
218+
219+ auto & headers = message->getHeaders ();
220+ auto acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
221+ auto sendThrift =
222+ acceptHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
223+ auto contentHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_CONTENT_TYPE);
224+ auto receiveThrift =
225+ contentHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
226+
216227 return new http::CallbackRequestHandler (
217- [this , taskId, summarize, createOrUpdateFunc](
228+ [this , taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift ](
218229 proxygen::HTTPMessage* /* message*/ ,
219230 const std::vector<std::unique_ptr<folly::IOBuf>>& body,
220231 proxygen::ResponseHandler* downstream,
221232 std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
222233 folly::via (
223234 httpSrvCpuExecutor_,
224- [this , &body, taskId, summarize, createOrUpdateFunc]() {
235+ [this , &body, taskId, summarize, createOrUpdateFunc, receiveThrift ]() {
225236 const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs ();
226237 std::string updateJson = util::extractMessageBody (body);
227238
228239 std::unique_ptr<protocol::TaskInfo> taskInfo;
229240 try {
230241 taskInfo = createOrUpdateFunc (
231- taskId, updateJson, summarize, startProcessCpuTimeNs);
242+ taskId, updateJson, summarize, startProcessCpuTimeNs, receiveThrift );
232243 } catch (const velox::VeloxException& e) {
233244 // Creating an empty task, putting errors inside so that next
234245 // status fetch from coordinator will catch the error and well
@@ -243,12 +254,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
243254 throw ;
244255 }
245256 }
246- return json (* taskInfo) ;
257+ return taskInfo;
247258 })
248259 .via (folly::EventBaseManager::get ()->getEventBase ())
249- .thenValue ([downstream, handlerState]( auto && taskInfoJson ) {
260+ .thenValue ([downstream, handlerState, sendThrift](std::unique_ptr<protocol::TaskInfo> taskInfo ) {
250261 if (!handlerState->requestExpired ()) {
251- http::sendOkResponse (downstream, taskInfoJson);
262+ if (sendThrift) {
263+ thrift::TaskInfo thriftTaskInfo;
264+ protocol::toThrift (*taskInfo, thriftTaskInfo);
265+ http::sendOkThriftResponse (
266+ downstream, thriftWrite (thriftTaskInfo));
267+ } else {
268+ http::sendOkResponse (downstream, json (*taskInfo));
269+ }
252270 }
253271 })
254272 .thenError (
@@ -277,7 +295,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
277295 [&](const protocol::TaskId& taskId,
278296 const std::string& updateJson,
279297 const bool summarize,
280- long startProcessCpuTime) {
298+ long startProcessCpuTime,
299+ bool /* receiveThrift*/ ) {
281300 protocol::BatchTaskUpdateRequest batchUpdateRequest =
282301 json::parse (updateJson);
283302 auto updateRequest = batchUpdateRequest.taskUpdateRequest ;
@@ -329,13 +348,25 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
329348 [&](const protocol::TaskId& taskId,
330349 const std::string& updateJson,
331350 const bool summarize,
332- long startProcessCpuTime) {
333- protocol::TaskUpdateRequest updateRequest = json::parse (updateJson);
351+ long startProcessCpuTime,
352+ bool receiveThrift) {
353+ protocol::TaskUpdateRequest updateRequest;
354+ if (receiveThrift) {
355+ auto thriftTaskUpdateRequest = std::make_shared<thrift::TaskUpdateRequest>();
356+ thriftRead (updateJson, thriftTaskUpdateRequest);
357+ protocol::fromThrift (*thriftTaskUpdateRequest, updateRequest);
358+ } else {
359+ updateRequest = json::parse (updateJson);
360+ }
334361 velox::core::PlanFragment planFragment;
335362 std::shared_ptr<velox::core::QueryCtx> queryCtx;
336363 if (updateRequest.fragment ) {
337- auto fragment =
338- velox::encoding::Base64::decode (*updateRequest.fragment );
364+ std::string fragment;
365+ if (receiveThrift) {
366+ fragment = *updateRequest.fragment ;
367+ } else {
368+ fragment = velox::encoding::Base64::decode (*updateRequest.fragment );
369+ }
339370 protocol::PlanFragment prestoPlan = json::parse (fragment);
340371
341372 queryCtx =
@@ -511,11 +542,11 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
511542
512543 auto & headers = message->getHeaders ();
513544 auto acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
514- auto useThrift =
545+ auto sendThrift =
515546 acceptHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
516547
517548 return new http::CallbackRequestHandler (
518- [this , useThrift , taskId, currentState, maxWait](
549+ [this , sendThrift , taskId, currentState, maxWait](
519550 proxygen::HTTPMessage* /* message*/ ,
520551 const std::vector<std::unique_ptr<folly::IOBuf>>& /* body*/ ,
521552 proxygen::ResponseHandler* downstream,
@@ -525,7 +556,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
525556 httpSrvCpuExecutor_,
526557 [this ,
527558 evb,
528- useThrift ,
559+ sendThrift ,
529560 taskId,
530561 currentState,
531562 maxWait,
@@ -535,12 +566,12 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
535566 .getTaskStatus (taskId, currentState, maxWait, handlerState)
536567 .via (evb)
537568 .thenValue (
538- [useThrift , downstream, taskId, handlerState](
569+ [sendThrift , downstream, taskId, handlerState](
539570 std::unique_ptr<protocol::TaskStatus> taskStatus) {
540571 if (!handlerState->requestExpired ()) {
541- if (useThrift ) {
572+ if (sendThrift ) {
542573 thrift::TaskStatus thriftTaskStatus;
543- toThrift (*taskStatus, thriftTaskStatus);
574+ protocol:: toThrift (*taskStatus, thriftTaskStatus);
544575 http::sendOkThriftResponse (
545576 downstream, thriftWrite (thriftTaskStatus));
546577 } else {
0 commit comments