@@ -210,25 +210,35 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
210210 const protocol::TaskId& taskId,
211211 const std::string& updateJson,
212212 const bool summarize,
213- long startProcessCpuTime)>& createOrUpdateFunc) {
213+ long startProcessCpuTime,
214+ bool receiveThrift)>& createOrUpdateFunc) {
214215 protocol::TaskId taskId = pathMatch[1 ];
215216 bool summarize = message->hasQueryParam (" summarize" );
217+
218+ auto & headers = message->getHeaders ();
219+ const auto & acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
220+ const auto sendThrift =
221+ acceptHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
222+ const auto & contentHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_CONTENT_TYPE);
223+ const auto receiveThrift =
224+ contentHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
225+
216226 return new http::CallbackRequestHandler (
217- [this , taskId, summarize, createOrUpdateFunc](
227+ [this , taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift ](
218228 proxygen::HTTPMessage* /* message*/ ,
219229 const std::vector<std::unique_ptr<folly::IOBuf>>& body,
220230 proxygen::ResponseHandler* downstream,
221231 std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
222232 folly::via (
223233 httpSrvCpuExecutor_,
224- [this , &body, taskId, summarize, createOrUpdateFunc]() {
234+ [this , &body, taskId, summarize, createOrUpdateFunc, receiveThrift ]() {
225235 const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs ();
226236 std::string updateJson = util::extractMessageBody (body);
227237
228238 std::unique_ptr<protocol::TaskInfo> taskInfo;
229239 try {
230240 taskInfo = createOrUpdateFunc (
231- taskId, updateJson, summarize, startProcessCpuTimeNs);
241+ taskId, updateJson, summarize, startProcessCpuTimeNs, receiveThrift );
232242 } catch (const velox::VeloxException& e) {
233243 // Creating an empty task, putting errors inside so that next
234244 // status fetch from coordinator will catch the error and well
@@ -243,12 +253,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
243253 throw ;
244254 }
245255 }
246- return json (* taskInfo) ;
256+ return taskInfo;
247257 })
248258 .via (folly::EventBaseManager::get ()->getEventBase ())
249- .thenValue ([downstream, handlerState](auto && taskInfoJson ) {
259+ .thenValue ([downstream, handlerState, sendThrift ](auto taskInfo ) {
250260 if (!handlerState->requestExpired ()) {
251- http::sendOkResponse (downstream, taskInfoJson);
261+ if (sendThrift) {
262+ thrift::TaskInfo thriftTaskInfo;
263+ toThrift (*taskInfo, thriftTaskInfo);
264+ http::sendOkThriftResponse (
265+ downstream, thriftWrite (thriftTaskInfo));
266+ } else {
267+ http::sendOkResponse (downstream, json (*taskInfo));
268+ }
252269 }
253270 })
254271 .thenError (
@@ -277,7 +294,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
277294 [&](const protocol::TaskId& taskId,
278295 const std::string& updateJson,
279296 const bool summarize,
280- long startProcessCpuTime) {
297+ long startProcessCpuTime,
298+ bool /* receiveThrift*/ ) {
281299 protocol::BatchTaskUpdateRequest batchUpdateRequest =
282300 json::parse (updateJson);
283301 auto updateRequest = batchUpdateRequest.taskUpdateRequest ;
@@ -329,14 +347,20 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
329347 [&](const protocol::TaskId& taskId,
330348 const std::string& updateJson,
331349 const bool summarize,
332- long startProcessCpuTime) {
333- protocol::TaskUpdateRequest updateRequest = json::parse (updateJson);
350+ long startProcessCpuTime,
351+ bool receiveThrift) {
352+ protocol::TaskUpdateRequest updateRequest;
353+ if (receiveThrift) {
354+ auto thriftTaskUpdateRequest = std::make_shared<thrift::TaskUpdateRequest>();
355+ thriftRead (updateJson, thriftTaskUpdateRequest);
356+ fromThrift (*thriftTaskUpdateRequest, updateRequest);
357+ } else {
358+ updateRequest = json::parse (updateJson);
359+ }
334360 velox::core::PlanFragment planFragment;
335361 std::shared_ptr<velox::core::QueryCtx> queryCtx;
336362 if (updateRequest.fragment ) {
337- auto fragment =
338- velox::encoding::Base64::decode (*updateRequest.fragment );
339- protocol::PlanFragment prestoPlan = json::parse (fragment);
363+ protocol::PlanFragment prestoPlan = json::parse (receiveThrift ? *updateRequest.fragment : velox::encoding::Base64::decode (*updateRequest.fragment ));
340364
341365 queryCtx =
342366 taskManager_.getQueryContextManager ()->findOrCreateQueryCtx (
@@ -510,12 +534,12 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
510534 auto maxWait = getMaxWait (message);
511535
512536 auto & headers = message->getHeaders ();
513- auto acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
514- auto useThrift =
537+ const auto & acceptHeader = headers.getSingleOrEmpty (proxygen::HTTP_HEADER_ACCEPT);
538+ const auto sendThrift =
515539 acceptHeader.find (http::kMimeTypeApplicationThrift ) != std::string::npos;
516540
517541 return new http::CallbackRequestHandler (
518- [this , useThrift , taskId, currentState, maxWait](
542+ [this , sendThrift , taskId, currentState, maxWait](
519543 proxygen::HTTPMessage* /* message*/ ,
520544 const std::vector<std::unique_ptr<folly::IOBuf>>& /* body*/ ,
521545 proxygen::ResponseHandler* downstream,
@@ -525,7 +549,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
525549 httpSrvCpuExecutor_,
526550 [this ,
527551 evb,
528- useThrift ,
552+ sendThrift ,
529553 taskId,
530554 currentState,
531555 maxWait,
@@ -535,10 +559,10 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
535559 .getTaskStatus (taskId, currentState, maxWait, handlerState)
536560 .via (evb)
537561 .thenValue (
538- [useThrift , downstream, taskId, handlerState](
562+ [sendThrift , downstream, taskId, handlerState](
539563 std::unique_ptr<protocol::TaskStatus> taskStatus) {
540564 if (!handlerState->requestExpired ()) {
541- if (useThrift ) {
565+ if (sendThrift ) {
542566 thrift::TaskStatus thriftTaskStatus;
543567 toThrift (*taskStatus, thriftTaskStatus);
544568 http::sendOkThriftResponse (
0 commit comments