Skip to content

Commit 5f4eee9

Browse files
committed
refactor: connections view
1 parent 0028b06 commit 5f4eee9

File tree

9 files changed

+776
-409
lines changed

9 files changed

+776
-409
lines changed

daemon/started_service.go

Lines changed: 241 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ type StartedService struct {
5656
urlTestHistoryStorage *urltest.HistoryStorage
5757
clashModeSubscriber *observable.Subscriber[struct{}]
5858
clashModeObserver *observable.Observer[struct{}]
59+
60+
connectionEventSubscriber *observable.Subscriber[trafficontrol.ConnectionEvent]
61+
connectionEventObserver *observable.Observer[trafficontrol.ConnectionEvent]
5962
}
6063

6164
type ServiceOptions struct {
@@ -83,17 +86,19 @@ func NewStartedService(options ServiceOptions) *StartedService {
8386
// userID: options.UserID,
8487
// groupID: options.GroupID,
8588
// systemProxyEnabled: options.SystemProxyEnabled,
86-
serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
87-
serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
88-
logSubscriber: observable.NewSubscriber[*log.Entry](128),
89-
urlTestSubscriber: observable.NewSubscriber[struct{}](1),
90-
urlTestHistoryStorage: urltest.NewHistoryStorage(),
91-
clashModeSubscriber: observable.NewSubscriber[struct{}](1),
89+
serviceStatus: &ServiceStatus{Status: ServiceStatus_IDLE},
90+
serviceStatusSubscriber: observable.NewSubscriber[*ServiceStatus](4),
91+
logSubscriber: observable.NewSubscriber[*log.Entry](128),
92+
urlTestSubscriber: observable.NewSubscriber[struct{}](1),
93+
urlTestHistoryStorage: urltest.NewHistoryStorage(),
94+
clashModeSubscriber: observable.NewSubscriber[struct{}](1),
95+
connectionEventSubscriber: observable.NewSubscriber[trafficontrol.ConnectionEvent](256),
9296
}
9397
s.serviceStatusObserver = observable.NewObserver(s.serviceStatusSubscriber, 2)
9498
s.logObserver = observable.NewObserver(s.logSubscriber, 64)
9599
s.urlTestObserver = observable.NewObserver(s.urlTestSubscriber, 1)
96100
s.clashModeObserver = observable.NewObserver(s.clashModeSubscriber, 1)
101+
s.connectionEventObserver = observable.NewObserver(s.connectionEventSubscriber, 64)
97102
return s
98103
}
99104

