Skip to content

Commit a36a79c

Browse files
authored
Mutable groups (#41)
* feat(group): add mutable selector and mutable urltest outbound groups * add mutable groups adapter * feat(adapter): add MutableGroupManager for dynamic outbound/endpoint group management * add tests, fix nil pointer issue * create new var for wrapping onClose to prevent recursive call * handle nil type assertion * fix nil pointer issue * use wrappedOnClose for error handling in NewConnectionEx and NewPacketConnectionEx * update register functions to match name * avoid deleting outbound groups, just remove them from their parent * add method to add existing outbound to group * allow groups to be created without initial outbounds * fix deadlock, nil pointer issues * replace stop chan with ctx cancel, move ticker and pause cb to checkloop * fix deadlock on remove in selector * add tags from new outbounds to list * change Add/Remove methods to variadic for outbound tags * collect and return all missing tags instead of returning on first missing * set selected on start and run urltest, skip urltest group when testing * skip group outbounds without a selected outbound
1 parent b7ae595 commit a36a79c

File tree

11 files changed

+1444
-164
lines changed

11 files changed

+1444
-164
lines changed

adapter/group.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package adapter
2+
3+
import (
4+
"github.com/sagernet/sing-box/adapter"
5+
)
6+
7+
type MutableOutboundGroup interface {
8+
adapter.OutboundGroup
9+
Add(tags ...string) (n int, err error)
10+
Remove(tags ...string) (n int, err error)
11+
}

adapter/groups/manager.go

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
package groups
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"maps"
8+
"slices"
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
13+
A "github.com/sagernet/sing-box/adapter"
14+
"github.com/sagernet/sing-box/experimental/clashapi/trafficontrol"
15+
"github.com/sagernet/sing-box/log"
16+
"github.com/sagernet/sing/common/logger"
17+
18+
"github.com/getlantern/sing-box-extensions/adapter"
19+
)
20+
21+
const (
22+
pollInterval = 5 * time.Second
23+
forceAfter = 15 * time.Minute
24+
)
25+
26+
var (
27+
ErrIsClosed = errors.New("manager is closed")
28+
)
29+
30+
type ConnectionManager interface {
31+
Connections() []trafficontrol.TrackerMetadata
32+
}
33+
34+
type MutableGroupManager struct {
35+
outboundMgr A.OutboundManager
36+
endpointMgr A.EndpointManager
37+
groups map[string]adapter.MutableOutboundGroup
38+
39+
removalQueue *removalQueue
40+
closed atomic.Bool
41+
mu sync.Mutex
42+
}
43+
44+
func NewMutableGroupManager(
45+
logger logger.ContextLogger,
46+
outboundManager A.OutboundManager,
47+
endpointManager A.EndpointManager,
48+
connectionManager ConnectionManager,
49+
groups []adapter.MutableOutboundGroup,
50+
) *MutableGroupManager {
51+
gMap := make(map[string]adapter.MutableOutboundGroup)
52+
for _, group := range groups {
53+
gMap[group.Tag()] = group
54+
}
55+
return &MutableGroupManager{
56+
outboundMgr: outboundManager,
57+
endpointMgr: endpointManager,
58+
groups: gMap,
59+
removalQueue: newRemovalQueue(
60+
logger, outboundManager, endpointManager, connectionManager, pollInterval, forceAfter,
61+
),
62+
}
63+
}
64+
65+
func (m *MutableGroupManager) Close() {
66+
if m.closed.Swap(true) {
67+
return
68+
}
69+
m.removalQueue.close()
70+
}
71+
72+
func (m *MutableGroupManager) OutboundGroup(tag string) (adapter.MutableOutboundGroup, bool) {
73+
group, found := m.groups[tag]
74+
return group, found
75+
}
76+
77+
func (m *MutableGroupManager) OutboundGroups() []adapter.MutableOutboundGroup {
78+
m.mu.Lock()
79+
defer m.mu.Unlock()
80+
return slices.Collect(maps.Values(m.groups))
81+
}
82+
83+
// CreateOutboundForGroup creates an outbound and adds it to the specified group.
84+
func (m *MutableGroupManager) CreateOutboundForGroup(
85+
ctx context.Context,
86+
router A.Router,
87+
logger log.ContextLogger,
88+
group, tag, typ string,
89+
options any,
90+
) error {
91+
return m.createForGroup(ctx, m.outboundMgr, router, logger, group, tag, typ, options)
92+
}
93+
94+
// CreateEndpointForGroup creates an endpoint and adds it to the specified group.
95+
func (m *MutableGroupManager) CreateEndpointForGroup(
96+
ctx context.Context,
97+
router A.Router,
98+
logger log.ContextLogger,
99+
group, tag, typ string,
100+
options any,
101+
) error {
102+
return m.createForGroup(ctx, m.endpointMgr, router, logger, group, tag, typ, options)
103+
}
104+
105+
type outboundManager interface {
106+
Create(ctx context.Context, router A.Router, logger log.ContextLogger, tag string, outboundType string, options any) error
107+
}
108+
109+
func (m *MutableGroupManager) createForGroup(
110+
ctx context.Context,
111+
mgr outboundManager,
112+
router A.Router,
113+
logger log.ContextLogger,
114+
group, tag, typ string,
115+
options any,
116+
) error {
117+
m.mu.Lock()
118+
defer m.mu.Unlock()
119+
if m.closed.Load() {
120+
return errors.New("manager is closed")
121+
}
122+
123+
outGroup, found := m.groups[group]
124+
if !found {
125+
return fmt.Errorf("group %s not found", group)
126+
}
127+
128+
if err := mgr.Create(ctx, router, logger, tag, typ, options); err != nil {
129+
return err
130+
}
131+
return m.addToGroup(outGroup, tag)
132+
}
133+
134+
func (m *MutableGroupManager) AddToGroup(group, tag string) error {
135+
m.mu.Lock()
136+
defer m.mu.Unlock()
137+
if m.closed.Load() {
138+
return errors.New("manager is closed")
139+
}
140+
141+
outGroup, found := m.groups[group]
142+
if !found {
143+
return fmt.Errorf("group %s not found", group)
144+
}
145+
return m.addToGroup(outGroup, tag)
146+
}
147+
148+
func (m *MutableGroupManager) addToGroup(outGroup adapter.MutableOutboundGroup, tag string) error {
149+
n, err := outGroup.Add(tag)
150+
if err != nil || n == 0 {
151+
if err == nil {
152+
err = errors.New("unknown")
153+
}
154+
return fmt.Errorf("failed to add %s to %s: %w", tag, outGroup.Tag(), err)
155+
}
156+
// remove from removal queue in case it was scheduled for removal
157+
m.removalQueue.dequeue(tag)
158+
return nil
159+
}
160+
161+
// RemoveFromGroup removes an outbound/endpoint from the specified group.
162+
func (m *MutableGroupManager) RemoveFromGroup(group, tag string) error {
163+
m.mu.Lock()
164+
defer m.mu.Unlock()
165+
if m.closed.Load() {
166+
return ErrIsClosed
167+
}
168+
groupObj, found := m.groups[group]
169+
if !found {
170+
return fmt.Errorf("group %s not found", group)
171+
}
172+
173+
n, err := groupObj.Remove(tag)
174+
if err != nil || n == 0 {
175+
if err == nil {
176+
err = errors.New("unknown")
177+
}
178+
return fmt.Errorf("failed to remove %s from %s: %w", tag, group, err)
179+
}
180+
181+
outbound, exists := m.outboundMgr.Outbound(tag)
182+
if !exists {
183+
return nil
184+
}
185+
// we don't want to delete outbound groups themselves
186+
if _, isOutboundGroup := outbound.(A.OutboundGroup); !isOutboundGroup {
187+
_, isEndpoint := m.endpointMgr.Get(tag)
188+
m.removalQueue.enqueue(tag, isEndpoint)
189+
}
190+
return nil
191+
}
192+
193+
// removalQueue handles delayed removal of outbounds/endpoints.
194+
type removalQueue struct {
195+
logger log.ContextLogger
196+
outMgr A.OutboundManager
197+
epMgr A.EndpointManager
198+
connMgr ConnectionManager
199+
pending map[string]item
200+
pollInterval time.Duration
201+
forceAfter time.Duration
202+
mu sync.RWMutex
203+
running atomic.Bool
204+
done chan struct{}
205+
once sync.Once
206+
}
207+
208+
type item struct {
209+
tag string
210+
isEndpoint bool
211+
addedAt time.Time
212+
}
213+
214+
func newRemovalQueue(
215+
logger log.ContextLogger,
216+
outMgr A.OutboundManager,
217+
epMgr A.EndpointManager,
218+
connMgr ConnectionManager,
219+
pollInterval, forceAfter time.Duration,
220+
) *removalQueue {
221+
return &removalQueue{
222+
logger: logger,
223+
outMgr: outMgr,
224+
epMgr: epMgr,
225+
connMgr: connMgr,
226+
pending: make(map[string]item),
227+
pollInterval: pollInterval,
228+
forceAfter: forceAfter,
229+
done: make(chan struct{}),
230+
}
231+
}
232+
233+
func (rq *removalQueue) enqueue(tag string, isEndpoint bool) {
234+
select {
235+
case <-rq.done:
236+
return
237+
default:
238+
}
239+
240+
rq.mu.Lock()
241+
defer rq.mu.Unlock()
242+
if _, exists := rq.pending[tag]; exists {
243+
return
244+
}
245+
rq.pending[tag] = item{
246+
tag: tag,
247+
isEndpoint: isEndpoint,
248+
addedAt: time.Now(),
249+
}
250+
if !rq.running.Load() {
251+
go rq.checkLoop()
252+
}
253+
}
254+
255+
func (rq *removalQueue) dequeue(tag string) {
256+
rq.mu.Lock()
257+
delete(rq.pending, tag)
258+
rq.mu.Unlock()
259+
}
260+
261+
func (rq *removalQueue) checkLoop() {
262+
if !rq.running.CompareAndSwap(false, true) {
263+
return
264+
}
265+
defer rq.running.Store(false)
266+
267+
rq.checkPending()
268+
ticker := time.NewTicker(rq.pollInterval)
269+
defer ticker.Stop()
270+
for {
271+
rq.mu.Lock()
272+
if len(rq.pending) == 0 {
273+
rq.mu.Unlock()
274+
return
275+
}
276+
rq.mu.Unlock()
277+
select {
278+
case <-ticker.C:
279+
rq.checkPending()
280+
case <-rq.done:
281+
return
282+
}
283+
}
284+
}
285+
286+
// checkPending checks the pending removal items and removes them if they are not in use or have
287+
// exceeded the forceAfter duration.
288+
func (rq *removalQueue) checkPending() {
289+
rq.mu.RLock()
290+
pending := make(map[string]item, len(rq.pending))
291+
maps.Copy(pending, rq.pending)
292+
rq.mu.RUnlock()
293+
294+
hasConns := make(map[string]bool, len(rq.pending))
295+
for _, conn := range rq.connMgr.Connections() {
296+
if _, exists := pending[conn.Outbound]; exists {
297+
hasConns[conn.Outbound] = hasConns[conn.Outbound] || conn.ClosedAt.IsZero()
298+
}
299+
}
300+
301+
var toRemove []string
302+
for tag, item := range pending {
303+
if !hasConns[tag] || time.Since(item.addedAt) > rq.forceAfter {
304+
toRemove = append(toRemove, tag)
305+
}
306+
}
307+
308+
if len(toRemove) > 0 {
309+
rq.mu.Lock()
310+
defer rq.mu.Unlock()
311+
for _, tag := range toRemove {
312+
item, exists := rq.pending[tag]
313+
if !exists {
314+
continue
315+
}
316+
rq.logger.Debug("removing outbound", "tag", tag)
317+
if item.isEndpoint {
318+
rq.epMgr.Remove(tag)
319+
} else {
320+
rq.outMgr.Remove(tag)
321+
}
322+
delete(rq.pending, tag)
323+
}
324+
}
325+
}
326+
327+
func (rq *removalQueue) close() {
328+
rq.once.Do(func() {
329+
close(rq.done)
330+
})
331+
}

0 commit comments

Comments
 (0)