@@ -2,9 +2,11 @@ package server
2
2
3
3
import (
4
4
"context"
5
+ "errors"
5
6
"fmt"
6
7
"net"
7
8
"net/netip"
9
+ "os"
8
10
"strings"
9
11
"sync"
10
12
"time"
@@ -38,20 +40,28 @@ import (
38
40
internalStatus "github.com/netbirdio/netbird/shared/management/status"
39
41
)
40
42
43
+ const (
44
+ envLogBlockedPeers = "NB_LOG_BLOCKED_PEERS"
45
+ envBlockPeers = "NB_BLOCK_SAME_PEERS"
46
+ )
47
+
41
48
// GRPCServer an instance of a Management gRPC API server
42
49
type GRPCServer struct {
43
50
accountManager account.Manager
44
51
settingsManager settings.Manager
45
52
wgKey wgtypes.Key
46
53
proto.UnimplementedManagementServiceServer
47
- peersUpdateManager * PeersUpdateManager
48
- config * nbconfig.Config
49
- secretsManager SecretsManager
50
- appMetrics telemetry.AppMetrics
51
- ephemeralManager * EphemeralManager
52
- peerLocks sync.Map
53
- authManager auth.Manager
54
- integratedPeerValidator integrated_validator.IntegratedValidator
54
+ peersUpdateManager * PeersUpdateManager
55
+ config * nbconfig.Config
56
+ secretsManager SecretsManager
57
+ appMetrics telemetry.AppMetrics
58
+ ephemeralManager * EphemeralManager
59
+ peerLocks sync.Map
60
+ authManager auth.Manager
61
+
62
+ logBlockedPeers bool
63
+ blockPeersWithSameConfig bool
64
+ integratedPeerValidator integrated_validator.IntegratedValidator
55
65
}
56
66
57
67
// NewServer creates a new Management server
@@ -82,18 +92,23 @@ func NewServer(
82
92
}
83
93
}
84
94
95
+ logBlockedPeers := strings .ToLower (os .Getenv (envLogBlockedPeers )) == "true"
96
+ blockPeersWithSameConfig := strings .ToLower (os .Getenv (envBlockPeers )) == "true"
97
+
85
98
return & GRPCServer {
86
99
wgKey : key ,
87
100
// peerKey -> event channel
88
- peersUpdateManager : peersUpdateManager ,
89
- accountManager : accountManager ,
90
- settingsManager : settingsManager ,
91
- config : config ,
92
- secretsManager : secretsManager ,
93
- authManager : authManager ,
94
- appMetrics : appMetrics ,
95
- ephemeralManager : ephemeralManager ,
96
- integratedPeerValidator : integratedPeerValidator ,
101
+ peersUpdateManager : peersUpdateManager ,
102
+ accountManager : accountManager ,
103
+ settingsManager : settingsManager ,
104
+ config : config ,
105
+ secretsManager : secretsManager ,
106
+ authManager : authManager ,
107
+ appMetrics : appMetrics ,
108
+ ephemeralManager : ephemeralManager ,
109
+ logBlockedPeers : logBlockedPeers ,
110
+ blockPeersWithSameConfig : blockPeersWithSameConfig ,
111
+ integratedPeerValidator : integratedPeerValidator ,
97
112
}, nil
98
113
}
99
114
@@ -136,9 +151,6 @@ func getRealIP(ctx context.Context) net.IP {
136
151
// notifies the connected peer of any updates (e.g. new peers under the same account)
137
152
func (s * GRPCServer ) Sync (req * proto.EncryptedMessage , srv proto.ManagementService_SyncServer ) error {
138
153
reqStart := time .Now ()
139
- if s .appMetrics != nil {
140
- s .appMetrics .GRPCMetrics ().CountSyncRequest ()
141
- }
142
154
143
155
ctx := srv .Context ()
144
156
@@ -147,6 +159,25 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
147
159
if err != nil {
148
160
return err
149
161
}
162
+ realIP := getRealIP (ctx )
163
+ sRealIP := realIP .String ()
164
+ peerMeta := extractPeerMeta (ctx , syncReq .GetMeta ())
165
+ metahashed := metaHash (peerMeta , sRealIP )
166
+ if ! s .accountManager .AllowSync (peerKey .String (), metahashed ) {
167
+ if s .appMetrics != nil {
168
+ s .appMetrics .GRPCMetrics ().CountSyncRequestBlocked ()
169
+ }
170
+ if s .logBlockedPeers {
171
+ log .WithContext (ctx ).Warnf ("peer %s with meta hash %d is blocked from syncing" , peerKey .String (), metahashed )
172
+ }
173
+ if s .blockPeersWithSameConfig {
174
+ return mapError (ctx , internalStatus .ErrPeerAlreadyLoggedIn )
175
+ }
176
+ }
177
+
178
+ if s .appMetrics != nil {
179
+ s .appMetrics .GRPCMetrics ().CountSyncRequest ()
180
+ }
150
181
151
182
// nolint:staticcheck
152
183
ctx = context .WithValue (ctx , nbContext .PeerIDKey , peerKey .String ())
@@ -172,14 +203,13 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
172
203
// nolint:staticcheck
173
204
ctx = context .WithValue (ctx , nbContext .AccountIDKey , accountID )
174
205
175
- realIP := getRealIP (ctx )
176
- log .WithContext (ctx ).Debugf ("Sync request from peer [%s] [%s]" , req .WgPubKey , realIP .String ())
206
+ log .WithContext (ctx ).Debugf ("Sync request from peer [%s] [%s]" , req .WgPubKey , sRealIP )
177
207
178
208
if syncReq .GetMeta () == nil {
179
209
log .WithContext (ctx ).Tracef ("peer system meta has to be provided on sync. Peer %s, remote addr %s" , peerKey .String (), realIP )
180
210
}
181
211
182
- peer , netMap , postureChecks , err := s .accountManager .SyncAndMarkPeer (ctx , accountID , peerKey .String (), extractPeerMeta ( ctx , syncReq . GetMeta ()) , realIP )
212
+ peer , netMap , postureChecks , err := s .accountManager .SyncAndMarkPeer (ctx , accountID , peerKey .String (), peerMeta , realIP )
183
213
if err != nil {
184
214
log .WithContext (ctx ).Debugf ("error while syncing peer %s: %v" , peerKey .String (), err )
185
215
return mapError (ctx , err )
@@ -345,6 +375,9 @@ func mapError(ctx context.Context, err error) error {
345
375
default :
346
376
}
347
377
}
378
+ if errors .Is (err , internalStatus .ErrPeerAlreadyLoggedIn ) {
379
+ return status .Error (codes .PermissionDenied , internalStatus .ErrPeerAlreadyLoggedIn .Error ())
380
+ }
348
381
log .WithContext (ctx ).Errorf ("got an unhandled error: %s" , err )
349
382
return status .Errorf (codes .Internal , "failed handling request" )
350
383
}
@@ -436,19 +469,34 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
436
469
// In case of the successful registration login is also successful
437
470
func (s * GRPCServer ) Login (ctx context.Context , req * proto.EncryptedMessage ) (* proto.EncryptedMessage , error ) {
438
471
reqStart := time .Now ()
439
-
440
- if s .appMetrics != nil {
441
- s .appMetrics .GRPCMetrics ().CountLoginRequest ()
442
- }
443
472
realIP := getRealIP (ctx )
444
- log .WithContext (ctx ).Debugf ("Login request from peer [%s] [%s]" , req .WgPubKey , realIP .String ())
473
+ sRealIP := realIP .String ()
474
+ log .WithContext (ctx ).Debugf ("Login request from peer [%s] [%s]" , req .WgPubKey , sRealIP )
445
475
446
476
loginReq := & proto.LoginRequest {}
447
477
peerKey , err := s .parseRequest (ctx , req , loginReq )
448
478
if err != nil {
449
479
return nil , err
450
480
}
451
481
482
+ peerMeta := extractPeerMeta (ctx , loginReq .GetMeta ())
483
+ metahashed := metaHash (peerMeta , sRealIP )
484
+ if ! s .accountManager .AllowSync (peerKey .String (), metahashed ) {
485
+ if s .logBlockedPeers {
486
+ log .WithContext (ctx ).Warnf ("peer %s with meta hash %d is blocked from login" , peerKey .String (), metahashed )
487
+ }
488
+ if s .appMetrics != nil {
489
+ s .appMetrics .GRPCMetrics ().CountLoginRequestBlocked ()
490
+ }
491
+ if s .blockPeersWithSameConfig {
492
+ return nil , internalStatus .ErrPeerAlreadyLoggedIn
493
+ }
494
+ }
495
+
496
+ if s .appMetrics != nil {
497
+ s .appMetrics .GRPCMetrics ().CountLoginRequest ()
498
+ }
499
+
452
500
//nolint
453
501
ctx = context .WithValue (ctx , nbContext .PeerIDKey , peerKey .String ())
454
502
accountID , err := s .accountManager .GetAccountIDForPeerKey (ctx , peerKey .String ())
@@ -485,7 +533,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
485
533
peer , netMap , postureChecks , err := s .accountManager .LoginPeer (ctx , types.PeerLogin {
486
534
WireGuardPubKey : peerKey .String (),
487
535
SSHKey : string (sshKey ),
488
- Meta : extractPeerMeta ( ctx , loginReq . GetMeta ()) ,
536
+ Meta : peerMeta ,
489
537
UserID : userID ,
490
538
SetupKey : loginReq .GetSetupKey (),
491
539
ConnectionIP : realIP ,
0 commit comments