Skip to content

Commit 4be2f92

Browse files
Close telemetry socket when it's done (#302)
* 1. Fix for closing the telemetry socket in CNI. 2. Fix for closing the connection socket if the server receives an error on read. * Added unit tests and addressed review comments. * Removed the connection from the slice after closing it.
1 parent 6276bfc commit 4be2f92

File tree

4 files changed

+45
-23
lines changed

4 files changed

+45
-23
lines changed

cni/network/plugin/main.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ func main() {
184184
}
185185
}
186186

187+
defer tb.Close()
188+
187189
t := time.Now()
188190
cniReport.Timestamp = t.Format("2006-01-02 15:04:05")
189191
cniReport.GetReport(pluginName, version, ipamQueryURL)
@@ -205,15 +207,15 @@ func main() {
205207
if err != nil {
206208
log.Printf("Failed to create network plugin, err:%v.\n", err)
207209
reportPluginError(reportManager, tb, err)
208-
os.Exit(1)
210+
return
209211
}
210212

211213
netPlugin.SetCNIReport(cniReport)
212214

213215
if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
214216
log.Printf("Failed to initialize key-value store of network plugin, err:%v.\n", err)
215217
reportPluginError(reportManager, tb, err)
216-
os.Exit(1)
218+
return
217219
}
218220

219221
defer func() {
@@ -222,7 +224,7 @@ func main() {
222224
}
223225

224226
if recover() != nil {
225-
os.Exit(1)
227+
return
226228
}
227229
}()
228230

cni/telemetry/service/telemetrymain.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ package main
33
// Entry point of the telemetry service if started by CNI
44

55
import (
6-
"fmt"
76
"time"
87

9-
"github.com/Azure/azure-container-networking/log"
108
"github.com/Azure/azure-container-networking/telemetry"
119
)
1210

@@ -19,28 +17,16 @@ func main() {
1917
var tb *telemetry.TelemetryBuffer
2018
var err error
2119

22-
log.SetName(azurecnitelemetry)
23-
log.SetLevel(log.LevelInfo)
24-
err = log.SetTarget(log.TargetLogfile)
25-
if err != nil {
26-
fmt.Printf("log settarget failed")
27-
}
28-
29-
log.Printf("[Telemetry] TelemetryBuffer process started")
3020
for {
3121
tb = telemetry.NewTelemetryBuffer("")
3222
err = tb.StartServer()
3323
if err == nil || tb.FdExists {
34-
log.Printf("[Telemetry] Server started")
3524
break
3625
}
3726

3827
tb.Cleanup(telemetry.FdName)
39-
40-
log.Printf("[Telemetry] Failed to establish telemetry buffer connection.")
4128
time.Sleep(time.Millisecond * 200)
4229
}
4330

4431
tb.BufferAndPushData(reportToHostIntervalInSeconds)
45-
log.Printf("[Telemetry] TelemetryBuffer process exiting")
4632
}

telemetry/telemetry_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ func TestMain(m *testing.M) {
105105
}
106106

107107
exitCode := m.Run()
108-
tb.Cancel()
109108
tb.Cleanup(FdName)
110109
os.Exit(exitCode)
111110
}
@@ -167,6 +166,15 @@ func TestReceiveTelemetryData(t *testing.T) {
167166
t.Errorf("payload doesn't contain CNI report")
168167
}
169168
}
169+
170+
func TestCloseTelemetryConnection(t *testing.T) {
171+
tb.Cancel()
172+
time.Sleep(300 * time.Millisecond)
173+
if len(tb.connections) != 0 {
174+
t.Errorf("server didn't close connection")
175+
}
176+
}
177+
170178
func TestSetReportState(t *testing.T) {
171179
err := reportManager.SetReportState("a.json")
172180
if err != nil {

telemetry/telemetrybuffer.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
7272

7373
tb.data = make(chan interface{})
7474
tb.cancel = make(chan bool, 1)
75-
tb.connections = make([]net.Conn, 1)
75+
tb.connections = make([]net.Conn, 0)
7676
tb.payload.DNCReports = make([]DNCReport, 0)
7777
tb.payload.CNIReports = make([]CNIReport, 0)
7878
tb.payload.NPMReports = make([]NPMReport, 0)
@@ -86,14 +86,21 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
8686
return &tb
8787
}
8888

89+
// remove drops the element at index i from s and returns the shortened slice.
// Element order is not preserved: the last element is moved into slot i.
// The backing array is modified in place and its final slot is left as is.
// Like the underlying index expressions, it panics when s is empty or i is
// out of range.
func remove(s []net.Conn, i int) []net.Conn {
	last := len(s) - 1
	s[i] = s[last]
	return s[:last]
}
93+
8994
// Starts Telemetry server listening on unix domain socket
9095
func (tb *TelemetryBuffer) StartServer() error {
9196
err := tb.Listen(FdName)
9297
if err != nil {
9398
tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
99+
telemetryLogger.Printf("Listen returns: %v", err.Error())
94100
return err
95101
}
96102

103+
telemetryLogger.Printf("Telemetry service started")
97104
// Spawn server goroutine to handle incoming connections
98105
go func() {
99106
for {
@@ -124,9 +131,20 @@ func (tb *TelemetryBuffer) StartServer() error {
124131
json.Unmarshal([]byte(reportStr), &cnsReport)
125132
tb.data <- cnsReport
126133
}
134+
} else {
135+
telemetryLogger.Printf("Server closing client connection")
136+
for index, value := range tb.connections {
137+
if value == conn {
138+
conn.Close()
139+
tb.connections = remove(tb.connections, index)
140+
return
141+
}
142+
}
127143
}
128144
}
129145
}()
146+
} else {
147+
return
130148
}
131149
}
132150
}()
@@ -147,7 +165,7 @@ func (tb *TelemetryBuffer) Connect() error {
147165

148166
// BufferAndPushData - BufferAndPushData running an instance if it isn't already being run elsewhere
149167
func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
150-
defer tb.close()
168+
defer tb.Close()
151169
if !tb.FdExists {
152170
telemetryLogger.Printf("[Telemetry] Buffer telemetry data and send it to host")
153171
if intervalms < DefaultInterval {
@@ -168,11 +186,13 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
168186
case report := <-tb.data:
169187
tb.payload.push(report)
170188
case <-tb.cancel:
189+
telemetryLogger.Printf("server cancel event")
171190
goto EXIT
172191
}
173192
}
174193
} else {
175194
<-tb.cancel
195+
telemetryLogger.Printf("Received cancel event")
176196
}
177197

178198
EXIT:
@@ -205,19 +225,25 @@ func (tb *TelemetryBuffer) Cancel() {
205225
tb.cancel <- true
206226
}
207227

208-
// close - close all connections
209-
func (tb *TelemetryBuffer) close() {
228+
// Close - close all connections
229+
func (tb *TelemetryBuffer) Close() {
210230
if tb.client != nil {
231+
telemetryLogger.Printf("client close")
211232
tb.client.Close()
233+
tb.client = nil
212234
}
213235

214236
if tb.listener != nil {
237+
telemetryLogger.Printf("server close")
215238
tb.listener.Close()
239+
tb.listener = nil
216240
}
217241

218-
for _, conn := range tb.connections {
242+
for index, conn := range tb.connections {
219243
if conn != nil {
244+
telemetryLogger.Printf("connection close")
220245
conn.Close()
246+
remove(tb.connections, index)
221247
}
222248
}
223249
}

0 commit comments

Comments
 (0)