Skip to content

Commit 39757d7

Browse files
alexvruydbot
authored andcommitted
Propagate StorageConfig up the distconf tree (#25867)
1 parent bfc0198 commit 39757d7

File tree

5 files changed

+38
-9
lines changed

5 files changed

+38
-9
lines changed

ydb/core/blobstorage/nodewarden/distconf_binding.cpp

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -432,22 +432,37 @@ namespace NKikimr::NStorage {
432432
FanOutReversePush();
433433
}
434434

435+
std::unique_ptr<TEvNodeConfigPush> pendingPush;
436+
auto getPendingPushRecord = [&] {
437+
if (!pendingPush) {
438+
pendingPush.reset(new TEvNodeConfigPush);
439+
}
440+
return &pendingPush->Record;
441+
};
442+
435443
// process cache updates, if needed
436444
if (record.HasCacheUpdate()) {
437445
auto *cacheUpdate = record.MutableCacheUpdate();
438446
ApplyCacheUpdates(cacheUpdate, senderNodeId);
439447

440448
if (cacheUpdate->RequestedKeysSize()) {
441-
auto ev = std::make_unique<TEvNodeConfigPush>();
442449
for (const TString& key : cacheUpdate->GetRequestedKeys()) {
443450
if (const auto it = Cache.find(key); it != Cache.end()) {
444-
AddCacheUpdate(ev->Record.MutableCacheUpdate(), it, true);
451+
AddCacheUpdate(getPendingPushRecord()->MutableCacheUpdate(), it, true);
445452
}
446453
}
447-
SendEvent(*Binding, std::move(ev));
448454
}
449455
}
450456

457+
if (StorageConfig && record.HasRequestStorageConfigGeneration() &&
458+
record.GetRequestStorageConfigGeneration() <= StorageConfig->GetGeneration()) {
459+
getPendingPushRecord()->MutableStorageConfig()->CopyFrom(*StorageConfig);
460+
}
461+
462+
if (pendingPush) {
463+
SendEvent(*Binding, std::move(pendingPush));
464+
}
465+
451466
// if we have a root node id that is not ours, then we drop all inbound pile connections; also execute any
452467
// pending commands
453468
if (Binding && Binding->RootNodeId) {
@@ -537,11 +552,14 @@ namespace NKikimr::NStorage {
537552

538553
// check if we have to send our current config to the peer
539554
const NKikimrBlobStorage::TStorageConfig *configToPeer = nullptr;
540-
if (record.GetInitial()) {
555+
std::optional<ui64> requestStorageConfigGeneration;
556+
if (StorageConfig) {
541557
for (const auto& item : record.GetBoundNodes()) {
542558
if (item.GetNodeId().GetNodeId() == senderNodeId) {
543-
if (StorageConfig && item.GetMeta().GetGeneration() < StorageConfig->GetGeneration()) {
559+
if (item.GetMeta().GetGeneration() < StorageConfig->GetGeneration()) {
544560
configToPeer = StorageConfig.get();
561+
} else if (StorageConfig->GetGeneration() < item.GetMeta().GetGeneration()) {
562+
requestStorageConfigGeneration.emplace(item.GetMeta().GetGeneration());
545563
}
546564
break;
547565
}
@@ -606,7 +624,8 @@ namespace NKikimr::NStorage {
606624
GetRootNodeId());
607625
TBoundNode& info = it->second;
608626
if (inserted) {
609-
auto response = std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), configToPeer);
627+
auto response = std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), configToPeer,
628+
requestStorageConfigGeneration);
610629
if (record.GetInitial()) {
611630
auto *cache = record.MutableCacheUpdate();
612631

@@ -675,6 +694,10 @@ namespace NKikimr::NStorage {
675694
ApplyCacheUpdates(record.MutableCacheUpdate(), senderNodeId);
676695
}
677696

697+
if (record.HasStorageConfig()) {
698+
ApplyStorageConfig(record.GetStorageConfig(), /*fromBinding=*/ false);
699+
}
700+
678701
if (pushEv && pushEv->IsUseful()) {
679702
SendEvent(*Binding, std::move(pushEv));
680703
}

ydb/core/blobstorage/nodewarden/distconf_cache.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ namespace NKikimr::NStorage {
4848
}
4949
// propagate backwards
5050
for (auto& [nodeId, info] : DirectBoundNodes) {
51-
auto ev = std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), nullptr);
51+
auto ev = std::make_unique<TEvNodeConfigReversePush>(GetRootNodeId(), nullptr, std::nullopt);
5252
info.LastReportedRootNodeId = GetRootNodeId();
5353
ev->Record.MutableCacheUpdate()->CopyFrom(updates);
5454
SendEvent(nodeId, info, std::move(ev));

ydb/core/blobstorage/nodewarden/distconf_fsm.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@ namespace NKikimr::NStorage {
715715
const bool needToUpdateConfig = nodeIdsToUpdate.contains(nodeId);
716716
if (needToUpdateConfig || info.LastReportedRootNodeId != rootNodeId) {
717717
SendEvent(nodeId, info, std::make_unique<TEvNodeConfigReversePush>(rootNodeId,
718-
needToUpdateConfig ? StorageConfig.get() : nullptr));
718+
needToUpdateConfig ? StorageConfig.get() : nullptr, std::nullopt));
719719
info.LastReportedRootNodeId = GetRootNodeId();
720720
}
721721
}

ydb/core/blobstorage/nodewarden/node_warden_events.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ namespace NKikimr::NStorage {
2222
{
2323
TEvNodeConfigReversePush() = default;
2424

25-
TEvNodeConfigReversePush(ui32 rootNodeId, const NKikimrBlobStorage::TStorageConfig *committedConfig) {
25+
TEvNodeConfigReversePush(ui32 rootNodeId, const NKikimrBlobStorage::TStorageConfig *committedConfig,
26+
std::optional<ui64> requestStorageConfigGeneration) {
2627
Record.SetRootNodeId(rootNodeId);
2728
if (committedConfig) {
2829
Record.MutableCommittedStorageConfig()->CopyFrom(*committedConfig);
2930
}
31+
if (requestStorageConfigGeneration) {
32+
Record.SetRequestStorageConfigGeneration(*requestStorageConfigGeneration);
33+
}
3034
}
3135

3236
static std::unique_ptr<TEvNodeConfigReversePush> MakeRejected(const NKikimrBlobStorage::TStorageConfig *config) {

ydb/core/protos/blobstorage_distributed_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ message TEvNodeConfigPush {
7878
repeated TBoundNode BoundNodes = 2; // a list of bound node updates (including itself)
7979
repeated TNodeIdentifier DeletedBoundNodeIds = 3; // a list of detached nodes
8080
TCacheUpdate CacheUpdate = 4;
81+
TStorageConfig StorageConfig = 5;
8182
}
8283

8384
// Used to reverse-propagate configuration and to confirm/reject initial TEvNodePushBinding query.
@@ -87,6 +88,7 @@ message TEvNodeConfigReversePush {
8788
TStorageConfig CommittedStorageConfig = 3; // last known committed storage configuration
8889
bool RecurseConfigUpdate = 4;
8990
TCacheUpdate CacheUpdate = 5;
91+
optional uint64 RequestStorageConfigGeneration = 6;
9092
}
9193

9294
// Remove node from bound list.

0 commit comments

Comments
 (0)