Skip to content

Commit 6e3bca8

Browse files
authored
[k2] fix TlRpcError::try_fetch (#1518)
1 parent 01a7aa6 commit 6e3bca8

File tree

6 files changed

+161
-89
lines changed

6 files changed

+161
-89
lines changed

common/tl/constants/common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ inline constexpr uint32_t TL_INT_KEY_DICTIONARY_FIELD = 0x721ea8b9U;
2020
inline constexpr uint32_t TL_LEFT = 0x0a29cd5dU;
2121
inline constexpr uint32_t TL_LONG = 0x22076cbaU;
2222
inline constexpr uint32_t TL_LONG_KEY_DICTIONARY = 0xb424d8f1U;
23+
inline constexpr uint32_t TL_REQ_ERROR = 0xb527877dU;
2324
inline constexpr uint32_t TL_REQ_RESULT_HEADER = 0x8cc84ce1U;
2425
inline constexpr uint32_t TL_MAYBE_FALSE = 0x27930a7bU;
2526
inline constexpr uint32_t TL_MAYBE_TRUE = 0x3f9c8ef8U;

runtime-light/stdlib/rpc/rpc-exceptions.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,36 @@ inline auto make(std::string_view func_name, std::source_location loc = std::sou
8383

8484
} // namespace cant_store_function
8585

86+
namespace cant_fetch_error {
87+
88+
namespace details {
89+
90+
inline constexpr int64_t CODE = -5;
91+
inline constexpr std::string_view DESCRIPTION = "can't fetch error";
92+
93+
} // namespace details
94+
95+
inline auto make(std::source_location loc = std::source_location::current()) noexcept {
96+
return kphp::exception::make_throwable<C$Exception>(string{loc.file_name()}, loc.line(), details::CODE,
97+
string{details::DESCRIPTION.data(), details::DESCRIPTION.size()});
98+
}
99+
100+
} // namespace cant_fetch_error
101+
102+
namespace cant_fetch_header {
103+
104+
namespace details {
105+
106+
inline constexpr int64_t CODE = -6;
107+
inline constexpr std::string_view DESCRIPTION = "can't fetch header";
108+
109+
} // namespace details
110+
111+
inline auto make(std::source_location loc = std::source_location::current()) noexcept {
112+
return kphp::exception::make_throwable<C$Exception>(string{loc.file_name()}, loc.line(), details::CODE,
113+
string{details::DESCRIPTION.data(), details::DESCRIPTION.size()});
114+
}
115+
116+
} // namespace cant_fetch_header
117+
86118
} // namespace kphp::rpc::exception

runtime-light/stdlib/rpc/rpc-tl-error.cpp

Lines changed: 25 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,68 +4,39 @@
44

55
#include "runtime-light/stdlib/rpc/rpc-tl-error.h"
66

7-
#include <tuple>
8-
97
#include "common/tl/constants/common.h"
8+
#include "runtime-light/server/rpc/rpc-server-state.h"
109
#include "runtime-light/stdlib/diagnostics/exception-functions.h"
11-
#include "runtime-light/stdlib/rpc/rpc-api.h"
12-
#include "runtime-light/stdlib/rpc/rpc-tl-builtins.h"
10+
#include "runtime-light/stdlib/rpc/rpc-exceptions.h"
11+
#include "runtime-light/tl/tl-core.h"
12+
#include "runtime-light/tl/tl-types.h"
1313

