Skip to content

Commit 7b50230

Browse files
author
bystrovserg
committed
YT-24301: Introduce list_operation_events
commit_hash:2bb31fc84a02730f1857c630e911f8dee67885e8
1 parent 26db383 commit 7b50230

File tree

16 files changed

+281
-0
lines changed

16 files changed

+281
-0
lines changed

yt/yt/client/api/delegating_client.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,11 @@ class TDelegatingClient
548548
const TGetJobFailContextOptions& options),
549549
(operationIdOrAlias, jobId, options))
550550

551+
DELEGATE_METHOD(TFuture<std::vector<TOperationEvent>>, ListOperationEvents, (
552+
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
553+
const TListOperationEventsOptions& options),
554+
(operationIdOrAlias, options))
555+
551556
DELEGATE_METHOD(TFuture<TListOperationsResult>, ListOperations, (
552557
const TListOperationsOptions& options),
553558
(options))

yt/yt/client/api/operation_client.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,17 @@ void Serialize(const TJobTraceEvent& traceEvent, NYson::IYsonConsumer* consumer)
330330
.EndMap();
331331
}
332332

333+
void Serialize(const TOperationEvent& operationEvent, NYson::IYsonConsumer* consumer) {
334+
NYTree::BuildYsonFluently(consumer)
335+
.BeginMap()
336+
.Item("timestamp").Value(operationEvent.Timestamp)
337+
.Item("event_type").Value(operationEvent.EventType)
338+
.OptionalItem("incarnation", operationEvent.Incarnation)
339+
.OptionalItem("incarnation_switch_reason", operationEvent.IncarnationSwitchReason)
340+
.OptionalItem("incarnation_switch_info", operationEvent.IncarnationSwitchInfo)
341+
.EndMap();
342+
}
343+
333344
////////////////////////////////////////////////////////////////////////////////
334345

335346
void TListOperationsAccessFilter::Register(TRegistrar registrar)

yt/yt/client/api/operation_client.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
#include <yt/yt/client/node_tracker_client/public.h>
99

10+
#include <yt/yt/client/controller_agent/public.h>
11+
1012
namespace NYT::NApi {
1113

1214
////////////////////////////////////////////////////////////////////////////////
@@ -112,6 +114,19 @@ struct TGetJobFailContextOptions
112114
, public TMasterReadOptions
113115
{ };
114116

117+
DEFINE_ENUM(EOperationEventType,
118+
((IncarnationStarted) (0))
119+
);
120+
121+
struct TListOperationEventsOptions
122+
: public TTimeoutOptions
123+
, public TMasterReadOptions
124+
{
125+
std::optional<EOperationEventType> EventType;
126+
127+
i64 Limit = 1000;
128+
};
129+
115130
struct TListOperationsAccessFilter
116131
: public NYTree::TYsonStruct
117132
{
@@ -419,6 +434,21 @@ struct TJobTraceEvent
419434

420435
void Serialize(const TJobTraceEvent& traceEvent, NYson::IYsonConsumer* consumer);
421436

437+
struct TOperationEvent
438+
{
439+
TInstant Timestamp;
440+
EOperationEventType EventType;
441+
442+
// Incarnation started
443+
std::optional<std::string> Incarnation;
444+
445+
// Empty IncarnationSwitchReason and filled Incarnation means switch reason is "operation started".
446+
std::optional<NControllerAgent::EOperationIncarnationSwitchReason> IncarnationSwitchReason;
447+
std::optional<NYson::TYsonString> IncarnationSwitchInfo;
448+
};
449+
450+
void Serialize(const TOperationEvent& operationEvent, NYson::IYsonConsumer* consumer);
451+
422452
struct TListJobsStatistics
423453
{
424454
TEnumIndexedArray<NJobTrackerClient::EJobState, i64> StateCounts;
@@ -532,6 +562,10 @@ struct IOperationClient
532562
NJobTrackerClient::TJobId jobId,
533563
const TGetJobFailContextOptions& options = {}) = 0;
534564

565+
virtual TFuture<std::vector<TOperationEvent>> ListOperationEvents(
566+
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
567+
const TListOperationEventsOptions& options) = 0;
568+
535569
virtual TFuture<TListOperationsResult> ListOperations(
536570
const TListOperationsOptions& options = {}) = 0;
537571

yt/yt/client/api/rpc_proxy/api_service_proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ class TApiServiceProxy
112112
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, PatchOperationSpec);
113113
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, GetOperation);
114114
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListOperations);
115+
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListOperationEvents);
115116

116117
// Jobs
117118
DEFINE_RPC_PROXY_METHOD(NRpcProxy::NProto, ListJobs);

yt/yt/client/api/rpc_proxy/client_impl.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,6 +1382,27 @@ TFuture<std::vector<TJobTraceEvent>> TClient::GetJobTrace(
13821382
}));
13831383
}
13841384

