Skip to content

Commit 552dc60

Browse files
[management] migrate group peers into seperate table (#4096)
1 parent 71bb09d commit 552dc60

24 files changed

+1132
-414
lines changed

management/server/account.go

Lines changed: 42 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,7 +1368,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
13681368
return nil
13691369
}
13701370

1371-
if err = transaction.SaveGroups(ctx, store.LockingStrengthUpdate, userAuth.AccountId, newGroupsToCreate); err != nil {
1371+
if err = transaction.CreateGroups(ctx, store.LockingStrengthUpdate, userAuth.AccountId, newGroupsToCreate); err != nil {
13721372
return fmt.Errorf("error saving groups: %w", err)
13731373
}
13741374

@@ -1382,28 +1382,22 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
13821382

13831383
// Propagate changes to peers if group propagation is enabled
13841384
if settings.GroupsPropagationEnabled {
1385-
groups, err = transaction.GetAccountGroups(ctx, store.LockingStrengthShare, userAuth.AccountId)
1386-
if err != nil {
1387-
return fmt.Errorf("error getting account groups: %w", err)
1388-
}
1389-
1390-
groupsMap := make(map[string]*types.Group, len(groups))
1391-
for _, group := range groups {
1392-
groupsMap[group.ID] = group
1393-
}
1394-
13951385
peers, err := transaction.GetUserPeers(ctx, store.LockingStrengthShare, userAuth.AccountId, userAuth.UserId)
13961386
if err != nil {
13971387
return fmt.Errorf("error getting user peers: %w", err)
13981388
}
13991389

1400-
updatedGroups, err := updateUserPeersInGroups(groupsMap, peers, addNewGroups, removeOldGroups)
1401-
if err != nil {
1402-
return fmt.Errorf("error modifying user peers in groups: %w", err)
1403-
}
1404-
1405-
if err = transaction.SaveGroups(ctx, store.LockingStrengthUpdate, userAuth.AccountId, updatedGroups); err != nil {
1406-
return fmt.Errorf("error saving groups: %w", err)
1390+
for _, peer := range peers {
1391+
for _, g := range addNewGroups {
1392+
if err := transaction.AddPeerToGroup(ctx, userAuth.AccountId, peer.ID, g); err != nil {
1393+
return fmt.Errorf("error adding peer %s to group %s: %w", peer.ID, g, err)
1394+
}
1395+
}
1396+
for _, g := range removeOldGroups {
1397+
if err := transaction.RemovePeerFromGroup(ctx, peer.ID, g); err != nil {
1398+
return fmt.Errorf("error removing peer %s from group %s: %w", peer.ID, g, err)
1399+
}
1400+
}
14071401
}
14081402

14091403
if err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, userAuth.AccountId); err != nil {
@@ -1971,53 +1965,56 @@ func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, acc
19711965
// propagateUserGroupMemberships propagates all account users' group memberships to their peers.
19721966
// Returns true if any groups were modified, true if those updates affect peers and an error.
19731967
func propagateUserGroupMemberships(ctx context.Context, transaction store.Store, accountID string) (groupsUpdated bool, peersAffected bool, err error) {
1974-
groups, err := transaction.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
1968+
users, err := transaction.GetAccountUsers(ctx, store.LockingStrengthShare, accountID)
19751969
if err != nil {
19761970
return false, false, err
19771971
}
19781972

1979-
groupsMap := make(map[string]*types.Group, len(groups))
1980-
for _, group := range groups {
1981-
groupsMap[group.ID] = group
1973+
accountGroupPeers, err := transaction.GetAccountGroupPeers(ctx, store.LockingStrengthShare, accountID)
1974+
if err != nil {
1975+
return false, false, fmt.Errorf("error getting account group peers: %w", err)
19821976
}
19831977

1984-
users, err := transaction.GetAccountUsers(ctx, store.LockingStrengthShare, accountID)
1978+
accountGroups, err := transaction.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
19851979
if err != nil {
1986-
return false, false, err
1980+
return false, false, fmt.Errorf("error getting account groups: %w", err)
19871981
}
19881982

1989-
groupsToUpdate := make(map[string]*types.Group)
1983+
for _, group := range accountGroups {
1984+
if _, exists := accountGroupPeers[group.ID]; !exists {
1985+
accountGroupPeers[group.ID] = make(map[string]struct{})
1986+
}
1987+
}
19901988

1989+
updatedGroups := []string{}
19911990
for _, user := range users {
19921991
userPeers, err := transaction.GetUserPeers(ctx, store.LockingStrengthShare, accountID, user.Id)
19931992
if err != nil {
19941993
return false, false, err
19951994
}
19961995

1997-
updatedGroups, err := updateUserPeersInGroups(groupsMap, userPeers, user.AutoGroups, nil)
1998-
if err != nil {
1999-
return false, false, err
2000-
}
2001-
2002-
for _, group := range updatedGroups {
2003-
groupsToUpdate[group.ID] = group
2004-
groupsMap[group.ID] = group
1996+
for _, peer := range userPeers {
1997+
for _, groupID := range user.AutoGroups {
1998+
if _, exists := accountGroupPeers[groupID]; !exists {
1999+
// we do not wanna create the groups here
2000+
log.WithContext(ctx).Warnf("group %s does not exist for user group propagation", groupID)
2001+
continue
2002+
}
2003+
if _, exists := accountGroupPeers[groupID][peer.ID]; exists {
2004+
continue
2005+
}
2006+
if err := transaction.AddPeerToGroup(ctx, accountID, peer.ID, groupID); err != nil {
2007+
return false, false, fmt.Errorf("error adding peer %s to group %s: %w", peer.ID, groupID, err)
2008+
}
2009+
updatedGroups = append(updatedGroups, groupID)
2010+
}
20052011
}
20062012
}
20072013

2008-
if len(groupsToUpdate) == 0 {
2009-
return false, false, nil
2010-
}
2011-
2012-
peersAffected, err = areGroupChangesAffectPeers(ctx, transaction, accountID, maps.Keys(groupsToUpdate))
2013-
if err != nil {
2014-
return false, false, err
2015-
}
2016-
2017-
err = transaction.SaveGroups(ctx, store.LockingStrengthUpdate, accountID, maps.Values(groupsToUpdate))
2014+
peersAffected, err = areGroupChangesAffectPeers(ctx, transaction, accountID, updatedGroups)
20182015
if err != nil {
2019-
return false, false, err
2016+
return false, false, fmt.Errorf("error checking if group changes affect peers: %w", err)
20202017
}
20212018

2022-
return true, peersAffected, nil
2019+
return len(updatedGroups) > 0, peersAffected, nil
20232020
}

management/server/account/manager.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@ type Manager interface {
6262
GetGroup(ctx context.Context, accountId, groupID, userID string) (*types.Group, error)
6363
GetAllGroups(ctx context.Context, accountID, userID string) ([]*types.Group, error)
6464
GetGroupByName(ctx context.Context, groupName, accountID string) (*types.Group, error)
65-
SaveGroup(ctx context.Context, accountID, userID string, group *types.Group, create bool) error
66-
SaveGroups(ctx context.Context, accountID, userID string, newGroups []*types.Group, create bool) error
65+
CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error
66+
UpdateGroup(ctx context.Context, accountID, userID string, group *types.Group) error
67+
CreateGroups(ctx context.Context, accountID, userID string, newGroups []*types.Group) error
68+
UpdateGroups(ctx context.Context, accountID, userID string, newGroups []*types.Group) error
6769
DeleteGroup(ctx context.Context, accountId, userId, groupID string) error
6870
DeleteGroups(ctx context.Context, accountId, userId string, groupIDs []string) error
6971
GroupAddPeer(ctx context.Context, accountId, groupID, peerID string) error

management/server/account_test.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,7 +1159,7 @@ func TestAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
11591159
Name: "GroupA",
11601160
Peers: []string{},
11611161
}
1162-
if err := manager.SaveGroup(context.Background(), account.Id, userID, &group, true); err != nil {
1162+
if err := manager.CreateGroup(context.Background(), account.Id, userID, &group); err != nil {
11631163
t.Errorf("save group: %v", err)
11641164
return
11651165
}
@@ -1194,7 +1194,7 @@ func TestAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
11941194
}()
11951195

11961196
group.Peers = []string{peer1.ID, peer2.ID, peer3.ID}
1197-
if err := manager.SaveGroup(context.Background(), account.Id, userID, &group, true); err != nil {
1197+
if err := manager.UpdateGroup(context.Background(), account.Id, userID, &group); err != nil {
11981198
t.Errorf("save group: %v", err)
11991199
return
12001200
}
@@ -1240,11 +1240,12 @@ func TestAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
12401240
manager, account, peer1, peer2, _ := setupNetworkMapTest(t)
12411241

12421242
group := types.Group{
1243-
ID: "groupA",
1244-
Name: "GroupA",
1245-
Peers: []string{peer1.ID, peer2.ID},
1243+
AccountID: account.Id,
1244+
ID: "groupA",
1245+
Name: "GroupA",
1246+
Peers: []string{peer1.ID, peer2.ID},
12461247
}
1247-
if err := manager.SaveGroup(context.Background(), account.Id, userID, &group, true); err != nil {
1248+
if err := manager.CreateGroup(context.Background(), account.Id, userID, &group); err != nil {
12481249
t.Errorf("save group: %v", err)
12491250
return
12501251
}
@@ -1292,7 +1293,7 @@ func TestAccountManager_NetworkUpdates_DeletePeer(t *testing.T) {
12921293
Name: "GroupA",
12931294
Peers: []string{peer1.ID, peer3.ID},
12941295
}
1295-
if err := manager.SaveGroup(context.Background(), account.Id, userID, &group, true); err != nil {
1296+
if err := manager.CreateGroup(context.Background(), account.Id, userID, &group); err != nil {
12961297
t.Errorf("save group: %v", err)
12971298
return
12981299
}
@@ -1343,11 +1344,11 @@ func TestAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) {
13431344
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
13441345
defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
13451346

1346-
err := manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
1347+
err := manager.CreateGroup(context.Background(), account.Id, userID, &types.Group{
13471348
ID: "groupA",
13481349
Name: "GroupA",
13491350
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
1350-
}, true)
1351+
})
13511352

13521353
require.NoError(t, err, "failed to save group")
13531354

@@ -1672,9 +1673,10 @@ func TestAccount_Copy(t *testing.T) {
16721673
},
16731674
Groups: map[string]*types.Group{
16741675
"group1": {
1675-
ID: "group1",
1676-
Peers: []string{"peer1"},
1677-
Resources: []types.Resource{},
1676+
ID: "group1",
1677+
Peers: []string{"peer1"},
1678+
Resources: []types.Resource{},
1679+
GroupPeers: []types.GroupPeer{},
16781680
},
16791681
},
16801682
Policies: []*types.Policy{
@@ -2616,6 +2618,7 @@ func TestAccount_GetNextInactivePeerExpiration(t *testing.T) {
26162618
}
26172619

26182620
func TestAccount_SetJWTGroups(t *testing.T) {
2621+
t.Setenv("NETBIRD_STORE_ENGINE", "postgres")
26192622
manager, err := createManager(t)
26202623
require.NoError(t, err, "unable to create account manager")
26212624

@@ -3360,7 +3363,7 @@ func TestPropagateUserGroupMemberships(t *testing.T) {
33603363

33613364
t.Run("should update membership but no account peers update for unused groups", func(t *testing.T) {
33623365
group1 := &types.Group{ID: "group1", Name: "Group 1", AccountID: account.Id}
3363-
require.NoError(t, manager.Store.SaveGroup(ctx, store.LockingStrengthUpdate, group1))
3366+
require.NoError(t, manager.Store.CreateGroup(ctx, store.LockingStrengthUpdate, group1))
33643367

33653368
user, err := manager.Store.GetUserByUserID(ctx, store.LockingStrengthShare, initiatorId)
33663369
require.NoError(t, err)
@@ -3382,7 +3385,7 @@ func TestPropagateUserGroupMemberships(t *testing.T) {
33823385

33833386
t.Run("should update membership and account peers for used groups", func(t *testing.T) {
33843387
group2 := &types.Group{ID: "group2", Name: "Group 2", AccountID: account.Id}
3385-
require.NoError(t, manager.Store.SaveGroup(ctx, store.LockingStrengthUpdate, group2))
3388+
require.NoError(t, manager.Store.CreateGroup(ctx, store.LockingStrengthUpdate, group2))
33863389

33873390
user, err := manager.Store.GetUserByUserID(ctx, store.LockingStrengthShare, initiatorId)
33883391
require.NoError(t, err)

management/server/dns_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ func TestToProtocolDNSConfigWithCache(t *testing.T) {
495495
func TestDNSAccountPeersUpdate(t *testing.T) {
496496
manager, account, peer1, peer2, peer3 := setupNetworkMapTest(t)
497497

498-
err := manager.SaveGroups(context.Background(), account.Id, userID, []*types.Group{
498+
err := manager.CreateGroups(context.Background(), account.Id, userID, []*types.Group{
499499
{
500500
ID: "groupA",
501501
Name: "GroupA",
@@ -506,7 +506,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
506506
Name: "GroupB",
507507
Peers: []string{},
508508
},
509-
}, true)
509+
})
510510
assert.NoError(t, err)
511511

512512
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
@@ -562,11 +562,11 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
562562

563563
// Creating DNS settings with groups that have peers should update account peers and send peer update
564564
t.Run("creating dns setting with used groups", func(t *testing.T) {
565-
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
565+
err = manager.UpdateGroup(context.Background(), account.Id, userID, &types.Group{
566566
ID: "groupA",
567567
Name: "GroupA",
568568
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
569-
}, true)
569+
})
570570
assert.NoError(t, err)
571571

572572
done := make(chan struct{})

0 commit comments

Comments
 (0)