Skip to content

Commit 6276bfc

Browse files
authored
Limit telemetry payload size to 65K (#303)
* Limiting the size of our buffered payload to ~2MB
* Modifying payload cap from 2MB to 65535 bytes.
* Modifying how we cap payload size.
1 parent aa055ab commit 6276bfc

File tree

2 files changed

+84
-22
lines changed

2 files changed

+84
-22
lines changed

telemetry/telemetry_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package telemetry
55

66
import (
7+
"bytes"
78
"encoding/json"
89
"fmt"
910
"log"
@@ -35,6 +36,28 @@ var ipamQueryResponse = "" +
3536
" </Interface>" +
3637
"</Interfaces>"
3738

39+
// sampleCniReport - sample CNI report with fully populated metadata, used as input for payload tests
var sampleCniReport = CNIReport{
	IsNewInstance: false,
	EventMessage:  "[azure-cns] Code:UnknownContainerID {IPConfiguration:{IPSubnet:{IPAddress: PrefixLength:0} DNSServers:[] GatewayIPAddress:} Routes:[] CnetAddressSpace:[] MultiTenancyInfo:{EncapType: ID:0} PrimaryInterfaceIdentifier: LocalIPConfiguration:{IPSubnet:{IPAddress: PrefixLength:0} DNSServers:[] GatewayIPAddress:} {ReturnCode:18 Message:NetworkContainer doesn't exist.}}.",
	Timestamp:     "2019-02-27 17:44:47.319911225 +0000 UTC",
	Metadata: Metadata{
		Location:             "EastUS2EUAP",
		VMName:               "k8s-agentpool1-65609007-0",
		Offer:                "aks",
		OsType:               "Linux",
		PlacementGroupID:     "",
		PlatformFaultDomain:  "0",
		PlatformUpdateDomain: "0",
		Publisher:            "microsoft-aks",
		ResourceGroupName:    "rghostnetagttest",
		Sku:                  "aks-ubuntu-1604-201811",
		SubscriptionID:       "eff73b63-f38d-4cb5-bad1-21f273c1e36b",
		Tags:                 "acsengineVersion:v0.25.0;creationSource:acsengine-k8s-agentpool1-65609007-0;orchestrator:Kubernetes:1.10.9;poolName:agentpool1;resourceNameSuffix:65609007",
		OSVersion:            "2018.11.02",
		VMID:                 "eff73b63-f38d-4cb5-bad1-21f273c1e36b",
		VMSize:               "Standard_DS2_v2",
		KernelVersion:        "",
	},
}
60+
3861
func TestMain(m *testing.M) {
3962
u, _ := url.Parse("tcp://" + ipamQueryUrl)
4063
ipamAgent, err := common.NewListener(u)
@@ -155,3 +178,18 @@ func TestSetReportState(t *testing.T) {
155178
t.Errorf("Error removing telemetry file due to %v", err)
156179
}
157180
}
181+
182+
func TestPayloadCap(t *testing.T) {
183+
// sampleCniReport is ~66 bytes and we're adding 2000 reports here to test that the payload will be capped to 65535
184+
for i := 0; i < 4; i++ {
185+
for j := 0; j < 500; j++ {
186+
tb.payload.push(sampleCniReport)
187+
}
188+
189+
var body bytes.Buffer
190+
json.NewEncoder(&body).Encode(tb.payload)
191+
if uint16(body.Len()) > MaxPayloadSize {
192+
t.Fatalf("Payload size exceeded max size of %d", MaxPayloadSize)
193+
}
194+
}
195+
}

telemetry/telemetrybuffer.go

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,24 @@ import (
2222
// FdName - file descriptor name
2323
// Delimiter - delimiter for socket reads/writes
2424
// azureHostReportURL - host net agent url of type payload
25-
// DefaultDncReportsSize - default DNC report slice size
26-
// DefaultCniReportsSize - default CNI report slice size
27-
// DefaultNpmReportsSize - default NPM report slice size
2825
// DefaultInterval - default interval for sending payload to host
26+
// logName - telemetry log name
27+
// MaxPayloadSize - max payload size in bytes
2928
const (
30-
FdName = "azure-vnet-telemetry"
31-
Delimiter = '\n'
32-
azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload"
33-
DefaultInterval = 60 * time.Second
34-
logName = "azure-vnet-telemetry"
35-
MaxPayloadSize = 2097
29+
FdName = "azure-vnet-telemetry"
30+
Delimiter = '\n'
31+
azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload"
32+
DefaultInterval = 10 * time.Second
33+
logName = "azure-vnet-telemetry"
34+
MaxPayloadSize uint16 = 65535
35+
dnc = "DNC"
36+
cns = "CNS"
37+
npm = "NPM"
38+
cni = "CNI"
3639
)
3740

3841
var telemetryLogger = log.NewLogger(logName, log.LevelInfo, log.TargetStderr)
42+
// payloadSize - running size in bytes of the reports accepted into the payload since the last reset.
// It is package-level state, so it is shared by every Payload value.
var payloadSize uint16
3943

4044
// TelemetryBuffer object
4145
type TelemetryBuffer struct {
@@ -108,7 +112,6 @@ func (tb *TelemetryBuffer) StartServer() error {
108112
json.Unmarshal([]byte(reportStr), &npmReport)
109113
tb.data <- npmReport
110114
} else if _, ok := tmp["CniSucceeded"]; ok {
111-
telemetryLogger.Printf("[Telemetry] Got cni report")
112115
var cniReport CNIReport
113116
json.Unmarshal([]byte(reportStr), &cniReport)
114117
tb.data <- cniReport
@@ -157,14 +160,12 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
157160
case <-interval:
158161
// Send payload to host and clear cache when sent successfully
159162
// To-do : if we hit max slice size in payload, write to disk and process the logs on disk on future sends
160-
telemetryLogger.Printf("[Telemetry] send data to host")
161163
if err := tb.sendToHost(); err == nil {
162164
tb.payload.reset()
163165
} else {
164166
telemetryLogger.Printf("[Telemetry] sending to host failed with error %+v", err)
165167
}
166168
case report := <-tb.data:
167-
telemetryLogger.Printf("[Telemetry] Got data..Append it to buffer")
168169
tb.payload.push(report)
169170
case <-tb.cancel:
170171
goto EXIT
@@ -253,29 +254,29 @@ func (pl *Payload) push(x interface{}) {
253254
}
254255
}
255256

256-
if pl.len() < MaxPayloadSize {
257-
switch x.(type) {
258-
case DNCReport:
257+
if notExceeded, reportType := pl.payloadCapNotExceeded(x); notExceeded {
258+
switch reportType {
259+
case dnc:
259260
dncReport := x.(DNCReport)
260261
dncReport.Metadata = metadata
261262
pl.DNCReports = append(pl.DNCReports, dncReport)
262-
case CNIReport:
263+
case cni:
263264
cniReport := x.(CNIReport)
264265
cniReport.Metadata = metadata
265266
pl.CNIReports = append(pl.CNIReports, cniReport)
266-
case NPMReport:
267+
case npm:
267268
npmReport := x.(NPMReport)
268269
npmReport.Metadata = metadata
269270
pl.NPMReports = append(pl.NPMReports, npmReport)
270-
case CNSReport:
271+
case cns:
271272
cnsReport := x.(CNSReport)
272273
cnsReport.Metadata = metadata
273274
pl.CNSReports = append(pl.CNSReports, cnsReport)
274275
}
275276
}
276277
}
277278

278-
// reset - reset payload slices
279+
// reset - resets payload slices and sets payloadSize to 0
279280
func (pl *Payload) reset() {
280281
pl.DNCReports = nil
281282
pl.DNCReports = make([]DNCReport, 0)
@@ -285,11 +286,34 @@ func (pl *Payload) reset() {
285286
pl.NPMReports = make([]NPMReport, 0)
286287
pl.CNSReports = nil
287288
pl.CNSReports = make([]CNSReport, 0)
289+
payloadSize = 0
288290
}
289291

290-
// len - get number of payload items
291-
func (pl *Payload) len() int {
292-
return len(pl.CNIReports) + len(pl.CNSReports) + len(pl.DNCReports) + len(pl.NPMReports)
292+
// payloadCapNotExceeded - Returns whether payload cap will be exceeded as a result of adding the new report; and the report type
293+
// If the cap is not exceeded, we update the payload size here.
294+
func (pl *Payload) payloadCapNotExceeded(x interface{}) (notExceeded bool, reportType string) {
295+
var body bytes.Buffer
296+
switch x.(type) {
297+
case DNCReport:
298+
reportType = dnc
299+
json.NewEncoder(&body).Encode(x.(DNCReport))
300+
case CNIReport:
301+
reportType = cni
302+
json.NewEncoder(&body).Encode(x.(CNIReport))
303+
case NPMReport:
304+
reportType = npm
305+
json.NewEncoder(&body).Encode(x.(NPMReport))
306+
case CNSReport:
307+
reportType = cns
308+
json.NewEncoder(&body).Encode(x.(CNSReport))
309+
}
310+
311+
updatedPayloadSize := uint16(body.Len()) + payloadSize
312+
if notExceeded = updatedPayloadSize < MaxPayloadSize && payloadSize < updatedPayloadSize; notExceeded {
313+
payloadSize = updatedPayloadSize
314+
}
315+
316+
return
293317
}
294318

295319
// saveHostMetadata - save metadata got from wireserver to json file

0 commit comments

Comments
 (0)