Skip to content

Commit 9bfd860

Browse files
vhsu14shangm2
authored andcommitted
Migrate TaskStatus, TaskUpdateRequest, TaskInfo to Thrift with JSON fields in CPP (prestodb#25079)
1 parent 941ffb2 commit 9bfd860

30 files changed

+2106
-1388
lines changed

presto-native-execution/presto_cpp/main/TaskResource.cpp

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -210,25 +210,35 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
210210
const protocol::TaskId& taskId,
211211
const std::string& updateJson,
212212
const bool summarize,
213-
long startProcessCpuTime)>& createOrUpdateFunc) {
213+
long startProcessCpuTime,
214+
bool receiveThrift)>& createOrUpdateFunc) {
214215
protocol::TaskId taskId = pathMatch[1];
215216
bool summarize = message->hasQueryParam("summarize");
217+
218+
auto& headers = message->getHeaders();
219+
auto acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
220+
auto sendThrift =
221+
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
222+
auto contentHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_CONTENT_TYPE);
223+
auto receiveThrift =
224+
contentHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
225+
216226
return new http::CallbackRequestHandler(
217-
[this, taskId, summarize, createOrUpdateFunc](
227+
[this, taskId, summarize, createOrUpdateFunc, sendThrift, receiveThrift](
218228
proxygen::HTTPMessage* /*message*/,
219229
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
220230
proxygen::ResponseHandler* downstream,
221231
std::shared_ptr<http::CallbackRequestHandlerState> handlerState) {
222232
folly::via(
223233
httpSrvCpuExecutor_,
224-
[this, &body, taskId, summarize, createOrUpdateFunc]() {
234+
[this, &body, taskId, summarize, createOrUpdateFunc, receiveThrift]() {
225235
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs();
226236
std::string updateJson = util::extractMessageBody(body);
227237

228238
std::unique_ptr<protocol::TaskInfo> taskInfo;
229239
try {
230240
taskInfo = createOrUpdateFunc(
231-
taskId, updateJson, summarize, startProcessCpuTimeNs);
241+
taskId, updateJson, summarize, startProcessCpuTimeNs, receiveThrift);
232242
} catch (const velox::VeloxException& e) {
233243
// Creating an empty task, putting errors inside so that next
234244
// status fetch from coordinator will catch the error and well
@@ -243,12 +253,19 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
243253
throw;
244254
}
245255
}
246-
return json(*taskInfo);
256+
return taskInfo;
247257
})
248258
.via(folly::EventBaseManager::get()->getEventBase())
249-
.thenValue([downstream, handlerState](auto&& taskInfoJson) {
259+
.thenValue([downstream, handlerState, sendThrift](std::unique_ptr<protocol::TaskInfo> taskInfo) {
250260
if (!handlerState->requestExpired()) {
251-
http::sendOkResponse(downstream, taskInfoJson);
261+
if (sendThrift) {
262+
facebook::presto::thrift::TaskInfo thriftTaskInfo;
263+
facebook::presto::thrift::toThrift(*taskInfo, thriftTaskInfo);
264+
http::sendOkThriftResponse(
265+
downstream, thriftWrite(thriftTaskInfo));
266+
} else {
267+
http::sendOkResponse(downstream, json(*taskInfo));
268+
}
252269
}
253270
})
254271
.thenError(
@@ -277,7 +294,8 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
277294
[&](const protocol::TaskId& taskId,
278295
const std::string& updateJson,
279296
const bool summarize,
280-
long startProcessCpuTime) {
297+
long startProcessCpuTime,
298+
bool receiveThrift) {
281299
protocol::BatchTaskUpdateRequest batchUpdateRequest =
282300
json::parse(updateJson);
283301
auto updateRequest = batchUpdateRequest.taskUpdateRequest;
@@ -329,13 +347,25 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTask(
329347
[&](const protocol::TaskId& taskId,
330348
const std::string& updateJson,
331349
const bool summarize,
332-
long startProcessCpuTime) {
333-
protocol::TaskUpdateRequest updateRequest = json::parse(updateJson);
350+
long startProcessCpuTime,
351+
bool receiveThrift) {
352+
protocol::TaskUpdateRequest updateRequest;
353+
if (receiveThrift) {
354+
auto thriftTaskUpdateRequest = std::make_shared<facebook::presto::thrift::TaskUpdateRequest>();
355+
thriftRead(updateJson, thriftTaskUpdateRequest);
356+
facebook::presto::thrift::fromThrift(*thriftTaskUpdateRequest, updateRequest);
357+
} else {
358+
updateRequest = json::parse(updateJson);
359+
}
334360
velox::core::PlanFragment planFragment;
335361
std::shared_ptr<velox::core::QueryCtx> queryCtx;
336362
if (updateRequest.fragment) {
337-
auto fragment =
338-
velox::encoding::Base64::decode(*updateRequest.fragment);
363+
std::string fragment;
364+
if (receiveThrift) {
365+
fragment = *updateRequest.fragment;
366+
} else {
367+
fragment = velox::encoding::Base64::decode(*updateRequest.fragment);
368+
}
339369
protocol::PlanFragment prestoPlan = json::parse(fragment);
340370

341371
queryCtx =
@@ -511,11 +541,11 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
511541

512542
auto& headers = message->getHeaders();
513543
auto acceptHeader = headers.getSingleOrEmpty(proxygen::HTTP_HEADER_ACCEPT);
514-
auto useThrift =
544+
auto sendThrift =
515545
acceptHeader.find(http::kMimeTypeApplicationThrift) != std::string::npos;
516546

517547
return new http::CallbackRequestHandler(
518-
[this, useThrift, taskId, currentState, maxWait](
548+
[this, sendThrift, taskId, currentState, maxWait](
519549
proxygen::HTTPMessage* /*message*/,
520550
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
521551
proxygen::ResponseHandler* downstream,
@@ -525,7 +555,7 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
525555
httpSrvCpuExecutor_,
526556
[this,
527557
evb,
528-
useThrift,
558+
sendThrift,
529559
taskId,
530560
currentState,
531561
maxWait,
@@ -535,12 +565,12 @@ proxygen::RequestHandler* TaskResource::getTaskStatus(
535565
.getTaskStatus(taskId, currentState, maxWait, handlerState)
536566
.via(evb)
537567
.thenValue(
538-
[useThrift, downstream, taskId, handlerState](
568+
[sendThrift, downstream, taskId, handlerState](
539569
std::unique_ptr<protocol::TaskStatus> taskStatus) {
540570
if (!handlerState->requestExpired()) {
541-
if (useThrift) {
542-
thrift::TaskStatus thriftTaskStatus;
543-
toThrift(*taskStatus, thriftTaskStatus);
571+
if (sendThrift) {
572+
facebook::presto::thrift::TaskStatus thriftTaskStatus;
573+
facebook::presto::thrift::toThrift(*taskStatus, thriftTaskStatus);
544574
http::sendOkThriftResponse(
545575
downstream, thriftWrite(thriftTaskStatus));
546576
} else {

presto-native-execution/presto_cpp/main/TaskResource.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class TaskResource {
7676
const protocol::TaskId&,
7777
const std::string&,
7878
const bool,
79-
long)>& createOrUpdateFunc);
79+
long,
80+
const bool)>& createOrUpdateFunc);
8081

8182
proxygen::RequestHandler* deleteTask(
8283
proxygen::HTTPMessage* message,

presto-native-execution/presto_cpp/main/common/tests/test_json.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
#pragma once
1515

1616
#include <fstream>
17-
#include <ios>
1817
#include <iosfwd>
18+
#include <boost/filesystem.hpp>
19+
#include <boost/algorithm/string.hpp>
1920

2021
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
2122

23+
namespace fs = boost::filesystem;
24+
2225
namespace nlohmann {
2326

2427
// This is required avoid stack overflow when a gtest error printer is invoked.
@@ -48,3 +51,19 @@ inline std::string slurp(const std::string& path) {
4851
buf << input.rdbuf();
4952
return buf.str();
5053
}
54+
55+
inline std::string getDataPath(const std::string& dirUnderFbcode, const std::string& fileName) {
56+
std::string currentPath = fs::current_path().c_str();
57+
if (boost::algorithm::ends_with(currentPath, "fbcode")) {
58+
return currentPath + dirUnderFbcode + fileName;
59+
}
60+
61+
// CLion runs the tests from cmake-build-release/ or cmake-build-debug/
62+
// directory. Hard-coded json files are not copied there and test fails with
63+
// file not found. Fixing the path so that we can trigger these tests from
64+
// CLion.
65+
boost::algorithm::replace_all(currentPath, "cmake-build-release/", "");
66+
boost::algorithm::replace_all(currentPath, "cmake-build-debug/", "");
67+
68+
return currentPath + "/data/" + fileName;
69+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.
2+
3+
#include <gtest/gtest.h>
4+
#include "presto_cpp/main/thrift/ProtocolToThrift.h"
5+
#include "presto_cpp/presto_protocol/core/Duration.h"
6+
#include "presto_cpp/main/common/tests/test_json.h"
7+
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
8+
9+
using namespace facebook;
10+
using namespace facebook::presto::protocol;
11+
12+
class TaskInfoTest : public ::testing::Test {};
13+
14+
TEST_F(TaskInfoTest, duration) {
15+
std::unique_ptr<double> thrift = std::make_unique<double>();
16+
cpp2::toThrift(Duration(123, TimeUnit::MILLISECONDS), *thrift);
17+
ASSERT_EQ(*thrift, 123);
18+
}
19+
20+
TEST_F(TaskInfoTest, binaryMetadataUpdates) {
21+
std::string str = slurp(getDataPath("/github/presto-trunk/presto-native-execution/presto_cpp/main/thrift/tests/data/", "MetadataUpdates.json"));
22+
json j = json::parse(str);
23+
registerPrestoToVeloxConnector(std::make_unique<facebook::presto::HivePrestoToVeloxConnector>("hive"));
24+
MetadataUpdates metadataUpdates = j;
25+
std::unique_ptr<std::string> thriftMetadataUpdates = std::make_unique<std::string>();
26+
cpp2::toThrift(metadataUpdates, *thriftMetadataUpdates);
27+
28+
json thriftJson = json::parse(*thriftMetadataUpdates);
29+
ASSERT_EQ(j, thriftJson);
30+
31+
presto::unregisterPrestoToVeloxConnector("hive");
32+
}
33+
34+
TEST_F(TaskInfoTest, taskInfo) {
35+
std::string str = slurp(getDataPath("/github/presto-trunk/presto-native-execution/presto_cpp/main/thrift/tests/data/", "TaskInfo.json"));
36+
json j = json::parse(str);
37+
registerPrestoToVeloxConnector(std::make_unique<facebook::presto::HivePrestoToVeloxConnector>("hive"));
38+
TaskInfo taskInfo = j;
39+
cpp2::TaskInfo thriftTaskInfo;
40+
toThrift(taskInfo, thriftTaskInfo);
41+
42+
json thriftJson = json::parse(*thriftTaskInfo.metadataUpdates()->metadataUpdates());
43+
ASSERT_EQ(taskInfo.metadataUpdates, thriftJson);
44+
ASSERT_EQ(thriftTaskInfo.needsPlan(), false);
45+
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()->size(), 2);
46+
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()[0].bufferId()->id(), 100);
47+
ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()[1].bufferId()->id(), 200);
48+
ASSERT_EQ(thriftTaskInfo.stats()->blockedReasons()->count(cpp2::BlockedReason::WAITING_FOR_MEMORY), 1);
49+
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()->size(), 2);
50+
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()["test_metric1"].sum(), 123);
51+
ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()["test_metric2"].name(), "test_metric2");
52+
53+
presto::unregisterPrestoToVeloxConnector("hive");
54+
}
55+
56+
TEST_F(TaskInfoTest, taskId) {
57+
TaskId taskId = "queryId.1.2.3.4";
58+
cpp2::TaskId thriftTaskId;
59+
toThrift(taskId, thriftTaskId);
60+
61+
ASSERT_EQ(thriftTaskId.stageExecutionId()->stageId()->queryId(), "queryId");
62+
ASSERT_EQ(thriftTaskId.stageExecutionId()->stageId()->id(), 1);
63+
ASSERT_EQ(thriftTaskId.stageExecutionId()->id(), 2);
64+
ASSERT_EQ(thriftTaskId.id(), 3);
65+
ASSERT_EQ(thriftTaskId.attemptNumber(), 4);
66+
}
67+
68+
69+
TEST_F(TaskInfoTest, operatorStatsEmptyBlockedReason) {
70+
std::string str = slurp(getDataPath("/github/presto-trunk/presto-native-execution/presto_cpp/main/thrift/tests/data/", "OperatorStatsEmptyBlockedReason.json"));
71+
json j = json::parse(str);
72+
OperatorStats operatorStats = j;
73+
cpp2::OperatorStats thriftOperatorStats;
74+
toThrift(operatorStats, thriftOperatorStats);
75+
76+
ASSERT_EQ(thriftOperatorStats.blockedReason().has_value(), false);
77+
ASSERT_EQ(thriftOperatorStats.blockedWall(), 80);
78+
ASSERT_EQ(thriftOperatorStats.finishCpu(), 1000);
79+
}
80+
81+
TEST_F(TaskInfoTest, operatorStats) {
82+
std::string str = slurp(getDataPath("/github/presto-trunk/presto-native-execution/presto_cpp/main/thrift/tests/data/", "OperatorStats.json"));
83+
json j = json::parse(str);
84+
OperatorStats operatorStats = j;
85+
cpp2::OperatorStats thriftOperatorStats;
86+
toThrift(operatorStats, thriftOperatorStats);
87+
88+
ASSERT_EQ(thriftOperatorStats.blockedReason(), cpp2::BlockedReason::WAITING_FOR_MEMORY);
89+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.
2+
3+
#include <gtest/gtest.h>
4+
#include "presto_cpp/main/thrift/ProtocolToThrift.h"
5+
#include "presto_cpp/main/common/tests/test_json.h"
6+
7+
using namespace facebook;
8+
using namespace facebook::presto::protocol;
9+
10+
class TaskStatusTest : public ::testing::Test {};
11+
12+
TEST_F(TaskStatusTest, lifeSpan) {
13+
std::string str = R"("Group1001")";
14+
15+
json j = json::parse(str);
16+
Lifespan lifeSpan = j;
17+
cpp2::Lifespan thriftLifespan;
18+
toThrift(lifeSpan, thriftLifespan);
19+
20+
ASSERT_EQ(thriftLifespan.grouped(), true);
21+
ASSERT_EQ(thriftLifespan.groupId(), 1001);
22+
}
23+
24+
TEST_F(TaskStatusTest, errorCode) {
25+
std::string str = R"({
26+
"code": 1234,
27+
"name": "name",
28+
"type": "INTERNAL_ERROR",
29+
"retriable": false
30+
})";
31+
32+
json j = json::parse(str);
33+
ErrorCode errorCode = j;
34+
cpp2::ErrorCode thriftErrorCode;
35+
toThrift(errorCode, thriftErrorCode);
36+
37+
ASSERT_EQ(thriftErrorCode.code(), 1234);
38+
ASSERT_EQ(thriftErrorCode.name(), "name");
39+
ASSERT_EQ(thriftErrorCode.type(), cpp2::ErrorType::INTERNAL_ERROR);
40+
ASSERT_EQ(thriftErrorCode.retriable(), false);
41+
}
42+
43+
TEST_F(TaskStatusTest, executionFailureInfoOptionalFieldsEmpty) {
44+
std::string str = R"({
45+
"type": "type",
46+
"message": "message",
47+
"suppressed": [],
48+
"stack": [],
49+
"errorLocation": {
50+
"lineNumber": 1,
51+
"columnNumber": 2
52+
},
53+
"errorCode": {
54+
"code": 1234,
55+
"name": "name",
56+
"type": "INTERNAL_ERROR",
57+
"retriable": false
58+
},
59+
"remoteHost": "localhost:8080",
60+
"errorCause": "EXCEEDS_BROADCAST_MEMORY_LIMIT"
61+
})";
62+
63+
json j = json::parse(str);
64+
ExecutionFailureInfo executionFailureInfo = j;
65+
cpp2::ExecutionFailureInfo thriftExecutionFailureInfo;
66+
toThrift(executionFailureInfo, thriftExecutionFailureInfo);
67+
68+
ASSERT_EQ(thriftExecutionFailureInfo.type(), "type");
69+
ASSERT_EQ(thriftExecutionFailureInfo.errorLocation()->columnNumber(), 2);
70+
ASSERT_EQ(thriftExecutionFailureInfo.remoteHost()->host(), "localhost");
71+
ASSERT_EQ(thriftExecutionFailureInfo.remoteHost()->port(), 8080);
72+
ASSERT_EQ(thriftExecutionFailureInfo.errorCode()->type(), cpp2::ErrorType::INTERNAL_ERROR);
73+
ASSERT_EQ(thriftExecutionFailureInfo.errorCode()->retriable(), false);
74+
ASSERT_EQ(thriftExecutionFailureInfo.errorCause(), cpp2::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT);
75+
ASSERT_EQ(thriftExecutionFailureInfo.cause(), nullptr);
76+
ASSERT_EQ(thriftExecutionFailureInfo.suppressed()->size(), 0);
77+
}
78+
79+
TEST_F(TaskStatusTest, executionFailureInfoOptionalFieldsNonempty) {
80+
std::string str = slurp(getDataPath("/github/presto-trunk/presto-native-execution/presto_cpp/main/thrift/tests/data/", "ExecutionFailureInfo.json"));
81+
82+
json j = json::parse(str);
83+
ExecutionFailureInfo executionFailureInfo = j;
84+
cpp2::ExecutionFailureInfo thriftExecutionFailureInfo;
85+
toThrift(executionFailureInfo, thriftExecutionFailureInfo);
86+
87+
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).type(), "cause");
88+
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCause(), cpp2::ErrorCause::UNKNOWN);
89+
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCode()->type(), cpp2::ErrorType::INSUFFICIENT_RESOURCES);
90+
ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCode()->retriable(), true);
91+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].type(), "suppressed1");
92+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCause(), cpp2::ErrorCause::LOW_PARTITION_COUNT);
93+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCode()->type(), cpp2::ErrorType::EXTERNAL);
94+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].type(), "suppressed2");
95+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCause(), cpp2::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT);
96+
ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCode()->type(), cpp2::ErrorType::INTERNAL_ERROR);
97+
}

0 commit comments

Comments
 (0)