Skip to content

Commit e6000f2

Browse files
authored
Adding telemetry buffer. (#270)
* Adding telemetry buffer. * Addressing Ashvin's comments. * Adding changes to clean up the Linux socket after the programme fails to close it.
1 parent 3f1216e commit e6000f2

File tree

5 files changed

+334
-3
lines changed

5 files changed

+334
-3
lines changed

cns/networkcontainers/networkcontainers.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ type NetworkContainers struct {
1919

2020
func interfaceExists(iFaceName string) (bool, error) {
2121
_, err := net.InterfaceByName(iFaceName)
22-
2322
if err != nil {
2423
errMsg := fmt.Sprintf("[Azure CNS] Unable to get interface by name %v, %v", iFaceName, err)
2524
log.Printf(errMsg)

telemetry/telemetry.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2017 Microsoft. All rights reserved.
1+
// Copyright 2018 Microsoft. All rights reserved.
22
// MIT License
33

44
package telemetry
@@ -142,6 +142,21 @@ type NPMReport struct {
142142
Metadata Metadata `json:"compute"`
143143
}
144144

145+
// DNCReport is the telemetry report type for DNC. Apart from the
// IsNewInstance flag, every field is carried as a string; the Metadata field
// is serialized under the JSON key "compute".
146+
type DNCReport struct {
147+
IsNewInstance bool
148+
CPUUsage string
149+
MemoryUsage string
150+
Processes string
151+
EventMessage string
152+
PartitionKey string
153+
Allocations string
154+
Timestamp string
155+
UUID string
156+
Errorcode string
157+
Metadata Metadata `json:"compute"`
158+
}
159+
145160
// ReportManager structure.
146161
type ReportManager struct {
147162
HostNetAgentURL string
@@ -210,8 +225,10 @@ func (reportMgr *ReportManager) SendReport() error {
210225
log.Printf("[Telemetry] %+v", reportMgr.Report.(*CNIReport))
211226
case *NPMReport:
212227
log.Printf("[Telemetry] %+v", reportMgr.Report.(*NPMReport))
228+
case *DNCReport:
229+
log.Printf("[Telemetry] %+v", reportMgr.Report.(*DNCReport))
213230
default:
214-
log.Printf("[Telemetry] %+v", reportMgr.Report)
231+
log.Printf("[Telemetry] Invalid report type")
215232
}
216233

217234
httpc := &http.Client{}
@@ -452,3 +469,20 @@ func (reportMgr *ReportManager) GetHostMetadata() error {
452469

453470
return err
454471
}
472+
473+
// ReportToBytes - returns the report bytes
474+
func (reportMgr *ReportManager) ReportToBytes() (report []byte, err error) {
475+
switch reportMgr.Report.(type) {
476+
case *CNIReport:
477+
case *NPMReport:
478+
case *DNCReport:
479+
default:
480+
err = fmt.Errorf("[Telemetry] Invalid report type")
481+
}
482+
483+
if err == nil {
484+
report, err = json.Marshal(reportMgr.Report)
485+
}
486+
487+
return
488+
}

telemetry/telemetrybuffer.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
// Copyright 2018 Microsoft. All rights reserved.
2+
// MIT License
3+
4+
package telemetry
5+
6+
import (
7+
"bufio"
8+
"bytes"
9+
"encoding/json"
10+
"fmt"
11+
"net"
12+
"net/http"
13+
"strings"
14+
"time"
15+
)
16+
17+
// Constants shared by the telemetry buffer server and its clients.
const (
	// FdName is the name substituted into the platform-specific fdTemplate
	// to build the socket / named pipe address.
	FdName = "azure-telemetry"
	// Delimiter terminates every report written to or read from a connection.
	Delimiter = '\n'
	// HostNetAgentURL is the host net agent URL (type payload) that the
	// buffered payload is posted to.
	HostNetAgentURL = "http://169.254.169.254/machine/plugins?comp=netagent&type=payload"
	// DefaultInterval is the default interval for sending the payload to the
	// host; Start also uses it as the lower bound for the requested interval.
	DefaultInterval = 1 * time.Minute
)
30+
31+
// TelemetryBuffer holds both ends of the local telemetry channel: the
// listener side that collects reports from connected clients and the client
// connection used by Write, plus the payload accumulated for the host.
32+
type TelemetryBuffer struct {
33+
client net.Conn // set by Dial on success; used by Write
34+
listener net.Listener // set by Listen on success; accepts client connections
35+
connections []net.Conn // connections accepted from the listener; closed by close
36+
payload Payload // reports collected by Start and posted by sendToHost
37+
fdExists bool // set when Listen fails with an "in use" / "Access is denied" error
38+
connected bool // set when Dial succeeds
39+
data chan interface{} // decoded reports handed from connection goroutines to Start
40+
cancel chan bool // signalled by Cancel to make Start return
41+
}
42+
43+
// Payload object holds the different types of reports, one slice per report
// type. It is the value that sendToHost JSON-encodes and posts to the host.
44+
type Payload struct {
45+
DNCReports []DNCReport
46+
CNIReports []CNIReport
47+
NPMReports []NPMReport
48+
}
49+
50+
// NewTelemetryBuffer - create a new TelemetryBuffer
51+
func NewTelemetryBuffer() (*TelemetryBuffer, error) {
52+
var tb TelemetryBuffer
53+
tb.data = make(chan interface{})
54+
tb.cancel = make(chan bool, 1)
55+
tb.connections = make([]net.Conn, 1)
56+
err := tb.Listen(FdName)
57+
if err != nil {
58+
tb.fdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
59+
} else {
60+
// Spawn server goroutine to handle incoming connections
61+
go func() {
62+
for {
63+
// Spawn worker goroutines to communicate with client
64+
conn, err := tb.listener.Accept()
65+
if err == nil {
66+
tb.connections = append(tb.connections, conn)
67+
go func() {
68+
for {
69+
reportStr, err := read(conn)
70+
if err == nil {
71+
var tmp map[string]interface{}
72+
json.Unmarshal(reportStr, &tmp)
73+
if _, ok := tmp["NpmVersion"]; ok {
74+
var npmReport NPMReport
75+
json.Unmarshal([]byte(reportStr), &npmReport)
76+
tb.data <- npmReport
77+
} else if _, ok := tmp["CniSucceeded"]; ok {
78+
var cniReport CNIReport
79+
json.Unmarshal([]byte(reportStr), &cniReport)
80+
tb.data <- cniReport
81+
} else if _, ok := tmp["Allocations"]; ok {
82+
var dncReport DNCReport
83+
json.Unmarshal([]byte(reportStr), &dncReport)
84+
tb.data <- dncReport
85+
}
86+
}
87+
}
88+
}()
89+
}
90+
}
91+
}()
92+
}
93+
94+
err = tb.Dial(FdName)
95+
if err == nil {
96+
tb.connected = true
97+
tb.payload.DNCReports = make([]DNCReport, 0)
98+
tb.payload.CNIReports = make([]CNIReport, 0)
99+
tb.payload.NPMReports = make([]NPMReport, 0)
100+
} else if tb.fdExists {
101+
tb.cleanup(FdName)
102+
}
103+
104+
return &tb, err
105+
}
106+
107+
// Start - start running an instance if it isn't already being run elsewhere
108+
func (tb *TelemetryBuffer) Start(intervalms time.Duration) {
109+
defer tb.close()
110+
if !tb.fdExists && tb.connected {
111+
if intervalms < DefaultInterval {
112+
intervalms = DefaultInterval
113+
}
114+
115+
interval := time.NewTicker(intervalms).C
116+
for {
117+
select {
118+
case <-interval:
119+
// Send payload to host and clear cache when sent successfully
120+
// To-do : if we hit max slice size in payload, write to disk and process the logs on disk on future sends
121+
if err := tb.sendToHost(); err == nil {
122+
tb.payload.reset()
123+
}
124+
case report := <-tb.data:
125+
tb.payload.push(report)
126+
case <-tb.cancel:
127+
goto EXIT
128+
}
129+
}
130+
} else {
131+
<-tb.cancel
132+
}
133+
134+
EXIT:
135+
}
136+
137+
// read - read from the file descriptor
138+
func read(conn net.Conn) (b []byte, err error) {
139+
b, err = bufio.NewReader(conn).ReadBytes(Delimiter)
140+
if err == nil {
141+
b = b[:len(b)-1]
142+
}
143+
144+
return
145+
}
146+
147+
// Write - write to the file descriptor
148+
func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) {
149+
b = append(b, Delimiter)
150+
w := bufio.NewWriter(tb.client)
151+
c, err = w.Write(b)
152+
if err == nil {
153+
w.Flush()
154+
}
155+
156+
return
157+
}
158+
159+
// Cancel - signal to tear down telemetry buffer
160+
func (tb *TelemetryBuffer) Cancel() {
161+
tb.cancel <- true
162+
}
163+
164+
// close - close all connections
165+
func (tb *TelemetryBuffer) close() {
166+
if tb.client != nil {
167+
tb.client.Close()
168+
}
169+
170+
if tb.listener != nil {
171+
tb.listener.Close()
172+
}
173+
174+
for _, conn := range tb.connections {
175+
if conn != nil {
176+
conn.Close()
177+
}
178+
}
179+
}
180+
181+
// sendToHost - send payload to host
182+
func (tb *TelemetryBuffer) sendToHost() error {
183+
httpc := &http.Client{}
184+
var body bytes.Buffer
185+
json.NewEncoder(&body).Encode(tb.payload)
186+
resp, err := httpc.Post(HostNetAgentURL, ContentType, &body)
187+
if err != nil {
188+
return fmt.Errorf("[Telemetry] HTTP Post returned error %v", err)
189+
}
190+
191+
defer resp.Body.Close()
192+
193+
if resp.StatusCode != http.StatusOK {
194+
return fmt.Errorf("[Telemetry] HTTP Post returned statuscode %d", resp.StatusCode)
195+
}
196+
197+
return nil
198+
}
199+
200+
// push - push the report (x) to corresponding slice
201+
func (pl *Payload) push(x interface{}) {
202+
switch x.(type) {
203+
case DNCReport:
204+
pl.DNCReports = append(pl.DNCReports, x.(DNCReport))
205+
case CNIReport:
206+
pl.CNIReports = append(pl.CNIReports, x.(CNIReport))
207+
case NPMReport:
208+
pl.NPMReports = append(pl.NPMReports, x.(NPMReport))
209+
}
210+
}
211+
212+
// reset - reset payload slices
213+
func (pl *Payload) reset() {
214+
pl.DNCReports = nil
215+
pl.DNCReports = make([]DNCReport, 0)
216+
pl.CNIReports = nil
217+
pl.CNIReports = make([]CNIReport, 0)
218+
pl.NPMReports = nil
219+
pl.NPMReports = make([]NPMReport, 0)
220+
}

telemetry/telemetrybuffer_linux.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2018 Microsoft. All rights reserved.
2+
// MIT License
3+
4+
package telemetry
5+
6+
import (
7+
"fmt"
8+
"net"
9+
"os"
10+
)
11+
12+
// fdTemplate is the format string for the Unix domain socket path; the
// socket name is substituted for %s.
const (
13+
fdTemplate = "/tmp/%s.sock"
14+
)
15+
16+
// Dial - try to connect to/create a socket with 'name'
17+
func (tb *TelemetryBuffer) Dial(name string) (err error) {
18+
conn, err := net.Dial("unix", fmt.Sprintf(fdTemplate, name))
19+
if err == nil {
20+
tb.client = conn
21+
}
22+
23+
return err
24+
}
25+
26+
// Listen - try to create and listen on socket with 'name'
27+
func (tb *TelemetryBuffer) Listen(name string) (err error) {
28+
conn, err := net.Listen("unix", fmt.Sprintf(fdTemplate, name))
29+
if err == nil {
30+
tb.listener = conn
31+
}
32+
33+
return err
34+
}
35+
36+
// cleanup - manually remove socket
37+
func (tb *TelemetryBuffer) cleanup(name string) error {
38+
return os.Remove(fmt.Sprintf(fdTemplate, name))
39+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2018 Microsoft. All rights reserved.
2+
// MIT License
3+
4+
package telemetry
5+
6+
import (
7+
"fmt"
8+
9+
"github.com/Microsoft/go-winio"
10+
)
11+
12+
// fdTemplate is the format string for the named pipe path; the pipe name is
// substituted for %s.
const (
13+
fdTemplate = "\\\\.\\pipe\\%s"
14+
)
15+
16+
// Dial - try to connect to a named pipe with 'name'
17+
func (tb *TelemetryBuffer) Dial(name string) (err error) {
18+
conn, err := winio.DialPipe(fmt.Sprintf(fdTemplate, name), nil)
19+
if err == nil {
20+
tb.client = conn
21+
}
22+
23+
return err
24+
}
25+
26+
// Listen - try to create and listen on named pipe with 'name'
27+
func (tb *TelemetryBuffer) Listen(name string) (err error) {
28+
listener, err := winio.ListenPipe(fmt.Sprintf(fdTemplate, name), nil)
29+
if err == nil {
30+
tb.listener = listener
31+
}
32+
33+
return err
34+
}
35+
36+
// cleanup - stub
// Does nothing for named pipes and always returns nil; 'name' is unused.
37+
func (tb *TelemetryBuffer) cleanup(name string) error {
38+
return nil
39+
}

0 commit comments

Comments
 (0)