Skip to content

Commit 15fa2a2

Browse files
committed
feat: support keep connection to all avaliable peer and add handshake time
1 parent 5ad5cff commit 15fa2a2

21 files changed

+1637
-423
lines changed

biz/client/rpc_pull_wireguards.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/VaalaCat/frp-panel/defs"
77
"github.com/VaalaCat/frp-panel/pb"
88
"github.com/VaalaCat/frp-panel/services/app"
9+
"github.com/sirupsen/logrus"
910
)
1011

1112
func PullWireGuards(appInstance app.Application, clientID, clientSecret string) error {
@@ -32,7 +33,8 @@ func PullWireGuards(appInstance app.Application, clientID, clientSecret string)
3233
}
3334

3435
log.Debugf("client [%s] has [%d] wireguards, check their status", clientID, len(resp.GetWireguardConfigs()))
35-
log.Tracef("wireguardConfigs: %v", resp.GetWireguardConfigs())
36+
log.Tracef("wireguardConfigs: %s", resp.String())
37+
3638
wgMgr := ctx.GetApp().GetWireGuardManager()
3739
successCnt := 0
3840
for _, wireGuard := range resp.GetWireguardConfigs() {
@@ -43,7 +45,7 @@ func PullWireGuards(appInstance app.Application, clientID, clientSecret string)
4345
wgMgr.RemoveService(wireGuard.GetInterfaceName())
4446
} else {
4547
log.Debugf("wireguard [%s] already exists, skip create, update peers if need", wireGuard.GetInterfaceName())
46-
wgSvc.PatchPeers(wgCfg.GetParsedPeers())
48+
syncExistingWireGuard(log, wgSvc, wgCfg)
4749
continue
4850
}
4951
}
@@ -65,3 +67,18 @@ func PullWireGuards(appInstance app.Application, clientID, clientSecret string)
6567

6668
return nil
6769
}
70+
71+
func syncExistingWireGuard(log *logrus.Entry, wgSvc app.WireGuard, wgCfg *defs.WireGuardConfig) {
72+
if wgSvc == nil || wgCfg == nil {
73+
return
74+
}
75+
// 主链路:先更新 adjs,再 patch peers。wg 内部会基于最新拓扑做预连接补齐/不可直连清理。
76+
if err := wgSvc.UpdateAdjs(wgCfg.GetAdjs()); err != nil {
77+
log.WithError(err).Warn("update adjs failed while syncing existing wireguard")
78+
return
79+
}
80+
if _, err := wgSvc.PatchPeers(wgCfg.GetParsedPeers()); err != nil {
81+
log.WithError(err).Warn("patch peers failed while syncing existing wireguard")
82+
return
83+
}
84+
}

biz/client/update_wireguard.go.go

Lines changed: 47 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/VaalaCat/frp-panel/pb"
88
"github.com/VaalaCat/frp-panel/services/app"
99
"github.com/samber/lo"
10+
"github.com/sirupsen/logrus"
1011
)
1112

