Skip to content

Commit 3898567

Browse files
authored
perf: improve getNetConnections function and Websocket handling (#11269)
* feat: Enhance WebSocket client functionality and improve data processing - Reduced message queue size in WebSocket client from 100 to 32. - Introduced atomic boolean to track client closure state. - Added SendPayload method to handle message sending with queue management. - Updated ProcessData function to utilize SendPayload for sending responses. - Expanded netTypes to include both IPv4 and IPv6 protocols in network connection retrieval. - Improved net connection processing by using a map for process names, enhancing efficiency. * feat: Enhance WebSocket client and process data handling - Added synchronization with sync.Once for safe closure of WebSocket client. - Updated message queue size to a constant for better maintainability. - Implemented context timeouts for process data retrieval to prevent blocking. - Improved network connection handling by utilizing a more efficient method for retrieving connections. - Introduced a new function to determine connection types based on protocol family. * feat: Enhance network connection retrieval and process name mapping - Updated getNetConnections function to improve efficiency by using maps for process names and connections. - Introduced a new helper function to retrieve process names from the filesystem or process context. - Enhanced filtering logic for network connections based on process ID, name, and port. - Increased initial capacity for connection results to optimize performance. * refactor: Rename SendPayload method to Send in WebSocket client - Updated the SendPayload method to be more succinctly named Send for clarity. - Ensured the method continues to handle message sending while maintaining existing functionality. * refactor: Update ProcessData and getNetConnections for improved clarity and efficiency - Replaced SendPayload method calls with Send for consistency in WebSocket message handling. - Enhanced getNetConnections function by refining process name retrieval and filtering logic. - Improved error handling in getProcessNameWithContext for better robustness. * refactor: Simplify WebSocket client closure and reading logic - Removed unnecessary synchronization for closing the WebSocket client. - Updated the Read method to handle message reading directly without a separate Close method. - Ensured the Socket is closed properly after reading messages to prevent resource leaks.
1 parent d1c2a69 commit 3898567

File tree

2 files changed

+115
-43
lines changed

2 files changed

+115
-43
lines changed

agent/utils/websocket/client.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,31 @@
11
package websocket
22

33
import (
4+
"sync/atomic"
5+
46
"github.com/gorilla/websocket"
57
)
68

9+
const MaxMessageQuenue = 32
10+
711
type Client struct {
812
ID string
913
Socket *websocket.Conn
1014
Msg chan []byte
15+
closed atomic.Bool
1116
}
1217

1318
func NewWsClient(ID string, socket *websocket.Conn) *Client {
1419
return &Client{
1520
ID: ID,
1621
Socket: socket,
17-
Msg: make(chan []byte, 100),
22+
Msg: make(chan []byte, MaxMessageQuenue),
1823
}
1924
}
2025

2126
func (c *Client) Read() {
2227
defer func() {
28+
c.closed.Store(true)
2329
close(c.Msg)
2430
}()
2531
for {
@@ -32,9 +38,7 @@ func (c *Client) Read() {
3238
}
3339

3440
func (c *Client) Write() {
35-
defer func() {
36-
c.Socket.Close()
37-
}()
41+
defer c.Socket.Close()
3842
for {
3943
message, ok := <-c.Msg
4044
if !ok {
@@ -43,3 +47,13 @@ func (c *Client) Write() {
4347
_ = c.Socket.WriteMessage(websocket.TextMessage, message)
4448
}
4549
}
50+
51+
func (c *Client) Send(res []byte) {
52+
if c.closed.Load() {
53+
return
54+
}
55+
select {
56+
case c.Msg <- res:
57+
default:
58+
}
59+
}

agent/utils/websocket/process_data.go

Lines changed: 97 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"os"
78
"strings"
89
"time"
910

10-
"github.com/1Panel-dev/1Panel/agent/utils/common"
1111
"github.com/1Panel-dev/1Panel/agent/global"
12+
"github.com/1Panel-dev/1Panel/agent/utils/common"
1213
"github.com/1Panel-dev/1Panel/agent/utils/files"
1314
"github.com/shirou/gopsutil/v4/host"
1415
"github.com/shirou/gopsutil/v4/net"
1516
"github.com/shirou/gopsutil/v4/process"
1617
)
1718

19+
const defaultTimeout = 10 * time.Second
20+
1821
type WsInput struct {
1922
Type string `json:"type"`
2023
DownloadProgress
@@ -113,25 +116,25 @@ func ProcessData(c *Client, inputMsg []byte) {
113116
if err != nil {
114117
return
115118
}
116-
c.Msg <- res
119+
c.Send(res)
117120
case "ps":
118121
res, err := getProcessData(wsInput.PsProcessConfig)
119122
if err != nil {
120123
return
121124
}
122-
c.Msg <- res
125+
c.Send(res)
123126
case "ssh":
124127
res, err := getSSHSessions(wsInput.SSHSessionConfig)
125128
if err != nil {
126129
return
127130
}
128-
c.Msg <- res
131+
c.Send(res)
129132
case "net":
130133
res, err := getNetConnections(wsInput.NetConfig)
131134
if err != nil {
132135
return
133136
}
134-
c.Msg <- res
137+
c.Send(res)
135138
}
136139

137140
}
@@ -204,7 +207,8 @@ func handleProcessData(proc *process.Process, processConfig *PsProcessConfig, pi
204207
}
205208

206209
func getProcessData(processConfig PsProcessConfig) (res []byte, err error) {
207-
ctx := context.Background()
210+
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
211+
defer cancel()
208212

209213
processes, err := process.ProcessesWithContext(ctx)
210214
if err != nil {
@@ -243,7 +247,10 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) {
243247
users []host.UserStat
244248
processes []*process.Process
245249
)
246-
users, err = host.Users()
250+
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
251+
defer cancel()
252+
253+
users, err = host.UsersWithContext(ctx)
247254
if err != nil {
248255
res, err = json.Marshal(result)
249256
return
@@ -268,8 +275,9 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) {
268275
return
269276
}
270277

271-
processes, err = process.Processes()
278+
processes, err = process.ProcessesWithContext(ctx)
272279
if err != nil {
280+
res, err = json.Marshal(result)
273281
return
274282
}
275283

@@ -312,42 +320,92 @@ func getSSHSessions(config SSHSessionConfig) (res []byte, err error) {
312320
return
313321
}
314322

315-
var netTypes = [...]string{"tcp", "udp"}
316-
317323
func getNetConnections(config NetConfig) (res []byte, err error) {
318-
var (
319-
result []ProcessConnect
320-
proc *process.Process
321-
)
322-
for _, netType := range netTypes {
323-
connections, _ := net.Connections(netType)
324-
if err == nil {
325-
for _, conn := range connections {
326-
if config.ProcessID > 0 && config.ProcessID != conn.Pid {
327-
continue
328-
}
329-
proc, err = process.NewProcess(conn.Pid)
330-
if err == nil {
331-
name, _ := proc.Name()
332-
if name != "" && config.ProcessName != "" && !strings.Contains(name, config.ProcessName) {
333-
continue
334-
}
335-
if config.Port > 0 && config.Port != conn.Laddr.Port && config.Port != conn.Raddr.Port {
336-
continue
337-
}
338-
result = append(result, ProcessConnect{
339-
Type: netType,
340-
Status: conn.Status,
341-
Laddr: conn.Laddr,
342-
Raddr: conn.Raddr,
343-
PID: conn.Pid,
344-
Name: name,
345-
})
346-
}
324+
result := make([]ProcessConnect, 0, 1024)
325+
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
326+
defer cancel()
327+
328+
connections, err := net.ConnectionsMaxWithContext(ctx, "all", 32768)
329+
if err != nil {
330+
res, _ = json.Marshal(result)
331+
return
332+
}
333+
334+
pidConnectionsMap := make(map[int32][]net.ConnectionStat, 256)
335+
pidNameMap := make(map[int32]string, 256)
336+
337+
for _, conn := range connections {
338+
if conn.Family != 2 && conn.Family != 10 {
339+
continue
340+
}
347341

342+
if conn.Pid == 0 {
343+
continue
344+
}
345+
346+
if config.ProcessID > 0 && conn.Pid != config.ProcessID {
347+
continue
348+
}
349+
350+
if config.Port > 0 && conn.Laddr.Port != config.Port && conn.Raddr.Port != config.Port {
351+
continue
352+
}
353+
354+
if _, exists := pidNameMap[conn.Pid]; !exists {
355+
pName, _ := getProcessNameWithContext(ctx, conn.Pid)
356+
if pName == "" {
357+
pName = "<UNKNOWN>"
348358
}
359+
pidNameMap[conn.Pid] = pName
349360
}
361+
362+
pidConnectionsMap[conn.Pid] = append(pidConnectionsMap[conn.Pid], conn)
350363
}
364+
365+
for pid, connections := range pidConnectionsMap {
366+
pName := pidNameMap[pid]
367+
if config.ProcessName != "" && !strings.Contains(pName, config.ProcessName) {
368+
continue
369+
}
370+
for _, conn := range connections {
371+
result = append(result, ProcessConnect{
372+
Type: getConnectionType(conn.Type, conn.Family),
373+
Status: conn.Status,
374+
Laddr: conn.Laddr,
375+
Raddr: conn.Raddr,
376+
PID: conn.Pid,
377+
Name: pName,
378+
})
379+
}
380+
}
381+
351382
res, err = json.Marshal(result)
352383
return
353384
}
385+
386+
func getProcessNameWithContext(ctx context.Context, pid int32) (string, error) {
387+
data, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", pid))
388+
if err == nil && len(data) > 0 {
389+
return strings.TrimSpace(string(data)), nil
390+
}
391+
p, err := process.NewProcessWithContext(ctx, pid)
392+
if err != nil {
393+
return "", err
394+
}
395+
return p.Name()
396+
}
397+
398+
func getConnectionType(connType uint32, family uint32) string {
399+
switch {
400+
case connType == 1 && family == 2:
401+
return "tcp"
402+
case connType == 1 && family == 10:
403+
return "tcp6"
404+
case connType == 2 && family == 2:
405+
return "udp"
406+
case connType == 2 && family == 10:
407+
return "udp6"
408+
default:
409+
return "unknown"
410+
}
411+
}

0 commit comments

Comments
 (0)