33#include < userver/components/component.hpp>
44#include < userver/components/statistics_storage.hpp>
55#include < userver/dist_lock/dist_lock_settings.hpp>
6+ #include < userver/dynamic_config/storage/component.hpp>
7+ #include < userver/hostinfo/blocking/get_hostname.hpp>
68#include < userver/storages/postgres/component.hpp>
79#include < userver/testsuite/tasks.hpp>
10+ #include < userver/utils/algo.hpp>
811#include < userver/yaml_config/merge_schemas.hpp>
912
1013#ifndef ARCADIA_ROOT
1114#include " generated/src/storages/postgres/dist_lock_component_base.yaml.hpp" // Y_IGNORE
1215#endif
1316
17+ #include < dynamic_config/variables/POSTGRES_DISTLOCK_SETTINGS.hpp>
18+
1419USERVER_NAMESPACE_BEGIN
1520
1621namespace storages ::postgres {
@@ -19,7 +24,11 @@ DistLockComponentBase::DistLockComponentBase(
1924 const components::ComponentConfig& component_config,
2025 const components::ComponentContext& component_context
2126)
22- : components::ComponentBase(component_config, component_context) {
27+ : components::ComponentBase(component_config, component_context),
28+ config_ (component_context.FindComponent<components::DynamicConfig>().GetSource()),
29+ name_(component_config.Name()),
30+ real_host_name_(hostinfo::blocking::GetRealHostName())
31+ {
2332 auto cluster =
2433 component_context.FindComponent <components::Postgres>(component_config[" cluster" ].As <std::string>())
2534 .GetCluster ();
@@ -38,29 +47,32 @@ DistLockComponentBase::DistLockComponentBase(
3847 settings.worker_func_restart_delay =
3948 component_config[" restart-delay" ].As <std::chrono::milliseconds>(settings.worker_func_restart_delay );
4049
50+ default_settings_ = settings;
4151 auto strategy = std::make_shared<DistLockStrategy>(std::move (cluster), table, lock_name, settings);
4252
4353 auto task_processor_name = component_config[" task-processor" ].As <std::optional<std::string>>();
4454 auto * task_processor =
4555 task_processor_name ? &component_context.GetTaskProcessor (task_processor_name.value ()) : nullptr ;
4656
4757 auto locker_log_level = logging::LevelFromString (component_config[" locker-log-level" ].As <std::string>(" info" ));
48-
4958 worker_ = std::make_unique<dist_lock::DistLockedWorker>(
5059 lock_name,
5160 [this ]() {
52- if (testsuite_enabled_) {
53- DoWorkTestsuite ();
54- } else {
55- DoWork ();
61+ const auto snapshot = config_.GetSnapshot ();
62+ if (ShouldRunOnHost (snapshot)) {
63+ if (testsuite_enabled_) {
64+ DoWorkTestsuite ();
65+ } else {
66+ DoWork ();
67+ }
5668 }
5769 },
5870 std::move (strategy),
5971 settings,
6072 task_processor,
6173 locker_log_level
6274 );
63-
75+ subscription_token_ = config_. UpdateAndListen ( this , name_, &DistLockComponentBase::OnConfigUpdate);
6476 autostart_ = component_config[" autostart" ].As <bool >(false );
6577
6678 auto & statistics_storage = component_context.FindComponent <components::StatisticsStorage>();
@@ -80,7 +92,10 @@ DistLockComponentBase::DistLockComponentBase(
8092 }
8193}
8294
83- DistLockComponentBase::~DistLockComponentBase () { statistics_holder_.Unregister (); }
95+ DistLockComponentBase::~DistLockComponentBase () {
96+ statistics_holder_.Unregister ();
97+ subscription_token_.Unsubscribe ();
98+ }
8499
85100dist_lock::DistLockedWorker& DistLockComponentBase::GetWorker () { return *worker_; }
86101
@@ -97,6 +112,54 @@ void DistLockComponentBase::AutostartDistLock() {
97112
98113void DistLockComponentBase::StopDistLock () { worker_->Stop (); }
99114
115+ void DistLockComponentBase::OnConfigUpdate (const dynamic_config::Diff& diff) {
116+ const auto & old_snapshot_opt = diff.previous ;
117+ const auto & new_snapshot = diff.current ;
118+
119+ const auto & new_settings = new_snapshot[::dynamic_config::POSTGRES_DISTLOCK_SETTINGS];
120+ const auto * new_overrides = utils::FindOrNullptr (new_settings.extra , name_);
121+
122+ if (old_snapshot_opt.has_value ()) {
123+ const auto & old_settings = (*old_snapshot_opt)[::dynamic_config::POSTGRES_DISTLOCK_SETTINGS];
124+ const auto * old_overrides = utils::FindOrNullptr (old_settings.extra , name_);
125+
126+ if (!old_overrides && !new_overrides) {
127+ return ;
128+ }
129+ if (old_overrides && new_overrides && *old_overrides == *new_overrides) {
130+ return ;
131+ }
132+ }
133+ auto & worker = GetWorker ();
134+ if (new_overrides) {
135+ dist_lock::DistLockSettings settings;
136+ settings.acquire_interval = new_overrides->acquire_interval .value_or (default_settings_.acquire_interval );
137+ settings.prolong_interval = new_overrides->prolong_interval .value_or (default_settings_.prolong_interval );
138+ settings.lock_ttl = new_overrides->lock_ttl .value_or (default_settings_.lock_ttl );
139+ settings.forced_stop_margin = new_overrides->forced_stop_margin .value_or (default_settings_.forced_stop_margin );
140+ settings.worker_func_restart_delay =
141+ new_overrides->worker_func_restart_delay .value_or (default_settings_.worker_func_restart_delay );
142+
143+ worker.UpdateSettings (settings);
144+ } else {
145+ worker.UpdateSettings (default_settings_);
146+ }
147+ }
148+ bool DistLockComponentBase::ShouldRunOnHost (const dynamic_config::Snapshot& config) const {
149+ const auto & worker_settings_config = config[::dynamic_config::POSTGRES_DISTLOCK_SETTINGS];
150+ if (const auto * settings = utils::FindOrNullptr (worker_settings_config.extra , name_)) {
151+ if (const auto & hosts = settings->run_on_hosts ) {
152+ return hosts->count (real_host_name_);
153+ }
154+ }
155+ return true ;
156+ }
157+
158+ bool DistLockComponentBase::IsCancelAdvised () const {
159+ const auto snapshot = config_.GetSnapshot ();
160+ return !ShouldRunOnHost (snapshot);
161+ }
162+
100163yaml_config::Schema DistLockComponentBase::GetStaticConfigSchema () {
101164 return yaml_config::MergeSchemasFromResource<
102165 components::ComponentBase>(" src/storages/postgres/dist_lock_component_base.yaml" );
0 commit comments