1213
func UpdateWireGuard(ctx *app.Context, req *pb.UpdateWireGuardRequest) (*pb.UpdateWireGuardResponse, error) {
@@ -38,19 +39,15 @@ func AddPeer(ctx *app.Context, wgSvc app.WireGuard, req *pb.UpdateWireGuardReque
3839

3940
log.Debugf("add peer, peer_config: %+v", req.GetWireguardConfig().GetPeers())
4041

41-
for _, peer := range req.GetWireguardConfig().GetPeers() {
42-
err := wgSvc.AddPeer(&defs.WireGuardPeerConfig{WireGuardPeerConfig: peer})
43-
if err != nil {
44-
log.WithError(err).Errorf("add peer failed")
45-
continue
46-
}
47-
}
48-
49-
if err := wgSvc.UpdateAdjs(req.GetWireguardConfig().GetAdjs()); err != nil {
50-
log.WithError(err).Errorf("update adjs failed, adjs: %+v", req.GetWireguardConfig().GetAdjs())
42+
// 主链路:先更新 adjs(保证后续 wg 内部的预连接/清理逻辑使用最新拓扑)
43+
if err := updateAdjsFirst(log, wgSvc, req); err != nil {
5144
return nil, err
5245
}
5346

47+
applyPeerOps(log, req.GetWireguardConfig().GetPeers(), "add peer", func(peer *pb.WireGuardPeerConfig) error {
48+
return wgSvc.AddPeer(&defs.WireGuardPeerConfig{WireGuardPeerConfig: peer})
49+
})
50+
5451
log.Infof("add peer done")
5552

5653
return &pb.UpdateWireGuardResponse{Status: &pb.Status{Code: pb.RespCode_RESP_CODE_SUCCESS, Message: "success"}}, nil
@@ -61,19 +58,15 @@ func RemovePeer(ctx *app.Context, wgSvc app.WireGuard, req *pb.UpdateWireGuardRe
6158

6259
log.Debugf("remove peer, peer_config: %+v", req.GetWireguardConfig().GetPeers())
6360

64-
for _, peer := range req.GetWireguardConfig().GetPeers() {
65-
err := wgSvc.RemovePeer(peer.GetPublicKey())
66-
if err != nil {
67-
log.WithError(err).Errorf("remove peer failed")
68-
continue
69-
}
70-
}
71-
72-
if err := wgSvc.UpdateAdjs(req.GetWireguardConfig().GetAdjs()); err != nil {
73-
log.WithError(err).Errorf("update adjs failed, adjs: %+v", req.GetWireguardConfig().GetAdjs())
61+
// 主链路:先更新 adjs(保证后续 wg 内部的预连接/清理逻辑使用最新拓扑)
62+
if err := updateAdjsFirst(log, wgSvc, req); err != nil {
7463
return nil, err
7564
}
7665

66+
applyPeerOps(log, req.GetWireguardConfig().GetPeers(), "remove peer routes", func(peer *pb.WireGuardPeerConfig) error {
67+
return wgSvc.RemovePeer(peer.GetPublicKey())
68+
})
69+
7770
log.Infof("remove peer done")
7871

7972
return &pb.UpdateWireGuardResponse{Status: &pb.Status{Code: pb.RespCode_RESP_CODE_SUCCESS, Message: "success"}}, nil
@@ -84,19 +77,15 @@ func UpdatePeer(ctx *app.Context, wgSvc app.WireGuard, req *pb.UpdateWireGuardRe
8477

8578
log.Debugf("update peer, peer_config: %+v", req.GetWireguardConfig().GetPeers())
8679

87-
for _, peer := range req.GetWireguardConfig().GetPeers() {
88-
err := wgSvc.UpdatePeer(&defs.WireGuardPeerConfig{WireGuardPeerConfig: peer})
89-
if err != nil {
90-
log.WithError(err).Errorf("update peer failed")
91-
continue
92-
}
93-
}
94-
95-
if err := wgSvc.UpdateAdjs(req.GetWireguardConfig().GetAdjs()); err != nil {
96-
log.WithError(err).Errorf("update adjs failed, adjs: %+v", req.GetWireguardConfig().GetAdjs())
80+
// 主链路:先更新 adjs(保证后续 wg 内部的预连接/清理逻辑使用最新拓扑)
81+
if err := updateAdjsFirst(log, wgSvc, req); err != nil {
9782
return nil, err
9883
}
9984

85+
applyPeerOps(log, req.GetWireguardConfig().GetPeers(), "update peer", func(peer *pb.WireGuardPeerConfig) error {
86+
return wgSvc.UpdatePeer(&defs.WireGuardPeerConfig{WireGuardPeerConfig: peer})
87+
})
88+
10089
log.Infof("update peer done")
10190

10291
return &pb.UpdateWireGuardResponse{Status: &pb.Status{Code: pb.RespCode_RESP_CODE_SUCCESS, Message: "success"}}, nil
@@ -107,6 +96,11 @@ func PatchPeers(ctx *app.Context, wgSvc app.WireGuard, req *pb.UpdateWireGuardRe
10796

10897
log.Debugf("patch peers, peer_config: %+v", req.GetWireguardConfig().GetPeers())
10998

99+
// 主链路:先更新 adjs(保证后续 wg 内部的预连接/清理逻辑使用最新拓扑)
100+
if err := updateAdjsFirst(log, wgSvc, req); err != nil {
101+
return nil, err
102+
}
103+
110104
wgCfg := &defs.WireGuardConfig{WireGuardConfig: req.GetWireguardConfig()}
111105

112106
diffResp, err := wgSvc.PatchPeers(wgCfg.GetParsedPeers())
@@ -115,14 +109,32 @@ func PatchPeers(ctx *app.Context, wgSvc app.WireGuard, req *pb.UpdateWireGuardRe
115109
return nil, err
116110
}
117111

118-
if err = wgSvc.UpdateAdjs(req.GetWireguardConfig().GetAdjs()); err != nil {
119-
log.WithError(err).Errorf("update adjs failed, adjs: %+v", req.GetWireguardConfig().GetAdjs())
120-
return nil, err
121-
}
122-
123112
log.Debugf("patch peers done, add_peers: %+v, remove_peers: %+v",
124113
lo.Map(diffResp.AddPeers, func(item *defs.WireGuardPeerConfig, _ int) string { return item.GetClientId() }),
125114
lo.Map(diffResp.RemovePeers, func(item *defs.WireGuardPeerConfig, _ int) string { return item.GetClientId() }))
126115

127116
return &pb.UpdateWireGuardResponse{Status: &pb.Status{Code: pb.RespCode_RESP_CODE_SUCCESS, Message: "success"}}, nil
128117
}
118+
119+
func updateAdjsFirst(log *logrus.Entry, wgSvc app.WireGuard, req *pb.UpdateWireGuardRequest) error {
120+
if req == nil || req.GetWireguardConfig() == nil {
121+
return nil
122+
}
123+
if err := wgSvc.UpdateAdjs(req.GetWireguardConfig().GetAdjs()); err != nil {
124+
log.WithError(err).Errorf("update adjs failed, adjs: %+v", req.GetWireguardConfig().GetAdjs())
125+
return err
126+
}
127+
return nil
128+
}
129+
130+
func applyPeerOps(log *logrus.Entry, peers []*pb.WireGuardPeerConfig, op string, fn func(peer *pb.WireGuardPeerConfig) error) {
131+
for _, peer := range peers {
132+
if peer == nil {
133+
continue
134+
}
135+
if err := fn(peer); err != nil {
136+
log.WithError(err).Errorf("%s failed", op)
137+
continue
138+
}
139+
}
140+
}

biz/master/wg/client_list_wireguards.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package wg
22

33
import (
4+
"sort"
5+
46
"github.com/VaalaCat/frp-panel/models"
57
"github.com/VaalaCat/frp-panel/pb"
68
"github.com/VaalaCat/frp-panel/services/app"
79
"github.com/VaalaCat/frp-panel/services/dao"
810
wgsvc "github.com/VaalaCat/frp-panel/services/wg"
911
"github.com/samber/lo"
12+
"github.com/sirupsen/logrus"
1013
)
1114

1215
func ListClientWireGuards(ctx *app.Context, req *pb.ListClientWireGuardsRequest) (*pb.ListClientWireGuardsResponse, error) {
@@ -79,6 +82,15 @@ func ListClientWireGuards(ctx *app.Context, req *pb.ListClientWireGuardsRequest)
7982
return nil
8083
}
8184

85+
// 构建 network 内 WireGuard 索引,用于补齐“可直连 peer 的基础配置”
86+
idToWg := make(map[uint32]*models.WireGuard, len(networkPeers[wgCfg.NetworkID]))
87+
for _, item := range networkPeers[wgCfg.NetworkID] {
88+
if item == nil {
89+
continue
90+
}
91+
idToWg[uint32(item.ID)] = item
92+
}
93+
8294
r := wgCfg.ToPB()
8395
r.Peers = lo.Map(networkPeerConfigsMap[wgCfg.NetworkID][wgCfg.ID],
8496
func(peerCfg *pb.WireGuardPeerConfig, _ int) *pb.WireGuardPeerConfig {
@@ -87,9 +99,89 @@ func ListClientWireGuards(ctx *app.Context, req *pb.ListClientWireGuardsRequest)
8799

88100
r.Adjs = adjsToPB(networkAllEdgesMap[wgCfg.NetworkID])
89101

102+
fillConnectablePeersAsPreconnect(r, uint32(wgCfg.ID), idToWg, log)
103+
sortPeersStable(r)
104+
90105
return r
91106
}),
92107
}
93108

94109
return resp, nil
95110
}
111+
112+
// fillConnectablePeersAsPreconnect 将 adj[localID] 中可直连的 peer 补齐到 r.peers 中,并将 AllowedIPs 置空(只预连接,不承载路由)。
113+
func fillConnectablePeersAsPreconnect(r *pb.WireGuardConfig, localID uint32, idToWg map[uint32]*models.WireGuard, log *logrus.Entry) {
114+
if r == nil || localID == 0 {
115+
return
116+
}
117+
exists := make(map[uint32]struct{}, len(r.GetPeers()))
118+
for _, p := range r.GetPeers() {
119+
if p == nil {
120+
continue
121+
}
122+
if p.GetId() != 0 {
123+
exists[p.GetId()] = struct{}{}
124+
}
125+
if p.GetEndpoint() != nil && p.GetEndpoint().GetWireguardId() != 0 {
126+
exists[p.GetEndpoint().GetWireguardId()] = struct{}{}
127+
}
128+
}
129+
130+
links := r.GetAdjs()[localID]
131+
if links == nil {
132+
return
133+
}
134+
for _, l := range links.GetLinks() {
135+
if l == nil {
136+
continue
137+
}
138+
toID := l.GetToWireguardId()
139+
if toID == 0 || toID == localID {
140+
continue
141+
}
142+
if _, ok := exists[toID]; ok {
143+
continue
144+
}
145+
remote, ok := idToWg[toID]
146+
if !ok || remote == nil {
147+
continue
148+
}
149+
150+
// 优先使用链路显式 to_endpoint
151+
var specifiedEndpoint *models.Endpoint
152+
if l.GetToEndpoint() != nil {
153+
m := &models.Endpoint{}
154+
m.FromPB(l.GetToEndpoint())
155+
specifiedEndpoint = m
156+
}
157+
158+
base, err := remote.AsBasePeerConfig(specifiedEndpoint)
159+
if err != nil {
160+
log.WithError(err).Warnf("failed to build base peer config for preconnect: local=%d to=%d", localID, toID)
161+
continue
162+
}
163+
base.AllowedIps = nil
164+
r.Peers = append(r.Peers, base)
165+
exists[toID] = struct{}{}
166+
}
167+
}
168+
169+
func sortPeersStable(r *pb.WireGuardConfig) {
170+
if r == nil || len(r.Peers) <= 1 {
171+
return
172+
}
173+
sort.SliceStable(r.Peers, func(i, j int) bool {
174+
pi := r.Peers[i]
175+
pj := r.Peers[j]
176+
if pi == nil && pj == nil {
177+
return false
178+
}
179+
if pi == nil {
180+
return false
181+
}
182+
if pj == nil {
183+
return true
184+
}
185+
return pi.GetClientId() < pj.GetClientId()
186+
})
187+
}

biz/master/wg/get_network_typology.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,19 +48,26 @@ func GetNetworkTopology(ctx *app.Context, req *pb.GetNetworkTopologyRequest) (*p
4848
ctx.GetApp().GetClientsManager(),
4949
)
5050

51-
var resp map[uint][]wg.Edge
52-
5351
if req.GetSpf() {
54-
resp, err = wg.NewDijkstraAllowedIPsPlanner(policy).BuildFinalGraph(peers, links)
55-
} else {
56-
resp, err = wg.NewDijkstraAllowedIPsPlanner(policy).BuildGraph(peers, links)
52+
// SPF 模式:展示“真实下发的路由表”(即 PeerConfig.AllowedIps),确保与实际一致。
53+
peerCfgs, allEdges, err := wg.PlanAllowedIPs(peers, links, policy)
54+
if err != nil {
55+
log.WithError(err).Errorf("failed to plan allowed ips")
56+
return nil, err
57+
}
58+
adjs := peerConfigsToPBAdjs(peerCfgs, allEdges)
59+
60+
return &pb.GetNetworkTopologyResponse{
61+
Status: &pb.Status{Code: pb.RespCode_RESP_CODE_SUCCESS, Message: "success"},
62+
Adjs: adjs,
63+
}, nil
5764
}
5865

66+
resp, err := wg.NewDijkstraAllowedIPsPlanner(policy).BuildGraph(peers, links)
5967
if err != nil {
6068
log.WithError(err).Errorf("failed to build graph")
6169
return nil, err
6270
}
63-
6471
adjs := adjsToPB(resp)
6572

6673
return &pb.GetNetworkTopologyResponse{

0 commit comments

Comments
 (0)