Skip to content

Commit ceab617

Browse files
committed
1. Add some new args to sysdig
2. implement processLoop
1 parent 306ba3a commit ceab617

File tree

8 files changed

+343
-50
lines changed

8 files changed

+343
-50
lines changed

main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,7 @@ func main() {
2020
},
2121
}
2222

23-
23+
app.Action = func(c *cli.Context) error {
24+
25+
}
2426
}

server/controller/container/container.go

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package container
22

33
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/YLonely/sysdig-monitor/log"
48
"github.com/YLonely/sysdig-monitor/server/controller"
59
"github.com/YLonely/sysdig-monitor/server/model"
610
"github.com/YLonely/sysdig-monitor/server/router"
@@ -13,17 +17,25 @@ type ContainerController struct {
1317
// event chan
1418
ec chan sysdig.Event
1519

16-
// containers
20+
// containers use container id as key
1721
containers map[string]*model.Container
1822
// container process channels
19-
containerCh map[string]chan sysdig.Event
23+
containerCh map[string]chan containerEvent
24+
25+
//containers mutex
26+
cm sync.RWMutex
2027
}
2128

22-
func NewController(ec chan sysdig.Event) controller.Controller {
29+
// eventBufferLen is the buffer capacity of each per-container event channel.
const eventBufferLen = 512
30+
31+
// NewController builds a ContainerController whose routes are grouped under
// "/container" and which reads sysdig events from ec. It initializes the
// router and then calls start with ctx; if start reports an error, the
// controller is returned together with that error.
func NewController(ctx context.Context, ec chan sysdig.Event) (controller.Controller, error) {
2332
r := router.NewGroupRouter("/container")
24-
res := &ContainerController{containerRouter: r, ec: ec, containers: map[string]*model.Container{}}
33+
res := &ContainerController{containerRouter: r, ec: ec, containers: map[string]*model.Container{}, containerCh: map[string]chan containerEvent{}}
2534
res.initRouter()
26-
return res
35+
if err := res.start(ctx); err != nil {
36+
return res, err
37+
}
38+
return res, nil
2739
}
2840

2941
var _ controller.Controller = &ContainerController{}
@@ -35,3 +47,63 @@ func (cc *ContainerController) BindedRoutes() []router.Route {
3547
func (cc *ContainerController) initRouter() {
3648
// TODO
3749
}
50+
51+
func (cc *ContainerController) start(ctx context.Context) error {
52+
func() {
53+
var e sysdig.Event
54+
for {
55+
select {
56+
case e = <-cc.ec:
57+
case <-ctx.Done():
58+
return
59+
}
60+
if e.ContainerName == "host" {
61+
continue
62+
}
63+
ce := convert(e)
64+
containerID := ce.containerID
65+
if _, exists := cc.containers[containerID]; !exists {
66+
container := model.NewContainer(ce.containerID, ce.containerName)
67+
ch := make(chan containerEvent, eventBufferLen)
68+
cc.cm.Lock()
69+
cc.containers[containerID] = container
70+
cc.cm.Unlock()
71+
cc.containerCh[containerID] = ch
72+
go func() {
73+
log.L.WithField("container-id", containerID).Info("processLoop start")
74+
err := processLoop(ctx, container, ch)
75+
log.L.WithField("container-id", containerID).Info("processLoop exits")
76+
if err != nil {
77+
log.L.WithError(err).Error("")
78+
}
79+
cc.cm.Lock()
80+
cc.containers[container.ID] = nil
81+
cc.cm.Unlock()
82+
cc.containerCh[container.ID] = nil
83+
close(ch)
84+
}()
85+
}
86+
ch := cc.containerCh[containerID]
87+
ch <- ce
88+
}
89+
}()
90+
return nil
91+
}
92+
93+
func convert(e sysdig.Event) containerEvent {
94+
res := containerEvent{}
95+
res.containerID = e.ContainerID
96+
res.containerName = e.ContainerName
97+
res.bufferLen = e.EventBuflen
98+
res.eventDir = e.EventDir
99+
res.eventType = e.EventType
100+
res.fdName = e.FdName
101+
res.fdType = e.FdType
102+
res.isIORead = e.EventIsIORead
103+
res.isIOWrite = e.EventIsIOWrite
104+
res.latency = e.EventLatency
105+
res.rawRes = e.RawRes
106+
res.syscallType = e.SyscallType
107+
res.virtualtid = e.ThreadVirtualID
108+
return res
109+
}

server/controller/container/util.go

Lines changed: 190 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,201 @@ package container
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"strconv"
8+
"strings"
9+
"time"
510

611
"github.com/YLonely/sysdig-monitor/log"
712
"github.com/YLonely/sysdig-monitor/server/model"
8-
"github.com/YLonely/sysdig-monitor/sysdig"
913
)
1014

11-
func processLoop(ctx context.Context, c *model.Container, ch chan sysdig.Event) {
12-
log.L.WithField("container-id", c.ID).Info("process loop start.")
13-
for e := range ch {
15+
// Latency thresholds used by handleFileIO to bucket slow IO calls.
const (
	latency1   = 1 * time.Millisecond
	latency10  = 10 * time.Millisecond
	latency100 = 100 * time.Millisecond
)
1418

19+
// containerEvent is the package-internal form of a sysdig event, filled in
// by convert and consumed by processLoop and its handlers.
type containerEvent struct {
	// eventType is the event name; processLoop and handleNetwork react to
	// "procexit", "connect", "accept" and "close".
	eventType string
	// eventDir is the event direction; only "<" events are handled.
	eventDir string

	containerID   string
	containerName string

	// fdType selects the IO handler: "file", "ipv4" or "ipv6".
	fdType string
	// fdName is a file path for file IO, or a "source->dest" address pair
	// for network events.
	fdName string

	isIOWrite bool
	isIORead  bool
	// bufferLen is added to the read/write byte counters.
	bufferLen int
	latency   time.Duration
	// rawRes is the raw result; IO is only accounted when it is >= 0.
	rawRes      int
	syscallType string
	// virtualtid 1 together with "procexit" ends the container's loop.
	virtualtid int
}
33+
34+
func processLoop(ctx context.Context, c *model.Container, ch chan containerEvent) error {
35+
var e containerEvent
36+
var err error
37+
for {
38+
select {
39+
case e = <-ch:
40+
case <-ctx.Done():
41+
return nil
42+
}
43+
if e.containerID != c.ID {
44+
return errors.New("event id mismatch")
45+
}
46+
// container exits
47+
if e.eventType == "procexit" && e.virtualtid == 1 {
48+
return nil
49+
}
50+
if e.eventDir == "<" {
51+
if err = handleSysCall(c, e); err != nil {
52+
log.L.WithError(err).WithField("container-id", c.ID).Error("syscall handler error")
53+
}
54+
if e.rawRes >= 0 && (e.isIORead || e.isIOWrite) {
55+
if err = handleIO(c, e); err != nil {
56+
//if something wrong happens, just log it out
57+
log.L.WithField("container-id", c.ID).WithError(err).Error("io handle error")
58+
}
59+
} else {
60+
// may have some other handler?
61+
if err = handleNetwork(c, e); err != nil {
62+
log.L.WithField("container-id", c.ID).WithError(err).Error("network handler error")
63+
}
64+
}
65+
}
66+
}
67+
}
68+
69+
func handleSysCall(c *model.Container, e containerEvent) error {
70+
syscall := e.syscallType
71+
latency := e.latency
72+
if len(syscall) <= 0 {
73+
return nil
74+
}
75+
if _, exists := c.IndividualCalls[syscall]; !exists {
76+
c.IndividualCalls[syscall] = &model.SystemCall{Name: syscall}
77+
}
78+
call := c.IndividualCalls[syscall]
79+
call.Calls++
80+
call.TotalTime += latency
81+
c.SystemCalls.TotalCalls++
82+
return nil
83+
}
84+
85+
func handleIO(c *model.Container, e containerEvent) error {
86+
if e.fdType == "file" {
87+
return handleFileIO(c, e)
88+
} else if e.fdType == "ipv4" || e.fdType == "ipv6" {
89+
return handleNetIO(c, e)
90+
}
91+
return nil
92+
}
93+
94+
func handleNetIO(c *model.Container, e containerEvent) error {
95+
bufLen := e.bufferLen
96+
meta, err := connectionMeta(e.fdName)
97+
if err != nil {
98+
return err
99+
}
100+
// if event shows that a net io begins before "connect" or "accpet",
101+
// we just ignore the error sequence and add a new connection
102+
if _, exists := c.ActiveConnections[meta]; !exists {
103+
c.ActiveConnections[meta] = &model.Connection{Type: e.fdType}
104+
}
105+
conn := c.ActiveConnections[meta]
106+
if e.isIORead {
107+
conn.ReadIn += int64(bufLen)
108+
c.Network.TotalReadIn += int64(bufLen)
109+
} else if e.isIOWrite {
110+
conn.WriteOut += int64(bufLen)
111+
c.Network.TotalWriteOut += int64(bufLen)
112+
}
113+
return nil
114+
}
115+
116+
func handleFileIO(c *model.Container, e containerEvent) error {
117+
fileName := e.fdName
118+
bufLen := e.bufferLen
119+
if _, exists := c.AccessedFiles[fileName]; !exists {
120+
c.AccessedFiles[fileName] = &model.File{Name: fileName}
121+
}
122+
file := c.AccessedFiles[fileName]
123+
if e.isIOWrite {
124+
file.WriteOut += int64(bufLen)
125+
c.FileSystem.TotalWriteOut += int64(bufLen)
126+
} else if e.isIORead {
127+
file.ReadIn += int64(bufLen)
128+
c.FileSystem.TotalReadIn += int64(bufLen)
129+
}
130+
latency := e.latency
131+
if !strings.HasPrefix(e.fdName, "/dev/") {
132+
iocall := &model.IOCall{FileName: fileName, Latency: latency}
133+
if latency > latency100 {
134+
c.IOCalls100 = append(c.IOCalls100, iocall)
135+
}
136+
if latency > latency10 {
137+
c.IOCalls10 = append(c.IOCalls10, iocall)
138+
}
139+
if latency > latency1 {
140+
c.IOCalls1 = append(c.IOCalls1, iocall)
141+
}
142+
}
143+
return nil
144+
}
145+
146+
func handleNetwork(c *model.Container, e containerEvent) error {
147+
var (
148+
meta model.ConnectionMeta
149+
err error
150+
)
151+
if e.eventType == "connect" || e.eventType == "accept" {
152+
if meta, err = connectionMeta(e.fdName); err != nil {
153+
return err
154+
}
155+
if _, exists := c.ActiveConnections[meta]; !exists {
156+
c.ActiveConnections[meta] = &model.Connection{Type: e.fdType}
157+
}
158+
} else if e.eventType == "close" && !strings.HasPrefix(e.fdName, "/") {
159+
if meta, err = connectionMeta(e.fdName); err != nil {
160+
return err
161+
}
162+
// should return nonexists error?
163+
if _, exists := c.ActiveConnections[meta]; exists {
164+
c.ActiveConnections[meta] = nil
165+
}
166+
}
167+
return nil
168+
}
169+
170+
func connectionMeta(fdname string) (model.ConnectionMeta, error) {
171+
parts := strings.Split(fdname, "->")
172+
meta := model.ConnectionMeta{}
173+
if len(parts) != 2 {
174+
return meta, fmt.Errorf("wrong connection meta format:%v", fdname)
175+
}
176+
source, dest := parts[0], parts[1]
177+
178+
var err error
179+
if meta.SourceIP, meta.SourcePort, err = splitAddress(source); err != nil {
180+
return meta, err
181+
}
182+
if meta.DestIP, meta.DestPort, err = splitAddress(dest); err != nil {
183+
return meta, err
184+
}
185+
return meta, nil
186+
}
187+
188+
// splitAddress splits an "ip:port" address at its last ':' and returns the ip
// part and the numeric port. Splitting at the last colon keeps IPv6 addresses
// such as "::1:53" intact. On failure it returns "", -1 and an error: the
// address is empty, has no ':' separator, or its port is not a number.
func splitAddress(address string) (string, int, error) {
	if address == "" {
		return "", -1, errors.New("empty address")
	}
	sep := strings.LastIndexByte(address, ':')
	if sep < 0 {
		return "", -1, fmt.Errorf("wrong address format:%v", address)
	}
	port, err := strconv.Atoi(address[sep+1:])
	if err != nil {
		return "", -1, fmt.Errorf("wrong address format:%v", address)
	}
	return address[:sep], port, nil
}

server/controller/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ import (
55
)
66

77
// Controller is implemented by types that expose a set of routes.
type Controller interface {
8+
// BindedRoutes returns all the routes bound to the controller.
89
BindedRoutes() []router.Route
910
}

server/model/container.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ package model
33
import "time"
44

55
type Container struct {
6-
ID string
6+
ID string
7+
Name string
78

89
SystemCalls
910

@@ -12,27 +13,34 @@ type Container struct {
1213
Network
1314
}
1415

16+
func NewContainer(id, name string) *Container {
17+
system := SystemCalls{IndividualCalls: map[string]*SystemCall{}}
18+
fileSys := FileSystem{AccessedFiles: map[string]*File{}}
19+
net := Network{ActiveConnections: map[ConnectionMeta]*Connection{}}
20+
return &Container{ID: id, Name: name, SystemCalls: system, FileSystem: fileSys, Network: net}
21+
}
22+
1523
// SystemCalls holds per-system-call statistics keyed by call name, plus the
// total number of calls recorded.
type SystemCalls struct {
1624
// map system call name to SystemCall
17-
IndividualCalls map[string]SystemCall
25+
IndividualCalls map[string]*SystemCall
1826
TotalCalls int64
1927
}
2028

2129
// FileSystem holds per-file IO statistics keyed by file name, the lists of
// slow IO calls bucketed by latency, and the total bytes read and written.
type FileSystem struct {
2230
// map file name to file
23-
AccessedFiles map[string]File
31+
AccessedFiles map[string]*File
2432
// io calls whose latency is bigger than 1ms
25-
IOCalls1 []IOCall
33+
IOCalls1 []*IOCall
2634
// io calls whose latency is bigger than 10ms
27-
IOCalls10 []IOCall
35+
IOCalls10 []*IOCall
2836
// io calls whose latency is bigger than 100ms
29-
IOCalls100 []IOCall
37+
IOCalls100 []*IOCall
3038
TotalReadIn int64
3139
TotalWriteOut int64
3240
}
3341

3442
// Network holds the currently tracked connections keyed by their
// ConnectionMeta, plus the total bytes read and written over the network.
type Network struct {
35-
ActiveConnections map[ConnectionMeta]Connection
43+
ActiveConnections map[ConnectionMeta]*Connection
3644
TotalReadIn, TotalWriteOut int64
3745
}
3846

@@ -50,8 +58,8 @@ type File struct {
5058
}
5159

5260
// Connection holds the type and the byte counters of a single connection.
type Connection struct {
53-
Type string
54-
ConnectionMeta
61+
// ipv4 or ipv6
62+
Type string
5563
WriteOut int64
5664
ReadIn int64
5765
}

0 commit comments

Comments
 (0)