Skip to content

Commit cf92843

Browse files
committed
feat: stream connection events
1 parent f910abc commit cf92843

File tree

9 files changed

+713
-329
lines changed

9 files changed

+713
-329
lines changed

daemon/started_service.go

Lines changed: 241 additions & 83 deletions
Large diffs are not rendered by default.

daemon/started_service.pb.go

Lines changed: 209 additions & 180 deletions
Large diffs are not rendered by default.

daemon/started_service.proto

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ service StartedService {
2727
rpc GetSystemProxyStatus(google.protobuf.Empty) returns(SystemProxyStatus) {}
2828
rpc SetSystemProxyEnabled(SetSystemProxyEnabledRequest) returns(google.protobuf.Empty) {}
2929

30-
rpc SubscribeConnections(SubscribeConnectionsRequest) returns(stream Connections) {}
30+
rpc SubscribeConnections(SubscribeConnectionsRequest) returns(stream ConnectionEvents) {}
3131
rpc CloseConnection(CloseConnectionRequest) returns(google.protobuf.Empty) {}
3232
rpc CloseAllConnections(google.protobuf.Empty) returns(google.protobuf.Empty) {}
3333
rpc GetDeprecatedWarnings(google.protobuf.Empty) returns(DeprecatedWarnings) {}
@@ -143,24 +143,26 @@ message SetSystemProxyEnabledRequest {
143143

144144
message SubscribeConnectionsRequest {
145145
int64 interval = 1;
146-
ConnectionFilter filter = 2;
147-
ConnectionSortBy sortBy = 3;
148146
}
149147

150-
enum ConnectionFilter {
151-
ALL = 0;
152-
ACTIVE = 1;
153-
CLOSED = 2;
148+
enum ConnectionEventType {
149+
CONNECTION_EVENT_NEW = 0;
150+
CONNECTION_EVENT_UPDATE = 1;
151+
CONNECTION_EVENT_CLOSED = 2;
154152
}
155153

156-
enum ConnectionSortBy {
157-
DATE = 0;
158-
TRAFFIC = 1;
159-
TOTAL_TRAFFIC = 2;
154+
message ConnectionEvent {
155+
ConnectionEventType type = 1;
156+
string id = 2;
157+
Connection connection = 3;
158+
int64 uplinkDelta = 4;
159+
int64 downlinkDelta = 5;
160+
int64 closedAt = 6;
160161
}
161162

162-
message Connections {
163-
repeated Connection connections = 1;
163+
message ConnectionEvents {
164+
repeated ConnectionEvent events = 1;
165+
bool reset = 2;
164166
}
165167

166168
message Connection {

daemon/started_service_grpc.pb.go

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type StartedServiceClient interface {
5858
SetGroupExpand(ctx context.Context, in *SetGroupExpandRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
5959
GetSystemProxyStatus(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SystemProxyStatus, error)
6060
SetSystemProxyEnabled(ctx context.Context, in *SetSystemProxyEnabledRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
61-
SubscribeConnections(ctx context.Context, in *SubscribeConnectionsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Connections], error)
61+
SubscribeConnections(ctx context.Context, in *SubscribeConnectionsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[ConnectionEvents], error)
6262
CloseConnection(ctx context.Context, in *CloseConnectionRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
6363
CloseAllConnections(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*emptypb.Empty, error)
6464
GetDeprecatedWarnings(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*DeprecatedWarnings, error)
@@ -278,13 +278,13 @@ func (c *startedServiceClient) SetSystemProxyEnabled(ctx context.Context, in *Se
278278
return out, nil
279279
}
280280

281-
func (c *startedServiceClient) SubscribeConnections(ctx context.Context, in *SubscribeConnectionsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Connections], error) {
281+
func (c *startedServiceClient) SubscribeConnections(ctx context.Context, in *SubscribeConnectionsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[ConnectionEvents], error) {
282282
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
283283
stream, err := c.cc.NewStream(ctx, &StartedService_ServiceDesc.Streams[5], StartedService_SubscribeConnections_FullMethodName, cOpts...)
284284
if err != nil {
285285
return nil, err
286286
}
287-
x := &grpc.GenericClientStream[SubscribeConnectionsRequest, Connections]{ClientStream: stream}
287+
x := &grpc.GenericClientStream[SubscribeConnectionsRequest, ConnectionEvents]{ClientStream: stream}
288288
if err := x.ClientStream.SendMsg(in); err != nil {
289289
return nil, err
290290
}
@@ -295,7 +295,7 @@ func (c *startedServiceClient) SubscribeConnections(ctx context.Context, in *Sub
295295
}
296296

297297
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
298-
type StartedService_SubscribeConnectionsClient = grpc.ServerStreamingClient[Connections]
298+
type StartedService_SubscribeConnectionsClient = grpc.ServerStreamingClient[ConnectionEvents]
299299

300300
func (c *startedServiceClient) CloseConnection(ctx context.Context, in *CloseConnectionRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
301301
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
@@ -357,7 +357,7 @@ type StartedServiceServer interface {
357357
SetGroupExpand(context.Context, *SetGroupExpandRequest) (*emptypb.Empty, error)
358358
GetSystemProxyStatus(context.Context, *emptypb.Empty) (*SystemProxyStatus, error)
359359
SetSystemProxyEnabled(context.Context, *SetSystemProxyEnabledRequest) (*emptypb.Empty, error)
360-
SubscribeConnections(*SubscribeConnectionsRequest, grpc.ServerStreamingServer[Connections]) error
360+
SubscribeConnections(*SubscribeConnectionsRequest, grpc.ServerStreamingServer[ConnectionEvents]) error
361361
CloseConnection(context.Context, *CloseConnectionRequest) (*emptypb.Empty, error)
362362
CloseAllConnections(context.Context, *emptypb.Empty) (*emptypb.Empty, error)
363363
GetDeprecatedWarnings(context.Context, *emptypb.Empty) (*DeprecatedWarnings, error)
@@ -373,87 +373,87 @@ type StartedServiceServer interface {
373373
type UnimplementedStartedServiceServer struct{}
374374

375375
func (UnimplementedStartedServiceServer) StopService(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
376-
return nil, status.Errorf(codes.Unimplemented, "method StopService not implemented")
376+
return nil, status.Error(codes.Unimplemented, "method StopService not implemented")
377377
}
378378

379379
func (UnimplementedStartedServiceServer) ReloadService(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
380-
return nil, status.Errorf(codes.Unimplemented, "method ReloadService not implemented")
380+
return nil, status.Error(codes.Unimplemented, "method ReloadService not implemented")
381381
}
382382

383383
func (UnimplementedStartedServiceServer) SubscribeServiceStatus(*emptypb.Empty, grpc.ServerStreamingServer[ServiceStatus]) error {
384-
return status.Errorf(codes.Unimplemented, "method SubscribeServiceStatus not implemented")
384+
return status.Error(codes.Unimplemented, "method SubscribeServiceStatus not implemented")
385385
}
386386

387387
func (UnimplementedStartedServiceServer) SubscribeLog(*emptypb.Empty, grpc.ServerStreamingServer[Log]) error {
388-
return status.Errorf(codes.Unimplemented, "method SubscribeLog not implemented")
388+
return status.Error(codes.Unimplemented, "method SubscribeLog not implemented")
389389
}
390390

391391
func (UnimplementedStartedServiceServer) GetDefaultLogLevel(context.Context, *emptypb.Empty) (*DefaultLogLevel, error) {
392-
return nil, status.Errorf(codes.Unimplemented, "method GetDefaultLogLevel not implemented")
392+
return nil, status.Error(codes.Unimplemented, "method GetDefaultLogLevel not implemented")
393393
}
394394

395395
func (UnimplementedStartedServiceServer) ClearLogs(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
396-
return nil, status.Errorf(codes.Unimplemented, "method ClearLogs not implemented")
396+
return nil, status.Error(codes.Unimplemented, "method ClearLogs not implemented")
397397
}
398398

399399
func (UnimplementedStartedServiceServer) SubscribeStatus(*SubscribeStatusRequest, grpc.ServerStreamingServer[Status]) error {
400-
return status.Errorf(codes.Unimplemented, "method SubscribeStatus not implemented")
400+
return status.Error(codes.Unimplemented, "method SubscribeStatus not implemented")
401401
}
402402

403403
func (UnimplementedStartedServiceServer) SubscribeGroups(*emptypb.Empty, grpc.ServerStreamingServer[Groups]) error {
404-
return status.Errorf(codes.Unimplemented, "method SubscribeGroups not implemented")
404+
return status.Error(codes.Unimplemented, "method SubscribeGroups not implemented")
405405
}
406406

407407
func (UnimplementedStartedServiceServer) GetClashModeStatus(context.Context, *emptypb.Empty) (*ClashModeStatus, error) {
408-
return nil, status.Errorf(codes.Unimplemented, "method GetClashModeStatus not implemented")
408+
return nil, status.Error(codes.Unimplemented, "method GetClashModeStatus not implemented")
409409
}
410410

411411
func (UnimplementedStartedServiceServer) SubscribeClashMode(*emptypb.Empty, grpc.ServerStreamingServer[ClashMode]) error {
412-
return status.Errorf(codes.Unimplemented, "method SubscribeClashMode not implemented")
412+
return status.Error(codes.Unimplemented, "method SubscribeClashMode not implemented")
413413
}
414414

415415
func (UnimplementedStartedServiceServer) SetClashMode(context.Context, *ClashMode) (*emptypb.Empty, error) {
416-
return nil, status.Errorf(codes.Unimplemented, "method SetClashMode not implemented")
416+
return nil, status.Error(codes.Unimplemented, "method SetClashMode not implemented")
417417
}
418418

419419
func (UnimplementedStartedServiceServer) URLTest(context.Context, *URLTestRequest) (*emptypb.Empty, error) {
420-
return nil, status.Errorf(codes.Unimplemented, "method URLTest not implemented")
420+
return nil, status.Error(codes.Unimplemented, "method URLTest not implemented")
421421
}
422422

423423
func (UnimplementedStartedServiceServer) SelectOutbound(context.Context, *SelectOutboundRequest) (*emptypb.Empty, error) {
424-
return nil, status.Errorf(codes.Unimplemented, "method SelectOutbound not implemented")
424+
return nil, status.Error(codes.Unimplemented, "method SelectOutbound not implemented")
425425
}
426426

427427
func (UnimplementedStartedServiceServer) SetGroupExpand(context.Context, *SetGroupExpandRequest) (*emptypb.Empty, error) {
428-
return nil, status.Errorf(codes.Unimplemented, "method SetGroupExpand not implemented")
428+
return nil, status.Error(codes.Unimplemented, "method SetGroupExpand not implemented")
429429
}
430430

431431
func (UnimplementedStartedServiceServer) GetSystemProxyStatus(context.Context, *emptypb.Empty) (*SystemProxyStatus, error) {
432-
return nil, status.Errorf(codes.Unimplemented, "method GetSystemProxyStatus not implemented")
432+
return nil, status.Error(codes.Unimplemented, "method GetSystemProxyStatus not implemented")
433433
}
434434

435435
func (UnimplementedStartedServiceServer) SetSystemProxyEnabled(context.Context, *SetSystemProxyEnabledRequest) (*emptypb.Empty, error) {
436-
return nil, status.Errorf(codes.Unimplemented, "method SetSystemProxyEnabled not implemented")
436+
return nil, status.Error(codes.Unimplemented, "method SetSystemProxyEnabled not implemented")
437437
}
438438

439-
func (UnimplementedStartedServiceServer) SubscribeConnections(*SubscribeConnectionsRequest, grpc.ServerStreamingServer[Connections]) error {
440-
return status.Errorf(codes.Unimplemented, "method SubscribeConnections not implemented")
439+
func (UnimplementedStartedServiceServer) SubscribeConnections(*SubscribeConnectionsRequest, grpc.ServerStreamingServer[ConnectionEvents]) error {
440+
return status.Error(codes.Unimplemented, "method SubscribeConnections not implemented")
441441
}
442442

443443
func (UnimplementedStartedServiceServer) CloseConnection(context.Context, *CloseConnectionRequest) (*emptypb.Empty, error) {
444-
return nil, status.Errorf(codes.Unimplemented, "method CloseConnection not implemented")
444+
return nil, status.Error(codes.Unimplemented, "method CloseConnection not implemented")
445445
}
446446

447447
func (UnimplementedStartedServiceServer) CloseAllConnections(context.Context, *emptypb.Empty) (*emptypb.Empty, error) {
448-
return nil, status.Errorf(codes.Unimplemented, "method CloseAllConnections not implemented")
448+
return nil, status.Error(codes.Unimplemented, "method CloseAllConnections not implemented")
449449
}
450450

451451
func (UnimplementedStartedServiceServer) GetDeprecatedWarnings(context.Context, *emptypb.Empty) (*DeprecatedWarnings, error) {
452-
return nil, status.Errorf(codes.Unimplemented, "method GetDeprecatedWarnings not implemented")
452+
return nil, status.Error(codes.Unimplemented, "method GetDeprecatedWarnings not implemented")
453453
}
454454

455455
func (UnimplementedStartedServiceServer) GetStartedAt(context.Context, *emptypb.Empty) (*StartedAt, error) {
456-
return nil, status.Errorf(codes.Unimplemented, "method GetStartedAt not implemented")
456+
return nil, status.Error(codes.Unimplemented, "method GetStartedAt not implemented")
457457
}
458458
func (UnimplementedStartedServiceServer) mustEmbedUnimplementedStartedServiceServer() {}
459459
func (UnimplementedStartedServiceServer) testEmbeddedByValue() {}
@@ -466,7 +466,7 @@ type UnsafeStartedServiceServer interface {
466466
}
467467

468468
func RegisterStartedServiceServer(s grpc.ServiceRegistrar, srv StartedServiceServer) {
469-
// If the following call pancis, it indicates UnimplementedStartedServiceServer was
469+
// If the following call panics, it indicates UnimplementedStartedServiceServer was
470470
// embedded by pointer and is nil. This will cause panics if an
471471
// unimplemented method is ever invoked, so we test this at initialization
472472
// time to prevent it from happening at runtime later due to I/O.
@@ -734,11 +734,11 @@ func _StartedService_SubscribeConnections_Handler(srv interface{}, stream grpc.S
734734
if err := stream.RecvMsg(m); err != nil {
735735
return err
736736
}
737-
return srv.(StartedServiceServer).SubscribeConnections(m, &grpc.GenericServerStream[SubscribeConnectionsRequest, Connections]{ServerStream: stream})
737+
return srv.(StartedServiceServer).SubscribeConnections(m, &grpc.GenericServerStream[SubscribeConnectionsRequest, ConnectionEvents]{ServerStream: stream})
738738
}
739739

740740
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
741-
type StartedService_SubscribeConnectionsServer = grpc.ServerStreamingServer[Connections]
741+
type StartedService_SubscribeConnectionsServer = grpc.ServerStreamingServer[ConnectionEvents]
742742

743743
func _StartedService_CloseConnection_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
744744
in := new(CloseConnectionRequest)

experimental/clashapi/trafficontrol/manager.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,59 @@ import (
1010
C "github.com/sagernet/sing-box/constant"
1111
"github.com/sagernet/sing/common"
1212
"github.com/sagernet/sing/common/json"
13+
"github.com/sagernet/sing/common/observable"
1314
"github.com/sagernet/sing/common/x/list"
1415

1516
"github.com/gofrs/uuid/v5"
1617
)
1718

19+
type ConnectionEventType int
20+
21+
const (
22+
ConnectionEventNew ConnectionEventType = iota
23+
ConnectionEventUpdate
24+
ConnectionEventClosed
25+
)
26+
27+
type ConnectionEvent struct {
28+
Type ConnectionEventType
29+
ID uuid.UUID
30+
Metadata TrackerMetadata
31+
UplinkDelta int64
32+
DownlinkDelta int64
33+
ClosedAt time.Time
34+
}
35+
1836
type Manager struct {
1937
uploadTotal atomic.Int64
2038
downloadTotal atomic.Int64
2139

2240
connections compatible.Map[uuid.UUID, Tracker]
2341
closedConnectionsAccess sync.Mutex
2442
closedConnections list.List[TrackerMetadata]
25-
// process *process.Process
26-
memory uint64
43+
memory uint64
44+
45+
eventSubscriber *observable.Subscriber[ConnectionEvent]
2746
}
2847

2948
func NewManager() *Manager {
3049
return &Manager{}
3150
}
3251

52+
func (m *Manager) SetEventHook(subscriber *observable.Subscriber[ConnectionEvent]) {
53+
m.eventSubscriber = subscriber
54+
}
55+
3356
func (m *Manager) Join(c Tracker) {
34-
m.connections.Store(c.Metadata().ID, c)
57+
metadata := c.Metadata()
58+
m.connections.Store(metadata.ID, c)
59+
if m.eventSubscriber != nil {
60+
m.eventSubscriber.Emit(ConnectionEvent{
61+
Type: ConnectionEventNew,
62+
ID: metadata.ID,
63+
Metadata: metadata,
64+
})
65+
}
3566
}
3667

3768
func (m *Manager) Leave(c Tracker) {
@@ -40,11 +71,19 @@ func (m *Manager) Leave(c Tracker) {
4071
if loaded {
4172
metadata.ClosedAt = time.Now()
4273
m.closedConnectionsAccess.Lock()
43-
defer m.closedConnectionsAccess.Unlock()
4474
if m.closedConnections.Len() >= 1000 {
4575
m.closedConnections.PopFront()
4676
}
4777
m.closedConnections.PushBack(metadata)
78+
m.closedConnectionsAccess.Unlock()
79+
if m.eventSubscriber != nil {
80+
m.eventSubscriber.Emit(ConnectionEvent{
81+
Type: ConnectionEventClosed,
82+
ID: metadata.ID,
83+
Metadata: metadata,
84+
ClosedAt: metadata.ClosedAt,
85+
})
86+
}
4887
}
4988
}
5089

experimental/libbox/command_client.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type CommandClientHandler interface {
4949
WriteGroups(message OutboundGroupIterator)
5050
InitializeClashMode(modeList StringIterator, currentMode string)
5151
UpdateClashMode(newMode string)
52-
WriteConnections(message *Connections)
52+
WriteConnectionEvents(events *ConnectionEvents)
5353
}
5454

5555
type LogEntry struct {
@@ -491,15 +491,14 @@ func (c *CommandClient) handleConnectionsStream() {
491491
return
492492
}
493493

494-
var connections Connections
495494
for {
496-
conns, err := stream.Recv()
495+
events, err := stream.Recv()
497496
if err != nil {
498497
c.handler.Disconnected(err.Error())
499498
return
500499
}
501-
connections.input = ConnectionsFromGRPC(conns)
502-
c.handler.WriteConnections(&connections)
500+
libboxEvents := ConnectionEventsFromGRPC(events)
501+
c.handler.WriteConnectionEvents(libboxEvents)
503502
}
504503
}
505504

0 commit comments

Comments
 (0)