1385+
TFuture<std::vector<TOperationEvent>> TClient::ListOperationEvents(
1386+
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
1387+
const TListOperationEventsOptions& options)
1388+
{
1389+
auto proxy = CreateApiServiceProxy();
1390+
1391+
auto req = proxy.ListOperationEvents();
1392+
SetTimeoutOptions(*req, options);
1393+
1394+
NScheduler::ToProto(req, operationIdOrAlias);
1395+
1396+
if (options.EventType) {
1397+
req->set_event_type(NProto::ConvertOperationEventTypeToProto(*options.EventType));
1398+
}
1399+
1400+
req->set_limit(options.Limit);
1401+
1402+
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspListOperationEventsPtr& rsp) {
1403+
return FromProto<std::vector<TOperationEvent>>(rsp->events());
1404+
}));
1405+
}
13851406

13861407
TFuture<TSharedRef> TClient::GetJobFailContext(
13871408
const TOperationIdOrAlias& operationIdOrAlias,

yt/yt/client/api/rpc_proxy/client_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ class TClient
303303
NJobTrackerClient::TJobId jobId,
304304
const NApi::TGetJobFailContextOptions& options) override;
305305

306+
TFuture<std::vector<TOperationEvent>> ListOperationEvents(
307+
const NScheduler::TOperationIdOrAlias& operationIdOrAlias,
308+
const TListOperationEventsOptions& options) override;
309+
306310
TFuture<NApi::TListOperationsResult> ListOperations(
307311
const NApi::TListOperationsOptions& options) override;
308312

yt/yt/client/api/rpc_proxy/helpers.cpp

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,40 @@ void FromProto(
424424
result->EventTime = TInstant::FromValue(proto.event_time());
425425
}
426426

427+
void ToProto(
428+
NProto::TOperationEvent* proto,
429+
const NApi::TOperationEvent& result)
430+
{
431+
proto->set_timestamp(ToProto(result.Timestamp));
432+
proto->set_event_type(ConvertOperationEventTypeToProto(result.EventType));
433+
434+
YT_OPTIONAL_TO_PROTO(proto, incarnation, result.Incarnation);
435+
436+
if (result.IncarnationSwitchReason) {
437+
proto->set_incarnation_switch_reason(ConvertIncarnationSwitchReasonToProto(*result.IncarnationSwitchReason));
438+
}
439+
440+
if (result.IncarnationSwitchInfo) {
441+
proto->set_incarnation_switch_info(result.IncarnationSwitchInfo->ToString());
442+
}
443+
}
444+
445+
446+
void FromProto(
447+
NApi::TOperationEvent* result,
448+
const NProto::TOperationEvent& proto)
449+
{
450+
FromProto(&result->Timestamp, proto.timestamp());
451+
result->EventType = ConvertOperationEventTypeFromProto(proto.event_type());
452+
result->Incarnation = YT_OPTIONAL_FROM_PROTO(proto, incarnation);
453+
if (proto.has_incarnation_switch_reason()) {
454+
result->IncarnationSwitchReason = ConvertIncarnationSwitchReasonFromProto(proto.incarnation_switch_reason());
455+
}
456+
if (proto.has_incarnation_switch_info()) {
457+
result->IncarnationSwitchInfo = TYsonString(proto.incarnation_switch_info());
458+
}
459+
}
460+
427461
////////////////////////////////////////////////////////////////////////////////
428462
// MISC
429463
////////////////////////////////////////////////////////////////////////////////
@@ -1478,6 +1512,54 @@ NScheduler::EOperationState ConvertOperationStateFromProto(
14781512
YT_ABORT();
14791513
}
14801514

