Skip to content

Commit f9480a7

Browse files
committed
Propagate deadline in SDK components (#27497)
1 parent 1cf0535 commit f9480a7

File tree

35 files changed

+379
-152
lines changed

35 files changed

+379
-152
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
b9b3351e7e628df62ee9335ff1b38d831e7fbe66
1+
eaa66fe4cc75727a60d89a9ec9d99970e03ce0e0

include/ydb-cpp-sdk/client/types/request_settings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ struct TRequestSettings {
2020
FLUENT_SETTING(std::string, TraceId);
2121
FLUENT_SETTING(std::string, RequestType);
2222
FLUENT_SETTING(THeader, Header);
23-
FLUENT_SETTING(TDuration, ClientTimeout);
23+
FLUENT_SETTING_DEFAULT(TDuration, ClientTimeout, TDuration::Max());
2424
FLUENT_SETTING(std::string, TraceParent);
2525

2626
TRequestSettings() = default;

src/client/common_client/impl/client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class TClientImplCommon
5353
return DbDriverState_->DiscoveryCompleted();
5454
}
5555

56-
void ScheduleTask(const std::function<void()>& fn, TDuration timeout) override {
56+
void ScheduleTask(const std::function<void()>& fn, TDeadline::Duration timeout) override {
5757
std::weak_ptr<IClientImplCommon> weak = this->shared_from_this();
5858
auto cbGuard = [weak, fn]() {
5959
auto strongClient = weak.lock();
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
#pragma once
22

3+
#include <src/library/time/time.h>
4+
35
#include <functional>
4-
#include <util/datetime/base.h>
56

67
namespace NYdb::inline V3 {
78

89
class IClientImplCommon {
910
public:
1011
virtual ~IClientImplCommon() = default;
11-
virtual void ScheduleTask(const std::function<void()>& fn, TDuration timeout) = 0;
12+
virtual void ScheduleTask(const std::function<void()>& fn, TDeadline::Duration timeout) = 0;
1213
};
1314

1415
}

src/client/driver/driver.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ TDriverConfig TDriver::GetConfig() const {
265265
);
266266
config.SetDrainOnDtors(Impl_->DrainOnDtors_);
267267
config.SetBalancingPolicy(std::make_unique<TBalancingPolicy::TImpl>(Impl_->BalancingSettings_));
268-
config.SetGRpcKeepAliveTimeout(Impl_->GRpcKeepAliveTimeout_);
268+
config.SetGRpcKeepAliveTimeout(std::chrono::duration_cast<std::chrono::microseconds>(Impl_->GRpcKeepAliveTimeout_));
269269
config.SetGRpcKeepAlivePermitWithoutCalls(Impl_->GRpcKeepAlivePermitWithoutCalls_);
270-
config.SetSocketIdleTimeout(Impl_->SocketIdleTimeout_);
270+
config.SetSocketIdleTimeout(std::chrono::duration_cast<std::chrono::microseconds>(Impl_->SocketIdleTimeout_));
271271
config.SetMaxInboundMessageSize(Impl_->MaxInboundMessageSize_);
272272
config.SetMaxOutboundMessageSize(Impl_->MaxOutboundMessageSize_);
273273
config.SetMaxMessageSize(Impl_->MaxMessageSize_);

src/client/federated_topic/impl/federation_observer.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ TFederatedDbObserverImpl::TFederatedDbObserverImpl(std::shared_ptr<TGRpcConnecti
1313
, PromiseToInitState(NThreading::NewPromise())
1414
, FederationDiscoveryRetryPolicy(settings.RetryPolicy_)
1515
{
16-
RpcSettings.ClientTimeout = settings.ConnectionTimeout_;
1716
RpcSettings.EndpointPolicy = TRpcRequestSettings::TEndpointPolicy::UseDiscoveryEndpoint;
1817
RpcSettings.UseAuth = true;
18+
ConnectionTimeout = TDeadline::SafeDurationCast(settings.ConnectionTimeout_);
1919
}
2020

2121
TFederatedDbObserverImpl::~TFederatedDbObserverImpl() {
@@ -82,6 +82,9 @@ void TFederatedDbObserverImpl::RunFederationDiscoveryImpl() {
8282
}
8383
};
8484

85+
TRpcRequestSettings settings = RpcSettings;
86+
settings.Deadline = TDeadline::AfterDuration(ConnectionTimeout);
87+
8588
Connections_->RunDeferred<Ydb::FederationDiscovery::V1::FederationDiscoveryService,
8689
Ydb::FederationDiscovery::ListFederationDatabasesRequest,
8790
Ydb::FederationDiscovery::ListFederationDatabasesResponse>(
@@ -90,7 +93,7 @@ void TFederatedDbObserverImpl::RunFederationDiscoveryImpl() {
9093
&Ydb::FederationDiscovery::V1::FederationDiscoveryService::Stub::AsyncListFederationDatabases,
9194
DbDriverState_,
9295
{}, // no polling unready operations, so no need in delay parameter
93-
RpcSettings,
96+
settings,
9497
FederationDiscoveryDelayContext);
9598
}
9699

src/client/federated_topic/impl/federation_observer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ class TFederatedDbObserverImpl : public TClientImplCommon<TFederatedDbObserverIm
8484
std::shared_ptr<TFederatedDbState> FederatedDbState;
8585
NThreading::TPromise<void> PromiseToInitState;
8686
TRpcRequestSettings RpcSettings;
87+
TDeadline::Duration ConnectionTimeout;
8788
TSpinLock Lock;
8889

8990
NTopic::IRetryPolicy::TPtr FederationDiscoveryRetryPolicy;

src/client/impl/internal/db_driver_state/state.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ namespace {
2020

2121
namespace NYdb::inline V3 {
2222

23+
using namespace std::chrono_literals;
24+
2325
constexpr int PESSIMIZATION_DISCOVERY_THRESHOLD = 50; // percent of endpoints pessimized by transport error to start recheck
2426
constexpr TDuration ENDPOINT_UPDATE_PERIOD = TDuration::Minutes(1); // period to perform endpoints update in "normal" case
25-
constexpr TDuration DISCOVERY_RECHECK_PERIOD = TDuration::Seconds(5); // period to run periodic discovery task
27+
constexpr TDeadline::Duration DISCOVERY_RECHECK_PERIOD = 5s; // period to run periodic discovery task
2628

2729
TDbDriverState::TDbDriverState(
2830
const std::string& database,
@@ -237,7 +239,7 @@ TDbDriverStatePtr TDbDriverStateTracker::GetDriverState(
237239
return strongState;
238240
}
239241

240-
void TDbDriverState::AddPeriodicTask(TPeriodicCb&& cb, TDuration period) {
242+
void TDbDriverState::AddPeriodicTask(TPeriodicCb&& cb, TDeadline::Duration period) {
241243
Client->AddPeriodicTask(std::move(cb), period);
242244
}
243245

src/client/impl/internal/db_driver_state/state.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class TDbDriverState
3939

4040
void SignalDiscoveryCompleted();
4141

42-
void AddPeriodicTask(TPeriodicCb&& cb, TDuration period) override;
42+
void AddPeriodicTask(TPeriodicCb&& cb, TDeadline::Duration period) override;
4343

4444
void AddCb(TCb&& cb, ENotifyType type);
4545
void ForEachEndpoint(const TEndpointElectorSafe::THandleCb& cb, const void* tag) const;

src/client/impl/internal/grpc_connections/actions.cpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
#include <src/api/grpc/ydb_operation_v1.grpc.pb.h>
66

7+
using namespace std::chrono_literals;
8+
79
namespace NYdb::inline V3 {
810

9-
constexpr TDuration MAX_DEFERRED_CALL_DELAY = TDuration::Seconds(10); // The max delay between GetOperation calls for one operation
11+
constexpr TDeadline::Duration MAX_DEFERRED_CALL_DELAY = 10s; // The max delay between GetOperation calls for one operation
1012

1113
TSimpleCbResult::TSimpleCbResult(
1214
TSimpleCb&& cb,
@@ -24,18 +26,18 @@ TDeferredAction::TDeferredAction(const std::string& operationId,
2426
TDeferredOperationCb&& userCb,
2527
TGRpcConnectionsImpl* connection,
2628
std::shared_ptr<IQueueClientContext> context,
27-
TDuration delay,
29+
TDeadline::Duration delay,
30+
TDeadline globalDeadline,
2831
TDbDriverStatePtr dbState,
2932
const std::string& endpoint)
3033
: TAlarmActionBase(std::move(userCb), connection, std::move(context))
31-
, NextDelay_(Min(delay * 2, MAX_DEFERRED_CALL_DELAY))
34+
, NextDelay_(std::min(delay * 2, MAX_DEFERRED_CALL_DELAY))
35+
, GlobalDeadline_(globalDeadline)
3236
, DbDriverState_(dbState)
3337
, OperationId_(operationId)
3438
, Endpoint_(endpoint)
3539
{
36-
Deadline_ = gpr_time_add(
37-
gpr_now(GPR_CLOCK_MONOTONIC),
38-
gpr_time_from_micros(delay.MicroSeconds(), GPR_TIMESPAN));
40+
Deadline_ = std::min(GlobalDeadline_, TDeadline::AfterDuration(delay));
3941
}
4042

4143
void TDeferredAction::OnAlarm() {
@@ -46,7 +48,8 @@ void TDeferredAction::OnAlarm() {
4648

4749
TRpcRequestSettings settings;
4850
settings.PreferredEndpoint = TEndpointKey(Endpoint_, 0);
49-
51+
settings.Deadline = GlobalDeadline_;
52+
5053
Connection_->RunDeferred<Ydb::Operation::V1::OperationService, Ydb::Operations::GetOperationRequest, Ydb::Operations::GetOperationResponse>(
5154
std::move(getOperationRequest),
5255
std::move(UserResponseCb_),
@@ -56,7 +59,7 @@ void TDeferredAction::OnAlarm() {
5659
settings,
5760
true,
5861
std::move(Context_));
59-
}
62+
}
6063

6164
void TDeferredAction::OnError() {
6265
Y_ABORT_UNLESS(Connection_);
@@ -76,13 +79,11 @@ TPeriodicAction::TPeriodicAction(
7679
TPeriodicCb&& userCb,
7780
TGRpcConnectionsImpl* connection,
7881
std::shared_ptr<NYdbGrpc::IQueueClientContext> context,
79-
TDuration period)
82+
TDeadline::Duration period)
8083
: TAlarmActionBase(std::move(userCb), connection, std::move(context))
8184
, Period_(period)
8285
{
83-
Deadline_ = gpr_time_add(
84-
gpr_now(GPR_CLOCK_MONOTONIC),
85-
gpr_time_from_micros(Period_.MicroSeconds(), GPR_TIMESPAN));
86+
Deadline_ = TDeadline::AfterDuration(period);
8687
}
8788

8889
void TPeriodicAction::OnAlarm() {

0 commit comments

Comments
 (0)