@@ -210,25 +210,35 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
210210 const protocol::TaskId& taskId,
211211 const std::string& updateJson,
212212 const bool summarize,
213- long startProcessCpuTime)>& createOrUpdateFunc) {
213+ long startProcessCpuTime,
214+ bool receiveThrift)>& createOrUpdateFunc) {
214215 protocol::TaskId taskId = pathMatch[1 ];
215216 bool summarize = message->hasQueryParam (" summarize" );
217+
218+ auto & headers = message->getHeaders ();
219+ auto acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
220+ auto sendThrift =
221+ acceptHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
222+ auto contentHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_CONTENT_TYPE);
223+ auto receiveThrift =
224+ contentHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
225+
216226 return new http::CallbackRequestHandler (
217- [this , taskId, summarize, createOrUpdateFunc](
227+ [this , taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift ](
218228 proxygen::HTTPMessage* /* message*/ ,
219229 const std::vector<std::unique_ptr<folly::IOBuf>>& body,
220230 proxygen::ResponseHandler* downstream,
221231 std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
222232 folly::via (
223233 httpSrvCpuExecutor_,
224- [this , &body, taskId, summarize, createOrUpdateFunc]() {
234+ [this , &body, taskId, summarize, createOrUpdateFunc, receiveThrift ]() {
225235 const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs ();
226236 std::string updateJson = util::extractMessageBody (body);
227237
228238 std::unique_ptr<protocol::TaskInfo> taskInfo;
229239 try {
230240 taskInfo = createOrUpdateFunc (
231- taskId, updateJson, summarize, startProcessCpuTimeNs);
241+ taskId, updateJson, summarize, startProcessCpuTimeNs, receiveThrift );
232242 } catch (const velox::VeloxException& e) {
233243 // Creating an empty task, putting errors inside so that next
234244 // status fetch from coordinator will catch the error and well
@@ -243,12 +253,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
243253 throw ;
244254 }
245255 }
246- return json (* taskInfo) ;
256+ return taskInfo;
247257 })
248258 .via (folly::EventBaseManager::get ()->getEventBase ())
249- .thenValue ([downstream, handlerState]( auto && taskInfoJson ) {
259+ .thenValue ([downstream, handlerState, sendThrift](std::unique_ptr<protocol::TaskInfo> taskInfo ) {
250260 if (!handlerState->requestExpired ()) {
251- http::sendOkResponse (downstream, taskInfoJson);
261+ if (sendThrift) {
262+ thrift::TaskInfo thriftTaskInfo;
263+ toThrift (*taskInfo, thriftTaskInfo);
264+ http::sendOkThriftResponse (
265+ downstream, thriftWrite (thriftTaskInfo));
266+ } else {
267+ http::sendOkResponse (downstream, json (*taskInfo));
268+ }
252269 }
253270 })
254271 .thenError (
@@ -277,7 +294,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
277294 [&](const protocol::TaskId& taskId,
278295 const std::string& updateJson,
279296 const bool summarize,
280- long startProcessCpuTime) {
297+ long startProcessCpuTime,
298+ bool /* receiveThrift*/ ) {
281299 protocol::BatchTaskUpdateRequest batchUpdateRequest =
282300 json::parse (updateJson);
283301 auto updateRequest = batchUpdateRequest.taskUpdateRequest ;
@@ -329,13 +347,25 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
329347 [&](const protocol::TaskId& taskId,
330348 const std::string& updateJson,
331349 const bool summarize,
332- long startProcessCpuTime) {
333- protocol::TaskUpdateRequest updateRequest = json::parse (updateJson);
350+ long startProcessCpuTime,
351+ bool receiveThrift) {
352+ protocol::TaskUpdateRequest updateRequest;
353+ if (receiveThrift) {
354+ auto thriftTaskUpdateRequest = std::make_shared<thrift::TaskUpdateRequest>();
355+ thriftRead (updateJson, thriftTaskUpdateRequest);
356+ fromThrift (*thriftTaskUpdateRequest, updateRequest);
357+ } else {
358+ updateRequest = json::parse (updateJson);
359+ }
334360 velox::core::PlanFragment planFragment;
335361 std::shared_ptr<velox::core::QueryCtx> queryCtx;
336362 if (updateRequest.fragment ) {
337- auto fragment =
338- velox::encoding::Base64::decode (*updateRequest.fragment );
363+ std::string fragment;
364+ if (receiveThrift) {
365+ fragment = *updateRequest.fragment ;
366+ } else {
367+ fragment = velox::encoding::Base64::decode (*updateRequest.fragment );
368+ }
339369 protocol::PlanFragment prestoPlan = json::parse (fragment);
340370
341371 queryCtx =
@@ -511,11 +541,11 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
511541
512542 auto & headers = message->getHeaders ();
513543 auto acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
514- auto useThrift =
544+ auto sendThrift =
515545 acceptHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
516546
517547 return new http::CallbackRequestHandler (
518- [this , useThrift , taskId, currentState, maxWait](
548+ [this , sendThrift , taskId, currentState, maxWait](
519549 proxygen::HTTPMessage* /* message*/ ,
520550 const std::vector<std::unique_ptr<folly::IOBuf>>& /* body*/ ,
521551 proxygen::ResponseHandler* downstream,
@@ -525,7 +555,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
525555 httpSrvCpuExecutor_,
526556 [this ,
527557 evb,
528- useThrift ,
558+ sendThrift ,
529559 taskId,
530560 currentState,
531561 maxWait,
@@ -535,10 +565,10 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
535565 .getTaskStatus (taskId, currentState, maxWait, handlerState)
536566 .via (evb)
537567 .thenValue (
538- [useThrift , downstream, taskId, handlerState](
568+ [sendThrift , downstream, taskId, handlerState](
539569 std::unique_ptr<protocol::TaskStatus> taskStatus) {
540570 if (!handlerState->requestExpired ()) {
541- if (useThrift ) {
571+ if (sendThrift ) {
542572 thrift::TaskStatus thriftTaskStatus;
543573 toThrift (*taskStatus, thriftTaskStatus);
544574 http::sendOkThriftResponse (
0 commit comments