1515+
NProto::EOperationEventType ConvertOperationEventTypeToProto(
1516+
NApi::EOperationEventType operationEventType)
1517+
{
1518+
switch (operationEventType) {
1519+
case NApi::EOperationEventType::IncarnationStarted:
1520+
return NProto::EOperationEventType::OET_INCARNATION_STARTED;
1521+
}
1522+
}
1523+
1524+
NApi::EOperationEventType ConvertOperationEventTypeFromProto(
1525+
NProto::EOperationEventType proto)
1526+
{
1527+
switch (proto) {
1528+
case NProto::EOperationEventType::OET_INCARNATION_STARTED:
1529+
return NApi::EOperationEventType::IncarnationStarted;
1530+
}
1531+
}
1532+
1533+
NProto::EIncarnationSwitchReason ConvertIncarnationSwitchReasonToProto(
1534+
NControllerAgent::EOperationIncarnationSwitchReason operationEventType)
1535+
{
1536+
switch (operationEventType) {
1537+
case NYT::NControllerAgent::EOperationIncarnationSwitchReason::JobAborted:
1538+
return NProto::EIncarnationSwitchReason::ISR_JOB_ABORTED;
1539+
case NYT::NControllerAgent::EOperationIncarnationSwitchReason::JobFailed:
1540+
return NProto::EIncarnationSwitchReason::ISR_JOB_FAILED;
1541+
case NYT::NControllerAgent::EOperationIncarnationSwitchReason::JobInterrupted:
1542+
return NProto::EIncarnationSwitchReason::ISR_JOB_INTERRUPTED;
1543+
case NYT::NControllerAgent::EOperationIncarnationSwitchReason::JobLackAfterRevival:
1544+
return NProto::EIncarnationSwitchReason::ISR_JOB_LACK_AFTER_REVIVAL;
1545+
}
1546+
}
1547+
1548+
NControllerAgent::EOperationIncarnationSwitchReason ConvertIncarnationSwitchReasonFromProto(
1549+
NProto::EIncarnationSwitchReason proto)
1550+
{
1551+
switch (proto) {
1552+
case NProto::EIncarnationSwitchReason::ISR_JOB_ABORTED:
1553+
return NYT::NControllerAgent::EOperationIncarnationSwitchReason::JobAborted;
1554+
case NProto::EIncarnationSwitchReason::ISR_JOB_FAILED:
1555+
return NYT::NControllerAgent::EOperationIncarnationSwitchReason::JobFailed;
1556+
case NProto::EIncarnationSwitchReason::ISR_JOB_INTERRUPTED:
1557+
return NYT::NControllerAgent::EOperationIncarnationSwitchReason::JobInterrupted;
1558+
case NProto::EIncarnationSwitchReason::ISR_JOB_LACK_AFTER_REVIVAL:
1559+
return NYT::NControllerAgent::EOperationIncarnationSwitchReason::JobLackAfterRevival;
1560+
}
1561+
}
1562+
14811563
NProto::EJobType ConvertJobTypeToProto(
14821564
NJobTrackerClient::EJobType jobType)
14831565
{

yt/yt/client/api/rpc_proxy/helpers.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ void FromProto(
114114
NApi::TJobTraceEvent* result,
115115
const NProto::TJobTraceEvent& proto);
116116

117+
void ToProto(
118+
NProto::TOperationEvent* proto,
119+
const NApi::TOperationEvent& result);
120+
121+
void FromProto(
122+
NApi::TOperationEvent* result,
123+
const NProto::TOperationEvent& proto);
124+
117125
void ToProto(NProto::TColumnSchema* protoSchema, const NTableClient::TColumnSchema& schema);
118126
void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema& protoSchema);
119127

@@ -264,6 +272,18 @@ NProto::EOperationState ConvertOperationStateToProto(
264272
NScheduler::EOperationState ConvertOperationStateFromProto(
265273
NProto::EOperationState proto);
266274

275+
NProto::EOperationEventType ConvertOperationEventTypeToProto(
276+
NApi::EOperationEventType operationEventType);
277+
278+
NApi::EOperationEventType ConvertOperationEventTypeFromProto(
279+
NProto::EOperationEventType proto);
280+
281+
NProto::EIncarnationSwitchReason ConvertIncarnationSwitchReasonToProto(
282+
NControllerAgent::EOperationIncarnationSwitchReason operationEventType);
283+
284+
NControllerAgent::EOperationIncarnationSwitchReason ConvertIncarnationSwitchReasonFromProto(
285+
NProto::EIncarnationSwitchReason proto);
286+
267287
NProto::EJobType ConvertJobTypeToProto(
268288
NJobTrackerClient::EJobType jobType);
269289

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include <library/cpp/yt/misc/enum.h>
4+
5+
namespace NYT::NControllerAgent {
6+
7+
////////////////////////////////////////////////////////////////////////////////
8+
9+
DEFINE_ENUM(EOperationIncarnationSwitchReason,
10+
(JobAborted)
11+
(JobFailed)
12+
(JobInterrupted)
13+
(JobLackAfterRevival)
14+
);
15+
16+
////////////////////////////////////////////////////////////////////////////////
17+
18+
} // namespace NYT::NControllerAgent

yt/yt/client/driver/driver.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ class TDriver
307307
REGISTER_ALL(TGetJobTraceCommand, "get_job_trace", Null, Structured, false, true );
308308
REGISTER_ALL(TGetJobFailContextCommand, "get_job_fail_context", Null, Binary, false, true );
309309
REGISTER_ALL(TGetJobSpecCommand, "get_job_spec", Null, Structured, false, true );
310+
REGISTER_ALL(TListOperationEventsCommand, "list_operation_events", Null, Structured, false, false);
310311
REGISTER_ALL(TListOperationsCommand, "list_operations", Null, Structured, false, false);
311312
REGISTER_ALL(TListJobsCommand, "list_jobs", Null, Structured, false, false);
312313
REGISTER_ALL(TGetJobCommand, "get_job", Null, Structured, false, false);

0 commit comments

Comments
 (0)