@@ -183,6 +188,7 @@ func (s *StartedService) StartOrReloadService(profileContent string, options *Ov
183188
instance.urlTestHistoryStorage.SetHook(s.urlTestSubscriber)
184189
if instance.clashServer != nil {
185190
instance.clashServer.SetModeUpdateHook(s.clashModeSubscriber)
191+
instance.clashServer.(*clashapi.Server).TrafficManager().SetEventHook(s.connectionEventSubscriber)
186192
}
187193
s.serviceAccess.Unlock()
188194
err = instance.Start()
@@ -666,77 +672,261 @@ func (s *StartedService) SetSystemProxyEnabled(ctx context.Context, request *Set
666672
return nil, err
667673
}
668674

669-
func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[Connections]) error {
675+
func (s *StartedService) SubscribeConnections(request *SubscribeConnectionsRequest, server grpc.ServerStreamingServer[ConnectionEvents]) error {
670676
err := s.waitForStarted(server.Context())
671677
if err != nil {
672678
return err
673679
}
674680
s.serviceAccess.RLock()
675681
boxService := s.instance
676682
s.serviceAccess.RUnlock()
677-
ticker := time.NewTicker(time.Duration(request.Interval))
678-
defer ticker.Stop()
683+
684+
if boxService.clashServer == nil {
685+
return E.New("clash server not available")
686+
}
687+
679688
trafficManager := boxService.clashServer.(*clashapi.Server).TrafficManager()
680-
var (
681-
connections = make(map[uuid.UUID]*Connection)
682-
outConnections []*Connection
683-
)
689+
690+
subscription, done, err := s.connectionEventObserver.Subscribe()
691+
if err != nil {
692+
return err
693+
}
694+
defer s.connectionEventObserver.UnSubscribe(subscription)
695+
696+
connectionSnapshots := make(map[uuid.UUID]connectionSnapshot)
697+
initialEvents := s.buildInitialConnectionState(trafficManager, connectionSnapshots)
698+
err = server.Send(&ConnectionEvents{
699+
Events: initialEvents,
700+
Reset_: true,
701+
})
702+
if err != nil {
703+
return err
704+
}
705+
706+
interval := time.Duration(request.Interval)
707+
if interval <= 0 {
708+
interval = time.Second
709+
}
710+
ticker := time.NewTicker(interval)
711+
defer ticker.Stop()
712+
684713
for {
685-
outConnections = outConnections[:0]
686-
for _, connection := range trafficManager.Connections() {
687-
outConnections = append(outConnections, newConnection(connections, connection, false))
688-
}
689-
for _, connection := range trafficManager.ClosedConnections() {
690-
outConnections = append(outConnections, newConnection(connections, connection, true))
691-
}
692-
err := server.Send(&Connections{Connections: outConnections})
693-
if err != nil {
694-
return err
695-
}
696714
select {
697715
case <-s.ctx.Done():
698716
return s.ctx.Err()
699717
case <-server.Context().Done():
700718
return server.Context().Err()
719+
case <-done:
720+
return nil
721+
722+
case event := <-subscription:
723+
var pendingEvents []*ConnectionEvent
724+
if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
725+
pendingEvents = append(pendingEvents, protoEvent)
726+
}
727+
drain:
728+
for {
729+
select {
730+
case event = <-subscription:
731+
if protoEvent := s.applyConnectionEvent(event, connectionSnapshots); protoEvent != nil {
732+
pendingEvents = append(pendingEvents, protoEvent)
733+
}
734+
default:
735+
break drain
736+
}
737+
}
738+
if len(pendingEvents) > 0 {
739+
err = server.Send(&ConnectionEvents{Events: pendingEvents})
740+
if err != nil {
741+
return err
742+
}
743+
}
744+
701745
case <-ticker.C:
746+
protoEvents := s.buildTrafficUpdates(trafficManager, connectionSnapshots)
747+
if len(protoEvents) == 0 {
748+
continue
749+
}
750+
err = server.Send(&ConnectionEvents{Events: protoEvents})
751+
if err != nil {
752+
return err
753+
}
754+
}
755+
}
756+
}
757+
758+
type connectionSnapshot struct {
759+
uplink int64
760+
downlink int64
761+
hadTraffic bool
762+
}
763+
764+
func (s *StartedService) buildInitialConnectionState(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
765+
var events []*ConnectionEvent
766+
767+
for _, metadata := range manager.Connections() {
768+
events = append(events, &ConnectionEvent{
769+
Type: ConnectionEventType_CONNECTION_EVENT_NEW,
770+
Id: metadata.ID.String(),
771+
Connection: buildConnectionProto(metadata),
772+
})
773+
snapshots[metadata.ID] = connectionSnapshot{
774+
uplink: metadata.Upload.Load(),
775+
downlink: metadata.Download.Load(),
702776
}
703777
}
778+
779+
for _, metadata := range manager.ClosedConnections() {
780+
conn := buildConnectionProto(metadata)
781+
conn.ClosedAt = metadata.ClosedAt.UnixMilli()
782+
events = append(events, &ConnectionEvent{
783+
Type: ConnectionEventType_CONNECTION_EVENT_NEW,
784+
Id: metadata.ID.String(),
785+
Connection: conn,
786+
})
787+
}
788+
789+
return events
704790
}
705791

