Skip to content

Commit cc0ac8a

Browse files
committed
1. Add container router
2. Implement sysdig server
1 parent 4fa2263 commit cc0ac8a

File tree

6 files changed

+213
-2
lines changed

6 files changed

+213
-2
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module sysdig-monitor
1+
module github.com/YLonely/sysdig-monitor
22

33
go 1.12
44

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package container
2+
3+
import (
4+
"github.com/YLonely/sysdig-monitor/server/router"
5+
)
6+
7+
type containerRouter struct {
8+
routes []router.Route
9+
// sysdig client
10+
11+
// docker client
12+
}
13+
14+
func NewRouter() router.Router {
15+
r := &containerRouter{}
16+
17+
return r
18+
}
19+
20+
func (r *containerRouter) Routes() []router.Route {
21+
return r.routes
22+
}
23+
24+
func (r *containerRouter) initRoutes() {
25+
r.routes = []router.Route{}
26+
}

server/router/router.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ type HandlerType func(c *gin.Context)
88

99
// Router defines an interface to specify a group of routes ot add to the server
1010
type Router interface {
11-
Routes()
11+
Routes() []Route
1212
}
1313

1414
// Route defines an individual API route in the sysdig-monitor server

sysdig/event.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package sysdig
2+
3+
// Event represents a container event in sysdig
4+
type Event struct {
5+
ID string `json:"container.id"`
6+
Name string `json:"container.name"`
7+
CPUId int `json:"evt.cpu"`
8+
Dir string `json:"evt.dir"`
9+
Info string `json:"evt.info"`
10+
Num int `json:"evt.num"`
11+
TimeStamp int64 `json:"evt.outputtime"`
12+
Type string `json:"evt.type"`
13+
ProcName string `json:"proc.name"`
14+
ThreadID int `json:"thread.tid"`
15+
ThreadVirtualID int `json:"thread.vid"`
16+
}
17+
18+
type subscriber struct {
19+
c chan Event
20+
closed bool
21+
}
22+
23+
func (s *subscriber) close() {
24+
if s.closed {
25+
return
26+
}
27+
close(s.c)
28+
s.closed = true
29+
return
30+
}

sysdig/monitor.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package sysdig
2+
3+
import (
4+
"os/exec"
5+
"syscall"
6+
"time"
7+
)
8+
9+
var Monitor ProcessMonitor = &defaultMonitor{}
10+
11+
// Exit info
12+
type Exit struct {
13+
TimeStamp time.Time
14+
Pid int
15+
Status int
16+
}
17+
18+
// ProcessMonitor is an interface for monitoring process
19+
type ProcessMonitor interface {
20+
Start(*exec.Cmd) (chan Exit, error)
21+
Wait(*exec.Cmd, chan Exit) (int, error)
22+
}
23+
24+
type defaultMonitor struct {
25+
}
26+
27+
func (m *defaultMonitor) Start(c *exec.Cmd) (chan Exit, error) {
28+
if err := c.Start(); err != nil {
29+
return nil, err
30+
}
31+
ec := make(chan Exit, 1)
32+
go func() {
33+
var status int
34+
if err := c.Wait(); err != nil {
35+
status = 255
36+
if exitErr, ok := err.(*exec.ExitError); ok {
37+
if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok {
38+
status = ws.ExitStatus()
39+
}
40+
}
41+
}
42+
ec <- Exit{
43+
TimeStamp: time.Now(),
44+
Pid: c.Process.Pid,
45+
Status: status,
46+
}
47+
close(ec)
48+
}()
49+
return ec, nil
50+
}
51+
52+
func (m *defaultMonitor) Wait(c *exec.Cmd, ec chan Exit) (int, error) {
53+
e := <-ec
54+
return e.Status, nil
55+
}

sysdig/sysdig.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package sysdig
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"io"
8+
"os/exec"
9+
)
10+
11+
const binaryName = "sysdig"
12+
const bufferSize = 1024
13+
14+
// Server starts sysdig and dispatch events
15+
type Server interface {
16+
Subscribe() chan Event
17+
Start(ctx context.Context) error
18+
}
19+
20+
var _ Server = &localServer{}
21+
22+
type localServer struct {
23+
subscribers []*subscriber
24+
}
25+
26+
// NewServer creates a server
27+
func NewServer() Server {
28+
return &localServer{}
29+
}
30+
31+
func (ls *localServer) Subscribe() chan Event {
32+
c := make(chan Event, bufferSize)
33+
ls.subscribers = append(ls.subscribers, &subscriber{c: c})
34+
return c
35+
}
36+
37+
func (ls *localServer) Start(ctx context.Context) error {
38+
if err := ls.preRrequestCheck(); err != nil {
39+
return err
40+
}
41+
args := []string{"-pc", "-j", "container.id!=host"}
42+
cmd := exec.CommandContext(ctx, binaryName, args...)
43+
rd, err := cmd.StdoutPipe()
44+
if err != nil {
45+
return err
46+
}
47+
ec, err := Monitor.Start(cmd)
48+
if err != nil {
49+
rd.Close()
50+
return err
51+
}
52+
53+
var (
54+
dec = json.NewDecoder(rd)
55+
)
56+
57+
go func() {
58+
defer func() {
59+
rd.Close()
60+
Monitor.Wait(cmd, ec)
61+
}()
62+
for {
63+
var e Event
64+
if err := dec.Decode(&e); err != nil {
65+
if err == io.EOF {
66+
return
67+
}
68+
e = Event{Type: "error"}
69+
}
70+
for _, subscriber := range ls.subscribers {
71+
if !subscriber.closed {
72+
subscriber.c <- e
73+
}
74+
}
75+
}
76+
}()
77+
78+
return nil
79+
}
80+
81+
func (ls *localServer) preRrequestCheck() error {
82+
//try run sysdig
83+
ctx, cancel := context.WithCancel(context.Background())
84+
cmd := exec.CommandContext(ctx, binaryName)
85+
var (
86+
ec chan Exit
87+
err error
88+
)
89+
90+
if ec, err = Monitor.Start(cmd); err != nil {
91+
cancel()
92+
return errors.New("can not start sysdig")
93+
}
94+
95+
cancel()
96+
97+
Monitor.Wait(cmd, ec)
98+
99+
return nil
100+
}

0 commit comments

Comments
 (0)