Skip to content

Commit be7cc78

Browse files
committed
Properly close channel when monitor exits.
Signed-off-by: Lantao Liu <[email protected]>
1 parent 705cb01 commit be7cc78

File tree

9 files changed

+37
-14
lines changed

9 files changed

+37
-14
lines changed

cmd/logcounter/log_counter.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ func main() {
4444
fmt.Print(err)
4545
os.Exit(int(types.Unknown))
4646
}
47-
actual := counter.Count()
47+
actual, err := counter.Count()
48+
if err != nil {
49+
fmt.Print(err)
50+
os.Exit(int(types.Unknown))
51+
}
4852
if actual >= fedo.Count {
4953
fmt.Printf("Found %d matching logs, which meets the threshold of %d\n", actual, fedo.Count)
5054
os.Exit(int(types.NonOK))

pkg/custompluginmonitor/custom_plugin_monitor.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,11 @@ func (c *customPluginMonitor) monitorLoop() {
128128

129129
for {
130130
select {
131-
case result := <-resultChan:
131+
case result, ok := <-resultChan:
132+
if !ok {
133+
glog.Errorf("Result channel closed: %s", c.configPath)
134+
return
135+
}
132136
glog.V(3).Infof("Receive new plugin result for %s: %+v", c.configPath, result)
133137
status := c.generateStatus(result)
134138
glog.Infof("New status generated: %+v", status)

pkg/custompluginmonitor/plugin/plugin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func (p *Plugin) GetResultChan() <-chan cpmtypes.Result {
5555
func (p *Plugin) Run() {
5656
defer func() {
5757
glog.Info("Stopping plugin execution")
58+
close(p.resultChan)
5859
p.tomb.Done()
5960
}()
6061

pkg/logcounter/log_counter.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,15 @@ func NewJournaldLogCounter(options *options.LogCounterOptions) (types.LogCounter
6363
}, nil
6464
}
6565

66-
func (e *logCounter) Count() (count int) {
66+
func (e *logCounter) Count() (count int, err error) {
6767
start := e.clock.Now()
6868
for {
6969
select {
70-
case log := <-e.logCh:
70+
case log, ok := <-e.logCh:
71+
if !ok {
72+
err = fmt.Errorf("log channel closed unexpectedly")
73+
return
74+
}
7175
// We only want to count events up until the time at which we started.
7276
// Otherwise we would run forever
7377
if start.Before(log.Timestamp) {

pkg/logcounter/log_counter_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ func TestCount(t *testing.T) {
120120
fakeClock.Step(2 * timeout)
121121
}
122122
}(tc.logs, logCh)
123-
actualCount := counter.Count()
123+
actualCount, err := counter.Count()
124+
if err != nil {
125+
t.Errorf("unexpected error %v", err)
126+
}
124127
if actualCount != tc.expectedCount {
125128
t.Errorf("got %d; expected %d", actualCount, tc.expectedCount)
126129
}

pkg/logcounter/types/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ limitations under the License.
1717
package types
1818

1919
type LogCounter interface {
20-
Count() int
20+
Count() (int, error)
2121
}

pkg/systemlogmonitor/log_monitor.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,16 @@ func (l *logMonitor) Stop() {
125125

126126
// monitorLoop is the main loop of log monitor.
127127
func (l *logMonitor) monitorLoop() {
128-
defer l.tomb.Done()
128+
defer func() {
129+
close(l.output)
130+
l.tomb.Done()
131+
}()
129132
l.initializeStatus()
130133
for {
131134
select {
132135
case log, ok := <-l.logCh:
133136
if !ok {
134-
glog.Errorf("Log channel closed")
137+
glog.Errorf("Log channel closed: %s", l.configPath)
135138
return
136139
}
137140
l.parseLog(log)

pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,21 +86,25 @@ func (k *kernelLogWatcher) Stop() {
8686

8787
// watchLoop is the main watch loop of kernel log watcher.
8888
func (k *kernelLogWatcher) watchLoop() {
89+
kmsgs := k.kmsgParser.Parse()
8990
defer func() {
91+
if err := k.kmsgParser.Close(); err != nil {
92+
glog.Errorf("Failed to close kmsg parser: %v", err)
93+
}
9094
close(k.logCh)
9195
k.tomb.Done()
9296
}()
93-
kmsgs := k.kmsgParser.Parse()
9497

9598
for {
9699
select {
97100
case <-k.tomb.Stopping():
98101
glog.Infof("Stop watching kernel log")
99-
if err := k.kmsgParser.Close(); err != nil {
100-
glog.Errorf("Failed to close kmsg parser: %v", err)
101-
}
102102
return
103-
case msg := <-kmsgs:
103+
case msg, ok := <-kmsgs:
104+
if !ok {
105+
glog.Error("Kmsg channel closed")
106+
return
107+
}
104108
glog.V(5).Infof("got kernel message: %+v", msg)
105109
if msg.Message == "" {
106110
continue

test/e2e/lib/gce/instance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func CreateInstance(instance Instance, imageName string, imageProject string) (I
4646

4747
p, err := instance.ComputeService.Projects.Get(instance.Project).Do()
4848
if err != nil {
49-
return instance, fmt.Errorf("failed to get project info %q", instance.Project)
49+
return instance, fmt.Errorf("failed to get project info %q: %v", instance.Project, err)
5050
}
5151

5252
i := &compute.Instance{

0 commit comments

Comments
 (0)