1414
bool TlRpcError::try_fetch() noexcept {
15-
const auto backup_pos{tl_parse_save_pos()};
16-
auto op{TRY_CALL(decltype(f$fetch_int()), bool, f$fetch_int())};
17-
if (op == TL_REQ_RESULT_HEADER) {
18-
fetch_and_skip_header();
19-
op = TRY_CALL(decltype(f$fetch_int()), bool, f$fetch_int());
20-
}
21-
if (op != TL_RPC_REQ_ERROR) {
22-
tl_parse_restore_pos(backup_pos);
15+
auto& rpc_server_instance_state_fetcher{RpcServerInstanceState::get().tl_fetcher};
16+
auto fetcher{rpc_server_instance_state_fetcher};
17+
tl::magic magic{};
18+
if (!magic.fetch(fetcher)) [[unlikely]] {
19+
THROW_EXCEPTION(kphp::rpc::exception::not_enough_data_to_fetch::make());
2320
return false;
2421
}
25-
26-
std::ignore = TRY_CALL(decltype(f$fetch_long()), bool, f$fetch_long());
27-
error_code = static_cast<int32_t>(TRY_CALL(decltype(f$fetch_int()), bool, f$fetch_int()));
28-
error_msg = TRY_CALL(decltype(f$fetch_string()), bool, f$fetch_string());
29-
return true;
30-
}
31-
32-
void TlRpcError::fetch_and_skip_header() const noexcept {
33-
const auto flags{static_cast<int32_t>(TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int()))};
34-
35-
if (flags & vk::tl::common::rpc_req_result_extra_flags::binlog_pos) {
36-
std::ignore = TRY_CALL(decltype(f$fetch_long()), void, f$fetch_long());
37-
}
38-
if (flags & vk::tl::common::rpc_req_result_extra_flags::binlog_time) {
39-
std::ignore = TRY_CALL(decltype(f$fetch_long()), void, f$fetch_long());
40-
}
41-
if (flags & vk::tl::common::rpc_req_result_extra_flags::engine_pid) {
42-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
43-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
44-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
45-
}
46-
if (flags & vk::tl::common::rpc_req_result_extra_flags::request_size) {
47-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
48-
}
49-
if (flags & vk::tl::common::rpc_req_result_extra_flags::response_size) {
50-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
51-
}
52-
if (flags & vk::tl::common::rpc_req_result_extra_flags::failed_subqueries) {
53-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
54-
}
55-
if (flags & vk::tl::common::rpc_req_result_extra_flags::compression_version) {
56-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
57-
}
58-
if (flags & vk::tl::common::rpc_req_result_extra_flags::stats) {
59-
const auto size{TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int())};
60-
for (auto i = 0; i < size; ++i) {
61-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
62-
std::ignore = TRY_CALL(decltype(f$fetch_int()), void, f$fetch_int());
22+
if (magic.expect(TL_REQ_RESULT_HEADER)) {
23+
tl::reqResultHeader req_result_header{};
24+
if (!req_result_header.fetch(fetcher)) [[unlikely]] {
25+
THROW_EXCEPTION(kphp::rpc::exception::cant_fetch_header::make());
26+
return false;
6327
}
28+
fetcher = tl::fetcher{req_result_header.result};
6429
}
65-
if (flags & vk::tl::common::rpc_req_result_extra_flags::epoch_number) {
66-
std::ignore = TRY_CALL(decltype(f$fetch_long()), void, f$fetch_long());
30+
if (!magic.expect(TL_RPC_REQ_ERROR)) {
31+
return false;
6732
}
68-
if (flags & vk::tl::common::rpc_req_result_extra_flags::view_number) {
69-
std::ignore = TRY_CALL(decltype(f$fetch_long()), void, f$fetch_long());
33+
tl::rpcReqError rpc_req_error{};
34+
if (!rpc_req_error.fetch(fetcher)) [[unlikely]] {
35+
THROW_EXCEPTION(kphp::rpc::exception::cant_fetch_error::make());
36+
return false;
7037
}
38+
error_code = rpc_req_error.error_code.value;
39+
error_msg = {rpc_req_error.error.value.data(), static_cast<string::size_type>(rpc_req_error.error.value.size())};
40+
rpc_server_instance_state_fetcher = std::move(fetcher);
41+
return true;
7142
}

runtime-light/stdlib/rpc/rpc-tl-error.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ struct TlRpcError {
3838
}
3939

4040
bool try_fetch() noexcept;
41-
42-
private:
43-
void fetch_and_skip_header() const noexcept;
4441
};
4542

4643
class RpcErrorFactory {

runtime-light/tl/tl-types.cpp

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -218,62 +218,105 @@ tl::mask rpcInvokeReqExtra::get_flags() const noexcept {
218218
return flags;
219219
}
220220

221+
bool rpcReqResultExtra::fetch(tl::fetcher& tlf, const tl::mask& flags) noexcept {
222+
bool ok{true};
223+
if (ok && static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
224+
ok = opt_binlog_pos.emplace().fetch(tlf);
225+
}
226+
if (ok && static_cast<bool>(flags.value & BINLOG_TIME_FLAG)) {
227+
ok = opt_binlog_time.emplace().fetch(tlf);
228+
}
229+
if (ok && static_cast<bool>(flags.value & ENGINE_PID_FLAG)) {
230+
ok = opt_engine_pid.emplace().fetch(tlf);
231+
}
232+
if (ok && static_cast<bool>(flags.value & REQUEST_SIZE_FLAG)) {
233+
kphp::log::assertion(static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG));
234+
ok = opt_request_size.emplace().fetch(tlf) && opt_response_size.emplace().fetch(tlf);
235+
}
236+
if (ok && static_cast<bool>(flags.value & FAILED_SUBQUERIES_FLAG)) {
237+
ok = opt_failed_subqueries.emplace().fetch(tlf);
238+
}
239+
if (ok && static_cast<bool>(flags.value & COMPRESSION_VERSION_FLAG)) {
240+
ok = opt_compression_version.emplace().fetch(tlf);
241+
}
242+
if (ok && static_cast<bool>(flags.value & STATS_FLAG)) {
243+
ok = opt_stats.emplace().fetch(tlf);
244+
}
245+
if (ok && static_cast<bool>(flags.value & EPOCH_NUMBER_FLAG)) {
246+
kphp::log::assertion(static_cast<bool>(flags.value & VIEW_NUMBER_FLAG));
247+
ok = opt_epoch_number.emplace().fetch(tlf) && opt_view_number.emplace().fetch(tlf);
248+
}
249+
return ok;
250+
}
251+
221252
void rpcReqResultExtra::store(tl::storer& tls, const tl::mask& flags) const noexcept {
222253
if (static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
223-
binlog_pos.store(tls);
254+
kphp::log::assertion(opt_binlog_pos.has_value());
255+
opt_binlog_pos->store(tls);
224256
}
225257
if (static_cast<bool>(flags.value & BINLOG_TIME_FLAG)) {
226-
binlog_time.store(tls);
258+
kphp::log::assertion(opt_binlog_time.has_value());
259+
opt_binlog_time->store(tls);
227260
}
228-
if (static_cast<bool>(flags.value) & ENGINE_PID_FLAG) {
229-
engine_pid.store(tls);
261+
if (static_cast<bool>(flags.value & ENGINE_PID_FLAG)) {
262+
kphp::log::assertion(opt_engine_pid.has_value());
263+
opt_engine_pid->store(tls);
230264
}
231265
if (static_cast<bool>(flags.value & REQUEST_SIZE_FLAG)) {
232-
kphp::log::assertion(static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG));
233-
request_size.store(tls), response_size.store(tls);
266+
kphp::log::assertion(opt_request_size.has_value() && static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG) && opt_response_size.has_value());
267+
opt_request_size->store(tls), opt_response_size->store(tls);
234268
}
235269
if (static_cast<bool>(flags.value & FAILED_SUBQUERIES_FLAG)) {
236-
failed_subqueries.store(tls);
270+
kphp::log::assertion(opt_failed_subqueries.has_value());
271+
opt_failed_subqueries->store(tls);
237272
}
238273
if (static_cast<bool>(flags.value & COMPRESSION_VERSION_FLAG)) {
239-
compression_version.store(tls);
274+
kphp::log::assertion(opt_compression_version.has_value());
275+
opt_compression_version->store(tls);
240276
}
241277
if (static_cast<bool>(flags.value & STATS_FLAG)) {
242-
stats.store(tls);
278+
kphp::log::assertion(opt_stats.has_value());
279+
opt_stats->store(tls);
243280
}
244281
if (static_cast<bool>(flags.value & EPOCH_NUMBER_FLAG)) {
245-
kphp::log::assertion(static_cast<bool>(flags.value & VIEW_NUMBER_FLAG));
246-
epoch_number.store(tls), view_number.store(tls);
282+
kphp::log::assertion(opt_epoch_number.has_value() && static_cast<bool>(flags.value & VIEW_NUMBER_FLAG) && opt_view_number.has_value());
283+
opt_epoch_number->store(tls), opt_view_number->store(tls);
247284
}
248285
}
249286

250287
size_t rpcReqResultExtra::footprint(const tl::mask& flags) const noexcept {
251288
size_t footprint{};
252289
if (static_cast<bool>(flags.value & BINLOG_POS_FLAG)) {
253-
footprint += binlog_pos.footprint();
290+
kphp::log::assertion(opt_binlog_pos.has_value());
291+
footprint += opt_binlog_pos->footprint();
254292
}
255293
if (static_cast<bool>(flags.value & BINLOG_TIME_FLAG)) {
256-
footprint += binlog_time.footprint();
294+
kphp::log::assertion(opt_binlog_time.has_value());
295+
footprint += opt_binlog_time->footprint();
257296
}
258-
if (static_cast<bool>(flags.value) & ENGINE_PID_FLAG) {
259-
footprint += engine_pid.footprint();
297+
if (static_cast<bool>(flags.value & ENGINE_PID_FLAG)) {
298+
kphp::log::assertion(opt_engine_pid.has_value());
299+
footprint += opt_engine_pid->footprint();
260300
}
261301
if (static_cast<bool>(flags.value & REQUEST_SIZE_FLAG)) {
262-
kphp::log::assertion(static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG));
263-
footprint += request_size.footprint() + response_size.footprint();
302+
kphp::log::assertion(opt_request_size.has_value() && static_cast<bool>(flags.value & RESPONSE_SIZE_FLAG) && opt_response_size.has_value());
303+
footprint += opt_request_size->footprint() + opt_response_size->footprint();
264304
}
265305
if (static_cast<bool>(flags.value & FAILED_SUBQUERIES_FLAG)) {
266-
footprint += failed_subqueries.footprint();
306+
kphp::log::assertion(opt_failed_subqueries.has_value());
307+
footprint += opt_failed_subqueries->footprint();
267308
}
268309
if (static_cast<bool>(flags.value & COMPRESSION_VERSION_FLAG)) {
269-
footprint += compression_version.footprint();
310+
kphp::log::assertion(opt_compression_version.has_value());
311+
footprint += opt_compression_version->footprint();
270312
}
271313
if (static_cast<bool>(flags.value & STATS_FLAG)) {
272-
footprint += stats.footprint();
314+
kphp::log::assertion(opt_stats.has_value());
315+
footprint += opt_stats->footprint();
273316
}
274317
if (static_cast<bool>(flags.value & EPOCH_NUMBER_FLAG)) {
275-
kphp::log::assertion(static_cast<bool>(flags.value & VIEW_NUMBER_FLAG));
276-
footprint += epoch_number.footprint() + view_number.footprint();
318+
kphp::log::assertion(opt_epoch_number.has_value() && static_cast<bool>(flags.value & VIEW_NUMBER_FLAG) && opt_view_number.has_value());
319+
footprint += opt_epoch_number->footprint() + opt_view_number->footprint();
277320
}
278321
return footprint;
279322
}

runtime-light/tl/tl-types.h

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "common/tl/constants/common.h"
2222
#include "runtime-common/core/allocator/script-allocator.h"
2323
#include "runtime-common/core/std/containers.h"
24+
#include "runtime-light/stdlib/diagnostics/logs.h"
2425
#include "runtime-light/tl/tl-core.h"
2526

2627
namespace tl {
@@ -1142,17 +1143,18 @@ class rpcReqResultExtra final {
11421143
static constexpr uint32_t VIEW_NUMBER_FLAG = vk::tl::common::rpc_req_result_extra_flags::view_number;
11431144

11441145
public:
1145-
tl::i64 binlog_pos{};
1146-
tl::i64 binlog_time{};
1147-
tl::netPid engine_pid{};
1148-
tl::i32 request_size{};
1149-
tl::i32 response_size{};
1150-
tl::i32 failed_subqueries{};
1151-
tl::i32 compression_version{};
1152-
tl::dictionary<tl::string> stats{};
1153-
tl::i64 epoch_number{};
1154-
tl::i64 view_number{};
1146+
std::optional<tl::i64> opt_binlog_pos;
1147+
std::optional<tl::i64> opt_binlog_time;
1148+
std::optional<tl::netPid> opt_engine_pid;
1149+
std::optional<tl::i32> opt_request_size;
1150+
std::optional<tl::i32> opt_response_size;
1151+
std::optional<tl::i32> opt_failed_subqueries;
1152+
std::optional<tl::i32> opt_compression_version;
1153+
std::optional<tl::dictionary<tl::string>> opt_stats;
1154+
std::optional<tl::i64> opt_epoch_number;
1155+
std::optional<tl::i64> opt_view_number;
11551156

1157+
bool fetch(tl::fetcher& tlf, const tl::mask& flags) noexcept;
11561158
void store(tl::storer& tls, const tl::mask& flags) const noexcept;
11571159

11581160
size_t footprint(const tl::mask& flags) const noexcept;
@@ -1166,6 +1168,32 @@ struct RpcReqResultExtra final {
11661168
}
11671169
};
11681170

1171+
struct reqResultHeader final {
1172+
tl::mask flags{};
1173+
tl::rpcReqResultExtra extra{};
1174+
std::span<const std::byte> result;
1175+
1176+
bool fetch(tl::fetcher& tlf) noexcept {
1177+
if (!flags.fetch(tlf) || !extra.fetch(tlf, flags)) [[unlikely]] {
1178+
return false;
1179+
}
1180+
auto opt_result{tlf.fetch_bytes(tlf.remaining())};
1181+
kphp::log::assertion(opt_result.has_value());
1182+
result = *opt_result;
1183+
return true;
1184+
}
1185+
};
1186+
1187+
struct rpcReqError final {
1188+
tl::i64 query_id{};
1189+
tl::i32 error_code{};
1190+
tl::string error{};
1191+
1192+
bool fetch(tl::fetcher& tlf) noexcept {
1193+
return query_id.fetch(tlf) && error_code.fetch(tlf) && error.fetch(tlf);
1194+
}
1195+
};
1196+
11691197
struct k2RpcResponseError final {
11701198
tl::i32 error_code{};
11711199
tl::string error{};

0 commit comments

Comments
 (0)