@@ -29,6 +29,41 @@ namespace {
2929
3030// //////////////////////////////////////////////////////////////////////////////
3131
32+ struct TStorageProxyMetrics : public TThrRefBase {
33+ explicit TStorageProxyMetrics (const ::NMonitoring::TDynamicCounterPtr& counters)
34+ : Counters(counters)
35+ , Errors(Counters->GetCounter (" Errors" , true ))
36+ , Inflight(Counters->GetCounter (" Inflight" ))
37+ , LatencyMs(Counters->GetHistogram (" LatencyMs" , ::NMonitoring::ExplicitHistogram({1 , 5 , 20 , 100 , 500 , 2000 , 10000 , 50000 })))
38+ {}
39+
40+ ::NMonitoring::TDynamicCounterPtr Counters;
41+ ::NMonitoring::TDynamicCounters::TCounterPtr Errors;
42+ ::NMonitoring::TDynamicCounters::TCounterPtr Inflight;
43+ ::NMonitoring::THistogramPtr LatencyMs;
44+ };
45+
46+ using TStorageProxyMetricsPtr = TIntrusivePtr<TStorageProxyMetrics>;
47+
48+ struct TRequestContext : public TThrRefBase {
49+ TInstant StartTime = TInstant::Now();
50+ const TStorageProxyMetricsPtr Metrics;
51+
52+ TRequestContext (const TStorageProxyMetricsPtr& metrics)
53+ : Metrics(metrics) {
54+ Metrics->Inflight ->Inc ();
55+ }
56+
57+ ~TRequestContext () {
58+ Metrics->Inflight ->Dec ();
59+ Metrics->LatencyMs ->Collect ((TInstant::Now () - StartTime).MilliSeconds ());
60+ }
61+
62+ void IncError () {
63+ Metrics->Errors ->Inc ();
64+ }
65+ };
66+
3267class TStorageProxy : public TActorBootstrapped <TStorageProxy> {
3368 NConfig::TCheckpointCoordinatorConfig Config;
3469 NConfig::TCommonConfig CommonConfig;
@@ -38,13 +73,15 @@ class TStorageProxy : public TActorBootstrapped<TStorageProxy> {
3873 TActorId ActorGC;
3974 NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
4075 TYqSharedResources::TPtr YqSharedResources;
76+ const TStorageProxyMetricsPtr Metrics;
4177
4278public:
4379 explicit TStorageProxy (
4480 const NConfig::TCheckpointCoordinatorConfig& config,
4581 const NConfig::TCommonConfig& commonConfig,
4682 const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
47- const TYqSharedResources::TPtr& yqSharedResources);
83+ const TYqSharedResources::TPtr& yqSharedResources,
84+ const ::NMonitoring::TDynamicCounterPtr& counters);
4885
4986 void Bootstrap ();
5087
@@ -103,12 +140,14 @@ TStorageProxy::TStorageProxy(
103140 const NConfig::TCheckpointCoordinatorConfig& config,
104141 const NConfig::TCommonConfig& commonConfig,
105142 const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
106- const TYqSharedResources::TPtr& yqSharedResources)
143+ const TYqSharedResources::TPtr& yqSharedResources,
144+ const ::NMonitoring::TDynamicCounterPtr& counters)
107145 : Config(config)
108146 , CommonConfig(commonConfig)
109147 , StorageConfig(Config.GetStorage())
110148 , CredentialsProviderFactory(credentialsProviderFactory)
111- , YqSharedResources(yqSharedResources) {
149+ , YqSharedResources(yqSharedResources)
150+ , Metrics(MakeIntrusive<TStorageProxyMetrics>(counters)) {
112151 FillDefaultParameters (Config, StorageConfig);
113152}
114153
@@ -138,17 +177,21 @@ void TStorageProxy::Bootstrap() {
138177}
139178
140179void TStorageProxy::Handle (TEvCheckpointStorage::TEvRegisterCoordinatorRequest::TPtr& ev) {
180+ auto context = MakeIntrusive<TRequestContext>(Metrics);
181+
141182 const auto * event = ev->Get ();
142183 LOG_STREAMS_STORAGE_SERVICE_DEBUG (" [" << event->CoordinatorId << " ] Got TEvRegisterCoordinatorRequest" )
143184
144185 CheckpointStorage->RegisterGraphCoordinator (event->CoordinatorId )
145186 .Apply ([coordinatorId = event->CoordinatorId ,
146187 cookie = ev->Cookie ,
147188 sender = ev->Sender ,
148- actorSystem = TActivationContext::ActorSystem ()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
189+ actorSystem = TActivationContext::ActorSystem (),
190+ context] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
149191 auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>();
150192 response->Issues = issuesFuture.GetValue ();
151193 if (response->Issues ) {
194+ context->IncError ();
152195 LOG_STREAMS_STORAGE_SERVICE_AS_WARN (*actorSystem, " [" << coordinatorId << " ] Failed to register graph: " << response->Issues .ToString ())
153196 } else {
154197 LOG_STREAMS_STORAGE_SERVICE_AS_INFO (*actorSystem, " [" << coordinatorId << " ] Graph registered" )
@@ -159,6 +202,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::
159202}
160203
161204void TStorageProxy::Handle (TEvCheckpointStorage::TEvCreateCheckpointRequest::TPtr& ev) {
205+ auto context = MakeIntrusive<TRequestContext>(Metrics);
162206 const auto * event = ev->Get ();
163207 LOG_STREAMS_STORAGE_SERVICE_DEBUG (" [" << event->CoordinatorId << " ] [" << event->CheckpointId << " ] Got TEvCreateCheckpointRequest" )
164208
@@ -169,7 +213,8 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
169213 sender = ev->Sender ,
170214 totalGraphCheckpointsSizeLimit = Config.GetStateStorageLimits ().GetMaxGraphCheckpointsSizeBytes (),
171215 graphDesc = std::move (event->GraphDescription ),
172- storage = CheckpointStorage]
216+ storage = CheckpointStorage,
217+ context]
173218 (const NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult>& resultFuture) {
174219 auto [totalGraphCheckpointsSize, issues] = resultFuture.GetValue ();
175220
@@ -179,6 +224,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
179224 issues.AddIssue (std::move (ss.Str ()));
180225 }
181226 if (issues) {
227+ context->IncError ();
182228 return NThreading::MakeFuture (ICheckpointStorage::TCreateCheckpointResult {TString (), std::move (issues) } );
183229 }
184230 if (std::holds_alternative<TString>(graphDesc)) {
@@ -191,11 +237,13 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
191237 coordinatorId = event->CoordinatorId ,
192238 cookie = ev->Cookie ,
193239 sender = ev->Sender ,
194- actorSystem = TActivationContext::ActorSystem ()]
240+ actorSystem = TActivationContext::ActorSystem (),
241+ context]
195242 (const NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>& resultFuture) {
196243 auto [graphDescId, issues] = resultFuture.GetValue ();
197244 auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move (issues), std::move (graphDescId));
198245 if (response->Issues ) {
246+ context->IncError ();
199247 LOG_STREAMS_STORAGE_SERVICE_AS_WARN (*actorSystem, " [" << coordinatorId << " ] [" << checkpointId << " ] Failed to create checkpoint: " << response->Issues .ToString ());
200248 } else {
201249 LOG_STREAMS_STORAGE_SERVICE_AS_INFO (*actorSystem, " [" << coordinatorId << " ] [" << checkpointId << " ] Checkpoint created" );
@@ -206,18 +254,21 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
206254}
207255
208256void TStorageProxy::Handle (TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest::TPtr& ev) {
257+ auto context = MakeIntrusive<TRequestContext>(Metrics);
209258 const auto * event = ev->Get ();
210259 LOG_STREAMS_STORAGE_SERVICE_DEBUG (" [" << event->CoordinatorId << " ] [" << event->CheckpointId << " ] Got TEvSetCheckpointPendingCommitStatusRequest" )
211260 CheckpointStorage->UpdateCheckpointStatus (event->CoordinatorId , event->CheckpointId , ECheckpointStatus::PendingCommit, ECheckpointStatus::Pending, event->StateSizeBytes )
212261 .Apply ([checkpointId = event->CheckpointId ,
213262 coordinatorId = event->CoordinatorId ,
214263 cookie = ev->Cookie ,
215264 sender = ev->Sender ,
216- actorSystem = TActivationContext::ActorSystem ()]
265+ actorSystem = TActivationContext::ActorSystem (),
266+ context]
217267 (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
218268 auto issues = issuesFuture.GetValue ();
219269 auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move (issues));
220270 if (response->Issues ) {
271+ context->IncError ();
221272 LOG_STREAMS_STORAGE_SERVICE_AS_WARN (*actorSystem, " [" << coordinatorId << " ] [" << checkpointId << " ] Failed to set 'PendingCommit' status: " << response->Issues .ToString ())
222273 } else {
223274 LOG_STREAMS_STORAGE_SERVICE_AS_INFO (*actorSystem, " [" << coordinatorId << " ] [" << checkpointId << " ] Status updated to 'PendingCommit'" )
@@ -228,6 +279,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitSt
228279}
229280
230281void TStorageProxy::Handle (TEvCheckpointStorage::TEvCompleteCheckpointRequest::TPtr& ev) {
282+ auto context = MakeIntrusive<TRequestContext>(Metrics);
231283 const auto * event = ev->Get ();
232284 LOG_STREAMS_STORAGE_SERVICE_DEBUG (" [" << event->CoordinatorId << " ] [" << event->CheckpointId << " ] Got TEvCompleteCheckpointRequest" )
233285 CheckpointStorage->UpdateCheckpointStatus (event->CoordinatorId , event->CheckpointId , ECheckpointStatus::Completed, ECheckpointStatus::PendingCommit, event->StateSizeBytes )
@@ -238,11 +290,13 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::T
238290 type = event->Type ,
239291 gcEnabled = Config.GetCheckpointGarbageConfig ().GetEnabled (),
240292 actorGC = ActorGC,
241- actorSystem = TActivationContext::ActorSystem ()]
293+ actorSystem = TActivationContext::ActorSystem (),
294+ context]
242295 (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
243296 auto issues = issuesFuture.GetValue ();
244297 auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move (issues));
245298 if (response->Issues ) {
299+ context->IncError ();
246300 LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG (*actorSystem, " [" << coordinatorId << " ] [" << checkpointId << " ] Failed to set 'Completed' status: " << response->Issues .ToString ())
247301 } else {
248302 LOG_STREAMS_STORAGE_SERVICE_AS_INFO (*actorSystem, " [" << coordinatorId << " ] [" << checkpointId << " ] Status updated to 'Completed'" )
@@ -258,17 +312,20 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::T
258312}
259313
260314void TStorageProxy::Handle (TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr& ev) {
315+ auto context = MakeIntrusive<TRequestContext>(Metrics);
261316 const auto * event = ev->Get ();
262317 LOG_STREAMS_STORAGE_SERVICE_DEBUG (" [" << event->CoordinatorId << " ] [" << event->CheckpointId << " ] Got TEvAbortCheckpointRequest" )
263318 CheckpointStorage->AbortCheckpoint (event->CoordinatorId ,event->CheckpointId )
264319 .Apply ([checkpointId = event->CheckpointId ,
265320 coordinatorId = event->CoordinatorId ,
266321 cookie = ev->Cookie ,
267322 sender = ev->Sender ,
268- actorSystem = TActivationContext::ActorSystem ()] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
323+ actorSystem = TActivationContext::ActorSystem (),
324+ context] (const NThreading::TFuture<NYql::TIssues>& issuesFuture) {
269325 auto issues = issuesFuture.GetValue ();
270326 auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move (issues));
271327 if (response->Issues ) {
328+ context->IncError ();
272329 LOG_STREAMS_STORAGE_SERVICE_AS_WARN (*actorSystem, " [" << coordinatorId << " ] [" << checkpointId << " ] Failed to abort checkpoint: " << response->Issues .ToString ())
273330 } else {
274331 LOG_STREAMS_STORAGE_SERVICE_AS_INFO (*actorSystem, " [" << coordinatorId << " ] [" << checkpointId << " ] Checkpoint aborted" )
@@ -279,16 +336,19 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr
279336}
280337
281338void TStorageProxy::Handle (TEvCheckpointStorage::TEvGetCheckpointsMetadataRequest::TPtr& ev) {
339+ auto context = MakeIntrusive<TRequestContext>(Metrics);
282340 const auto * event = ev->Get ();
283341 LOG_STREAMS_STORAGE_SERVICE_DEBUG (" [" << event->GraphId << " ] Got TEvGetCheckpointsMetadataRequest" );
284342 CheckpointStorage->GetCheckpoints (event->GraphId , event->Statuses , event->Limit , event->LoadGraphDescription )
285343 .Apply ([graphId = event->GraphId ,
286344 cookie = ev->Cookie ,
287345 sender = ev->Sender ,
288- actorSystem = TActivationContext::ActorSystem ()] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) {
346+ actorSystem = TActivationContext::ActorSystem (),
347+ context] (const NThreading::TFuture<ICheckpointStorage::TGetCheckpointsResult>& futureResult) {
289348 auto result = futureResult.GetValue ();
290349 auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first , result.second );
291350 if (response->Issues ) {
351+ context->IncError ();
292352 LOG_STREAMS_STORAGE_SERVICE_AS_WARN (*actorSystem, " [" << graphId << " ] Failed to get checkpoints: " << response->Issues .ToString ())
293353 }
294354 LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG (*actorSystem, " [" << graphId << " ] Send TEvGetCheckpointsMetadataResponse" )
@@ -297,6 +357,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataReques
297357}
298358
299359void TStorageProxy::Handle (NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev) {
360+ auto context = MakeIntrusive<TRequestContext>(Metrics);
300361 auto * event = ev->Get ();
301362 const auto checkpointId = TCheckpointId (event->Checkpoint .GetGeneration (), event->Checkpoint .GetId ());
302363 LOG_STREAMS_STORAGE_SERVICE_DEBUG (" [" << event->GraphId << " ] [" << checkpointId << " ] Got TEvSaveTaskState: task " << event->TaskId );
@@ -321,7 +382,8 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
321382 taskId = event->TaskId ,
322383 cookie = ev->Cookie ,
323384 sender = ev->Sender ,
324- actorSystem = TActivationContext::ActorSystem ()](const NThreading::TFuture<IStateStorage::TSaveStateResult>& futureResult) {
385+ actorSystem = TActivationContext::ActorSystem (),
386+ context](const NThreading::TFuture<IStateStorage::TSaveStateResult>& futureResult) {
325387 LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG (*actorSystem, " [" << graphId << " ] [" << checkpointId << " ] TEvSaveTaskState Apply: task: " << taskId)
326388 const auto & issues = futureResult.GetValue ().second ;
327389 auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult>();
@@ -331,6 +393,7 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
331393 response->Record .SetTaskId (taskId);
332394
333395 if (issues) {
396+ context->IncError ();
334397 LOG_STREAMS_STORAGE_SERVICE_AS_WARN (*actorSystem, " [" << graphId << " ] [" << checkpointId << " ] Failed to save task state: task: " << taskId << " , issues: " << issues.ToString ())
335398 response->Record .SetStatus (NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
336399 } else {
@@ -342,6 +405,7 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
342405}
343406
344407void TStorageProxy::Handle (NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
408+ auto context = MakeIntrusive<TRequestContext>(Metrics);
345409 const auto * event = ev->Get ();
346410 const auto checkpointId = TCheckpointId (event->Checkpoint .GetGeneration (), event->Checkpoint .GetId ());
347411 LOG_STREAMS_STORAGE_SERVICE_DEBUG (" [" << event->GraphId << " ] [" << checkpointId << " ] Got TEvGetTaskState: tasks {" << JoinSeq (" , " , event->TaskIds ) << " }" );
@@ -353,12 +417,14 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
353417 taskIds = event->TaskIds ,
354418 cookie = ev->Cookie ,
355419 sender = ev->Sender ,
356- actorSystem = TActivationContext::ActorSystem ()](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) {
420+ actorSystem = TActivationContext::ActorSystem (),
421+ context](const NThreading::TFuture<IStateStorage::TGetStateResult>& resultFuture) {
357422 auto result = resultFuture.GetValue ();
358423
359424 auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvGetTaskStateResult>(checkpointId, result.second , generation);
360425 std::swap (response->States , result.first );
361426 if (response->Issues ) {
427+ context->IncError ();
362428 LOG_STREAMS_STORAGE_SERVICE_AS_WARN (*actorSystem, " [" << graphId << " ] [" << checkpointId << " ] Failed to get task state: tasks: {" << JoinSeq (" , " , taskIds) << " }, issues: " << response->Issues .ToString ());
363429 }
364430 LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG (*actorSystem, " [" << graphId << " ] [" << checkpointId << " ] Send TEvGetTaskStateResult: tasks: {" << JoinSeq (" , " , taskIds) << " }" );
@@ -374,9 +440,10 @@ std::unique_ptr<NActors::IActor> NewStorageProxy(
374440 const NConfig::TCheckpointCoordinatorConfig& config,
375441 const NConfig::TCommonConfig& commonConfig,
376442 const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
377- const TYqSharedResources::TPtr& yqSharedResources)
443+ const TYqSharedResources::TPtr& yqSharedResources,
444+ const ::NMonitoring::TDynamicCounterPtr& counters)
378445{
379- return std::unique_ptr<NActors::IActor>(new TStorageProxy (config, commonConfig, credentialsProviderFactory, yqSharedResources));
446+ return std::unique_ptr<NActors::IActor>(new TStorageProxy (config, commonConfig, credentialsProviderFactory, yqSharedResources, counters ));
380447}
381448
382449} // namespace NFq
0 commit comments