Skip to content

Commit a48188a

Browse files
committed
Implement m3 lp (logical process)
1 parent 09a2f4b commit a48188a

File tree

4 files changed

+252
-43
lines changed

4 files changed

+252
-43
lines changed

internal/agent/m3/m3.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,11 @@ func (m3 *M3App) captureAndTransmit(pids map[int]string, endpoint string) (err e
220220
top := capture.GoCapture(endpoint, capture.WrapRun(capTop))
221221
logger.Log("Collection of top data started.")
222222

223+
logger.Log("Starting collection of lp data...")
224+
capLPM3 := capture.NewLPM3(pids)
225+
lpM3Chan := capture.GoCapture(endpoint, capture.WrapRun(capLPM3))
226+
logger.Log("Collection of lp data started.")
227+
223228
if len(pids) > 0 {
224229
// @Andy: Existing code does this synchronously. Why not async like on-demand?
225230
for pid, appName := range pids {
@@ -241,17 +246,23 @@ func (m3 *M3App) captureAndTransmit(pids map[int]string, endpoint string) (err e
241246
}
242247
}
243248

244-
// Wait for the result of async captures
245-
if top != nil {
246-
result := <-top
247-
logger.Log(
248-
`TOP DATA
249+
topResult := <-top
250+
logger.Log(
251+
`TOP DATA
249252
Is transmission completed: %t
250253
Resp: %s
251254
252255
--------------------------------
253-
`, result.Ok, result.Msg)
254-
}
256+
`, topResult.Ok, topResult.Msg)
257+
258+
lpM3Result := <-lpM3Chan
259+
logger.Log(
260+
`LP DATA
261+
Ok: %t
262+
Resp: %s
263+
264+
--------------------------------
265+
`, lpM3Result.Ok, lpM3Result.Msg)
255266

256267
return
257268
}

internal/capture/lpm3.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package capture
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"os"
7+
"runtime"
8+
9+
"yc-agent/internal/config"
10+
"yc-agent/internal/logger"
11+
12+
psv3 "github.com/shirou/gopsutil/v3/process"
13+
)
14+
15+
const lpM3OutputPath = "lp.out"
16+
17+
// LPM3 (Logical Process) handles the capture of process status data.
18+
type LPM3 struct {
19+
Capture
20+
Pids map[int]string
21+
}
22+
23+
type LogicalProcess struct {
24+
ProcessName string
25+
CommandLine string
26+
ProcessId int
27+
}
28+
29+
// NewLPM3 creates a new LPM3 capture instance.
30+
func NewLPM3(pids map[int]string) *LPM3 {
31+
return &LPM3{Pids: pids}
32+
}
33+
34+
// Run executes the process status capture and uploads the captured file
35+
// to the specified endpoint.
36+
func (p *LPM3) Run() (Result, error) {
37+
if len(p.Pids) == 0 {
38+
logger.Warn().Msg("LPM3.Run called with nil or empty Pids map, returning empty result")
39+
return Result{Msg: "no processes to capture", Ok: true}, nil
40+
}
41+
42+
capturedFile, err := p.CaptureToFile()
43+
if err != nil {
44+
return Result{Msg: err.Error(), Ok: false}, err
45+
}
46+
defer capturedFile.Close()
47+
48+
result := p.UploadCapturedFile(capturedFile)
49+
return result, nil
50+
}
51+
52+
// CaptureToFile captures process status output to a file.
53+
// It returns the file handle for the captured data.
54+
func (p *LPM3) CaptureToFile() (*os.File, error) {
55+
file, err := os.Create(lpM3OutputPath)
56+
if err != nil {
57+
return nil, fmt.Errorf("LPM3: failed to create output file %s: %w", lpM3OutputPath, err)
58+
}
59+
60+
if err := p.captureOutput(file); err != nil {
61+
file.Close()
62+
return nil, err
63+
}
64+
65+
// Ensures all file data is written to disk.
66+
if err := file.Sync(); err != nil {
67+
logger.Warn().Err(err).Msg("failed to sync file")
68+
}
69+
70+
return file, nil
71+
}
72+
73+
// captureOutput handles the actual process status capture process.
74+
func (p *LPM3) captureOutput(f *os.File) error {
75+
var logicalProcesses []LogicalProcess
76+
77+
if runtime.GOOS == "windows" {
78+
processes, err := GetCIMProcesses(config.GlobalConfig.ProcessTokens, config.GlobalConfig.ExcludeProcessTokens)
79+
if err != nil {
80+
return fmt.Errorf("LPM3: failed to get CIM processes: %w", err)
81+
}
82+
83+
for _, process := range processes {
84+
logicalProcesses = append(logicalProcesses, LogicalProcess(process))
85+
}
86+
} else {
87+
for pid := range p.Pids {
88+
process, err := psv3.NewProcess(int32(pid))
89+
if err != nil {
90+
logger.Warn().Err(err).Int("pid", pid).Msg("LPM3: failed to create process object")
91+
continue
92+
}
93+
94+
psName, err := process.Name()
95+
if err != nil {
96+
logger.Warn().Err(err).Int("pid", pid).Msg("LPM3: failed to get process name")
97+
psName = ""
98+
}
99+
100+
cmdLine, err := process.Cmdline()
101+
if err != nil {
102+
logger.Warn().Err(err).Int("pid", pid).Msg("LPM3: failed to get command line")
103+
cmdLine = ""
104+
}
105+
106+
logicalProcesses = append(logicalProcesses, LogicalProcess{
107+
ProcessName: psName,
108+
ProcessId: pid,
109+
CommandLine: cmdLine,
110+
})
111+
}
112+
}
113+
114+
encoder := json.NewEncoder(f)
115+
if err := encoder.Encode(logicalProcesses); err != nil {
116+
return fmt.Errorf("failed to encode logical processes to JSON: %w", err)
117+
}
118+
119+
return nil
120+
}
121+
122+
// UploadCapturedFile uploads the captured file to the configured endpoint.
123+
func (p *LPM3) UploadCapturedFile(file *os.File) Result {
124+
msg, ok := PostData(p.Endpoint(), "lp", file)
125+
return Result{
126+
Msg: msg,
127+
Ok: ok,
128+
}
129+
}

internal/capture/proc_others.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,20 @@ package capture
66
import (
77
"bufio"
88
"bytes"
9+
"fmt"
910
"os"
1011
"strconv"
1112
"strings"
1213
"yc-agent/internal/capture/executils"
1314
"yc-agent/internal/config"
1415
)
1516

17+
type CIMProcess struct {
18+
ProcessName string
19+
CommandLine string
20+
ProcessId int
21+
}
22+
1623
func GetTopCpu() (pid int, err error) {
1724
output, err := executils.CommandCombinedOutput(executils.ProcessTopCPU)
1825
if err != nil {
@@ -52,6 +59,11 @@ Next:
5259
return
5360
}
5461

62+
// GetCIMProcesses is not supported on non-Windows platforms
63+
func GetCIMProcesses(tokens config.ProcessTokens, excludes config.ProcessTokens) ([]CIMProcess, error) {
64+
return nil, fmt.Errorf("GetCIMProcesses is only supported on Windows platforms")
65+
}
66+
5567
func GetTopMem() (pid int, err error) {
5668
output, err := executils.CommandCombinedOutput(executils.ProcessTopMEM)
5769
if err != nil {

internal/capture/proc_windows.go

Lines changed: 93 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -108,38 +108,8 @@ type CIMProcess struct {
108108

109109
type CIMProcessList []CIMProcess
110110

111-
func GetProcessIds(tokens config.ProcessTokens, excludes config.ProcessTokens) (pids map[int]string, err error) {
112-
output, err := executils.CommandCombinedOutput(executils.PSGetProcessIds)
113-
if err != nil {
114-
return
115-
}
116-
117-
cimProcessList := CIMProcessList{}
118-
err = json.Unmarshal(output, &cimProcessList)
119-
if err != nil {
120-
return
121-
}
122-
123-
pids = map[int]string{}
124-
125-
logger.Debug().Msgf("m3_windows GetProcessIds tokens: %v", tokens)
126-
logger.Debug().Msgf("m3_windows GetProcessIds excludes: %v", excludes)
127-
logger.Debug().Msgf("m3_windows GetProcessIds cimProcessList: %v", cimProcessList)
128-
129-
// 1. Preprocess excludes - identify excluded processes
130-
excludedProcesses := make(map[int]bool)
131-
// exclude self Pid in case some of the cmdline args matches
132-
excludedProcesses[os.Getpid()] = true
133-
for _, cimProcess := range cimProcessList {
134-
for _, exclude := range excludes {
135-
if strings.Contains(cimProcess.CommandLine, string(exclude)) {
136-
excludedProcesses[cimProcess.ProcessId] = true
137-
break
138-
}
139-
}
140-
}
141-
142-
// 2. Preprocess tokens - parse once, before entering loop for performance consideration
111+
// parseTokens parses process tokens and extracts app names
112+
func parseTokens(tokens config.ProcessTokens) []ParsedToken {
143113
parsedTokens := make([]ParsedToken, 0, len(tokens))
144114
for _, t := range tokens {
145115
tokenStr := string(t)
@@ -164,8 +134,51 @@ func GetProcessIds(tokens config.ProcessTokens, excludes config.ProcessTokens) (
164134
appName: appName,
165135
})
166136
}
137+
return parsedTokens
138+
}
167139

168-
// 3. Process matching
140+
// ProcessWithAppName represents a CIM process with its associated app name
141+
type ProcessWithAppName struct {
142+
CIMProcess
143+
AppName string
144+
}
145+
146+
// getFilteredCIMProcesses contains the common logic for filtering processes based on tokens and excludes
147+
// Returns both the filtered processes and their associated app names
148+
func getFilteredCIMProcesses(tokens config.ProcessTokens, excludes config.ProcessTokens) ([]ProcessWithAppName, error) {
149+
output, err := executils.CommandCombinedOutput(executils.PSGetProcessIds)
150+
if err != nil {
151+
return nil, err
152+
}
153+
154+
cimProcessList := CIMProcessList{}
155+
err = json.Unmarshal(output, &cimProcessList)
156+
if err != nil {
157+
return nil, err
158+
}
159+
160+
logger.Debug().Msgf("m3_windows getFilteredCIMProcesses tokens: %v", tokens)
161+
logger.Debug().Msgf("m3_windows getFilteredCIMProcesses excludes: %v", excludes)
162+
logger.Debug().Msgf("m3_windows getFilteredCIMProcesses cimProcessList: %v", cimProcessList)
163+
164+
// 1. Preprocess excludes - identify excluded processes
165+
excludedProcesses := make(map[int]bool)
166+
// exclude self Pid in case some of the cmdline args matches
167+
excludedProcesses[os.Getpid()] = true
168+
for _, cimProcess := range cimProcessList {
169+
for _, exclude := range excludes {
170+
if strings.Contains(cimProcess.CommandLine, string(exclude)) {
171+
excludedProcesses[cimProcess.ProcessId] = true
172+
break
173+
}
174+
}
175+
}
176+
177+
// 2. Parse tokens once for performance
178+
parsedTokens := parseTokens(tokens)
179+
180+
// 3. Process matching - collect matched processes with their app names
181+
matchedProcesses := make(map[int]string) // ProcessId -> AppName
169182
for _, token := range parsedTokens {
170183
for _, cimProcess := range cimProcessList {
171184
// Skip excluded processes
@@ -184,13 +197,57 @@ func GetProcessIds(tokens config.ProcessTokens, excludes config.ProcessTokens) (
184197
}
185198

186199
if matched {
187-
if _, exists := pids[cimProcess.ProcessId]; !exists {
188-
pids[cimProcess.ProcessId] = token.appName
200+
if _, exists := matchedProcesses[cimProcess.ProcessId]; !exists {
201+
matchedProcesses[cimProcess.ProcessId] = token.appName
189202
}
190203
}
191204
}
192205
}
193206

207+
// 4. Build result list with app names
208+
var result []ProcessWithAppName
209+
for _, cimProcess := range cimProcessList {
210+
if appName, matched := matchedProcesses[cimProcess.ProcessId]; matched {
211+
result = append(result, ProcessWithAppName{
212+
CIMProcess: cimProcess,
213+
AppName: appName,
214+
})
215+
}
216+
}
217+
218+
logger.Debug().Msgf("m3_windows getFilteredCIMProcesses result: %v", result)
219+
return result, nil
220+
}
221+
222+
// GetCIMProcesses returns filtered CIM processes based on tokens and excludes
223+
func GetCIMProcesses(tokens config.ProcessTokens, excludes config.ProcessTokens) ([]CIMProcess, error) {
224+
processesWithAppNames, err := getFilteredCIMProcesses(tokens, excludes)
225+
if err != nil {
226+
return nil, err
227+
}
228+
229+
// Extract just the CIMProcess part
230+
result := make([]CIMProcess, len(processesWithAppNames))
231+
for i, processWithAppName := range processesWithAppNames {
232+
result[i] = processWithAppName.CIMProcess
233+
}
234+
235+
return result, nil
236+
}
237+
238+
func GetProcessIds(tokens config.ProcessTokens, excludes config.ProcessTokens) (pids map[int]string, err error) {
239+
processesWithAppNames, err := getFilteredCIMProcesses(tokens, excludes)
240+
if err != nil {
241+
return nil, err
242+
}
243+
244+
pids = make(map[int]string)
245+
246+
// Directly use the app names from the filtered results - no need to re-match!
247+
for _, processWithAppName := range processesWithAppNames {
248+
pids[processWithAppName.ProcessId] = processWithAppName.AppName
249+
}
250+
194251
logger.Debug().Msgf("m3_windows GetProcessIds pids: %v", pids)
195-
return
252+
return pids, nil
196253
}

0 commit comments

Comments
 (0)