706-
func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol.TrackerMetadata, isClosed bool) *Connection {
707-
if oldConnection, loaded := connections[metadata.ID]; loaded {
708-
if isClosed {
709-
if oldConnection.ClosedAt == 0 {
710-
oldConnection.Uplink = 0
711-
oldConnection.Downlink = 0
712-
oldConnection.ClosedAt = metadata.ClosedAt.UnixMilli()
792+
func (s *StartedService) applyConnectionEvent(event trafficontrol.ConnectionEvent, snapshots map[uuid.UUID]connectionSnapshot) *ConnectionEvent {
793+
switch event.Type {
794+
case trafficontrol.ConnectionEventNew:
795+
if _, exists := snapshots[event.ID]; exists {
796+
return nil
797+
}
798+
snapshots[event.ID] = connectionSnapshot{
799+
uplink: event.Metadata.Upload.Load(),
800+
downlink: event.Metadata.Download.Load(),
801+
}
802+
return &ConnectionEvent{
803+
Type: ConnectionEventType_CONNECTION_EVENT_NEW,
804+
Id: event.ID.String(),
805+
Connection: buildConnectionProto(event.Metadata),
806+
}
807+
case trafficontrol.ConnectionEventClosed:
808+
delete(snapshots, event.ID)
809+
protoEvent := &ConnectionEvent{
810+
Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
811+
Id: event.ID.String(),
812+
}
813+
closedAt := event.ClosedAt
814+
if closedAt.IsZero() && !event.Metadata.ClosedAt.IsZero() {
815+
closedAt = event.Metadata.ClosedAt
816+
}
817+
if closedAt.IsZero() {
818+
closedAt = time.Now()
819+
}
820+
protoEvent.ClosedAt = closedAt.UnixMilli()
821+
if event.Metadata.ID != uuid.Nil {
822+
conn := buildConnectionProto(event.Metadata)
823+
conn.ClosedAt = protoEvent.ClosedAt
824+
protoEvent.Connection = conn
825+
}
826+
return protoEvent
827+
default:
828+
return nil
829+
}
830+
}
831+
832+
func (s *StartedService) buildTrafficUpdates(manager *trafficontrol.Manager, snapshots map[uuid.UUID]connectionSnapshot) []*ConnectionEvent {
833+
activeConnections := manager.Connections()
834+
activeIndex := make(map[uuid.UUID]trafficontrol.TrackerMetadata, len(activeConnections))
835+
var events []*ConnectionEvent
836+
837+
for _, metadata := range activeConnections {
838+
activeIndex[metadata.ID] = metadata
839+
currentUpload := metadata.Upload.Load()
840+
currentDownload := metadata.Download.Load()
841+
snapshot, exists := snapshots[metadata.ID]
842+
if !exists {
843+
snapshots[metadata.ID] = connectionSnapshot{
844+
uplink: currentUpload,
845+
downlink: currentDownload,
713846
}
714-
return oldConnection
715-
}
716-
lastUplink := oldConnection.UplinkTotal
717-
lastDownlink := oldConnection.DownlinkTotal
718-
uplinkTotal := metadata.Upload.Load()
719-
downlinkTotal := metadata.Download.Load()
720-
oldConnection.Uplink = uplinkTotal - lastUplink
721-
oldConnection.Downlink = downlinkTotal - lastDownlink
722-
oldConnection.UplinkTotal = uplinkTotal
723-
oldConnection.DownlinkTotal = downlinkTotal
724-
return oldConnection
847+
events = append(events, &ConnectionEvent{
848+
Type: ConnectionEventType_CONNECTION_EVENT_NEW,
849+
Id: metadata.ID.String(),
850+
Connection: buildConnectionProto(metadata),
851+
})
852+
continue
853+
}
854+
uplinkDelta := currentUpload - snapshot.uplink
855+
downlinkDelta := currentDownload - snapshot.downlink
856+
if uplinkDelta < 0 || downlinkDelta < 0 {
857+
snapshots[metadata.ID] = connectionSnapshot{
858+
uplink: currentUpload,
859+
downlink: currentDownload,
860+
}
861+
continue
862+
}
863+
if uplinkDelta > 0 || downlinkDelta > 0 {
864+
snapshots[metadata.ID] = connectionSnapshot{
865+
uplink: currentUpload,
866+
downlink: currentDownload,
867+
hadTraffic: true,
868+
}
869+
events = append(events, &ConnectionEvent{
870+
Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
871+
Id: metadata.ID.String(),
872+
UplinkDelta: uplinkDelta,
873+
DownlinkDelta: downlinkDelta,
874+
})
875+
continue
876+
}
877+
if snapshot.hadTraffic {
878+
snapshots[metadata.ID] = connectionSnapshot{
879+
uplink: currentUpload,
880+
downlink: currentDownload,
881+
}
882+
events = append(events, &ConnectionEvent{
883+
Type: ConnectionEventType_CONNECTION_EVENT_UPDATE,
884+
Id: metadata.ID.String(),
885+
UplinkDelta: 0,
886+
DownlinkDelta: 0,
887+
})
888+
}
889+
}
890+
891+
var closedIndex map[uuid.UUID]trafficontrol.TrackerMetadata
892+
for id := range snapshots {
893+
if _, exists := activeIndex[id]; exists {
894+
continue
895+
}
896+
if closedIndex == nil {
897+
closedIndex = make(map[uuid.UUID]trafficontrol.TrackerMetadata)
898+
for _, metadata := range manager.ClosedConnections() {
899+
closedIndex[metadata.ID] = metadata
900+
}
901+
}
902+
closedAt := time.Now()
903+
var conn *Connection
904+
if metadata, ok := closedIndex[id]; ok {
905+
if !metadata.ClosedAt.IsZero() {
906+
closedAt = metadata.ClosedAt
907+
}
908+
conn = buildConnectionProto(metadata)
909+
conn.ClosedAt = closedAt.UnixMilli()
910+
}
911+
events = append(events, &ConnectionEvent{
912+
Type: ConnectionEventType_CONNECTION_EVENT_CLOSED,
913+
Id: id.String(),
914+
ClosedAt: closedAt.UnixMilli(),
915+
Connection: conn,
916+
})
917+
delete(snapshots, id)
725918
}
919+
920+
return events
921+
}
922+
923+
func buildConnectionProto(metadata trafficontrol.TrackerMetadata) *Connection {
726924
var rule string
727925
if metadata.Rule != nil {
728926
rule = metadata.Rule.String()
729927
}
730928
uplinkTotal := metadata.Upload.Load()
731929
downlinkTotal := metadata.Download.Load()
732-
uplink := uplinkTotal
733-
downlink := downlinkTotal
734-
var closedAt int64
735-
if !metadata.ClosedAt.IsZero() {
736-
closedAt = metadata.ClosedAt.UnixMilli()
737-
uplink = 0
738-
downlink = 0
739-
}
740930
var processInfo *ProcessInfo
741931
if metadata.Metadata.ProcessInfo != nil {
742932
processInfo = &ProcessInfo{
@@ -747,7 +937,7 @@ func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol
747937
PackageName: metadata.Metadata.ProcessInfo.AndroidPackageName,
748938
}
749939
}
750-
connection := &Connection{
940+
return &Connection{
751941
Id: metadata.ID.String(),
752942
Inbound: metadata.Metadata.Inbound,
753943
InboundType: metadata.Metadata.InboundType,
@@ -760,9 +950,6 @@ func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol
760950
User: metadata.Metadata.User,
761951
FromOutbound: metadata.Metadata.Outbound,
762952
CreatedAt: metadata.CreatedAt.UnixMilli(),
763-
ClosedAt: closedAt,
764-
Uplink: uplink,
765-
Downlink: downlink,
766953
UplinkTotal: uplinkTotal,
767954
DownlinkTotal: downlinkTotal,
768955
Rule: rule,
@@ -771,8 +958,6 @@ func newConnection(connections map[uuid.UUID]*Connection, metadata trafficontrol
771958
ChainList: metadata.Chain,
772959
ProcessInfo: processInfo,
773960
}
774-
connections[metadata.ID] = connection
775-
return connection
776961
}
777962

778963
func (s *StartedService) CloseConnection(ctx context.Context, request *CloseConnectionRequest) (*emptypb.Empty, error) {

0 commit comments

Comments
 (0)