Skip to content

Commit 0c4cc67

Browse files
authored
Merge pull request #1192 from arjraman/master
feat(logwatchers): Restart kmsg parser if channel closes
2 parents ed5ec82 + 748fecd commit 0c4cc67

File tree

3 files changed

+200
-4
lines changed

3 files changed

+200
-4
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ debug.test
99
/output/
1010
coverage.out
1111
.idea/
12+
*.DS_Store
13+
*.iml

pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ import (
3030
"k8s.io/node-problem-detector/pkg/util/tomb"
3131
)
3232

33+
const (
34+
// retryDelay is the time to wait before attempting to restart the kmsg parser.
35+
retryDelay = 5 * time.Second
36+
)
37+
3338
type kernelLogWatcher struct {
3439
cfg types.WatcherConfig
3540
startTime time.Time
@@ -101,8 +106,21 @@ func (k *kernelLogWatcher) watchLoop() {
101106
return
102107
case msg, ok := <-kmsgs:
103108
if !ok {
104-
klog.Error("Kmsg channel closed")
105-
return
109+
klog.Error("Kmsg channel closed, attempting to restart kmsg parser")
110+
111+
// Close the old parser
112+
if err := k.kmsgParser.Close(); err != nil {
113+
klog.Errorf("Failed to close kmsg parser: %v", err)
114+
}
115+
116+
// Try to restart
117+
var restarted bool
118+
kmsgs, restarted = k.retryCreateParser()
119+
if !restarted {
120+
// Stopping was signaled
121+
return
122+
}
123+
continue
106124
}
107125
klog.V(5).Infof("got kernel message: %+v", msg)
108126
if msg.Message == "" {
@@ -122,3 +140,26 @@ func (k *kernelLogWatcher) watchLoop() {
122140
}
123141
}
124142
}
143+
144+
// retryCreateParser attempts to create a new kmsg parser.
145+
// It tries immediately first, then waits retryDelay between subsequent failures.
146+
// It returns the new message channel and true on success, or nil and false if stopping was signaled.
147+
func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
148+
for {
149+
parser, err := kmsgparser.NewParser()
150+
if err == nil {
151+
k.kmsgParser = parser
152+
klog.Infof("Successfully restarted kmsg parser")
153+
return parser.Parse(), true
154+
}
155+
156+
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
157+
158+
select {
159+
case <-k.tomb.Stopping():
160+
klog.Infof("Stop watching kernel log during restart attempt")
161+
return nil, false
162+
case <-time.After(retryDelay):
163+
}
164+
}
165+
}

pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go

Lines changed: 155 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package kmsg
1818

1919
import (
20+
"sync"
2021
"testing"
2122
"time"
2223

@@ -27,23 +28,44 @@ import (
2728
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
2829
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
2930
"k8s.io/node-problem-detector/pkg/util"
31+
"k8s.io/node-problem-detector/pkg/util/tomb"
3032
)
3133

3234
type mockKmsgParser struct {
33-
kmsgs []kmsgparser.Message
35+
kmsgs []kmsgparser.Message
36+
closeAfterSend bool
37+
closeCalled bool
38+
mu sync.Mutex
3439
}
3540

3641
func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {}
37-
func (m *mockKmsgParser) Close() error { return nil }
42+
43+
func (m *mockKmsgParser) Close() error {
44+
m.mu.Lock()
45+
defer m.mu.Unlock()
46+
m.closeCalled = true
47+
return nil
48+
}
49+
50+
func (m *mockKmsgParser) WasCloseCalled() bool {
51+
m.mu.Lock()
52+
defer m.mu.Unlock()
53+
return m.closeCalled
54+
}
55+
3856
func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message {
3957
c := make(chan kmsgparser.Message)
4058
go func() {
4159
for _, msg := range m.kmsgs {
4260
c <- msg
4361
}
62+
if m.closeAfterSend {
63+
close(c)
64+
}
4465
}()
4566
return c
4667
}
68+
4769
func (m *mockKmsgParser) SeekEnd() error { return nil }
4870

4971
func TestWatch(t *testing.T) {
@@ -169,3 +191,134 @@ func TestWatch(t *testing.T) {
169191
}
170192
}
171193
}
194+
195+
func TestWatcherStopsGracefullyOnTombStop(t *testing.T) {
196+
now := time.Now()
197+
198+
mock := &mockKmsgParser{
199+
kmsgs: []kmsgparser.Message{
200+
{Message: "test message", Timestamp: now},
201+
},
202+
closeAfterSend: false, // Don't close, let tomb stop it
203+
}
204+
205+
w := &kernelLogWatcher{
206+
cfg: types.WatcherConfig{},
207+
startTime: now.Add(-time.Second),
208+
tomb: tomb.NewTomb(),
209+
logCh: make(chan *logtypes.Log, 100),
210+
kmsgParser: mock,
211+
}
212+
213+
logCh, err := w.Watch()
214+
assert.NoError(t, err)
215+
216+
// Should receive the message
217+
select {
218+
case log := <-logCh:
219+
assert.Equal(t, "test message", log.Message)
220+
case <-time.After(time.Second):
221+
t.Fatal("timeout waiting for log message")
222+
}
223+
224+
// Stop the watcher
225+
w.Stop()
226+
227+
// Log channel should be closed after stop
228+
select {
229+
case _, ok := <-logCh:
230+
assert.False(t, ok, "log channel should be closed after Stop()")
231+
case <-time.After(time.Second):
232+
t.Fatal("timeout waiting for log channel to close after Stop()")
233+
}
234+
235+
// Verify parser was closed
236+
assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
237+
}
238+
239+
func TestWatcherProcessesEmptyMessages(t *testing.T) {
240+
now := time.Now()
241+
242+
mock := &mockKmsgParser{
243+
kmsgs: []kmsgparser.Message{
244+
{Message: "", Timestamp: now},
245+
{Message: "valid message", Timestamp: now.Add(time.Second)},
246+
{Message: "", Timestamp: now.Add(2 * time.Second)},
247+
},
248+
closeAfterSend: false,
249+
}
250+
251+
w := &kernelLogWatcher{
252+
cfg: types.WatcherConfig{},
253+
startTime: now.Add(-time.Second),
254+
tomb: tomb.NewTomb(),
255+
logCh: make(chan *logtypes.Log, 100),
256+
kmsgParser: mock,
257+
}
258+
259+
logCh, err := w.Watch()
260+
assert.NoError(t, err)
261+
262+
// Should only receive the non-empty message
263+
select {
264+
case log := <-logCh:
265+
assert.Equal(t, "valid message", log.Message)
266+
case <-time.After(time.Second):
267+
t.Fatal("timeout waiting for log message")
268+
}
269+
270+
// Stop the watcher and verify channel closes
271+
w.Stop()
272+
273+
select {
274+
case _, ok := <-logCh:
275+
assert.False(t, ok, "log channel should be closed after Stop()")
276+
case <-time.After(time.Second):
277+
t.Fatal("timeout waiting for log channel to close")
278+
}
279+
}
280+
281+
func TestWatcherTrimsMessageWhitespace(t *testing.T) {
282+
now := time.Now()
283+
284+
mock := &mockKmsgParser{
285+
kmsgs: []kmsgparser.Message{
286+
{Message: " message with spaces ", Timestamp: now},
287+
{Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)},
288+
{Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)},
289+
},
290+
closeAfterSend: false,
291+
}
292+
293+
w := &kernelLogWatcher{
294+
cfg: types.WatcherConfig{},
295+
startTime: now.Add(-time.Second),
296+
tomb: tomb.NewTomb(),
297+
logCh: make(chan *logtypes.Log, 100),
298+
kmsgParser: mock,
299+
}
300+
301+
logCh, err := w.Watch()
302+
assert.NoError(t, err)
303+
304+
expectedMessages := []string{"message with spaces", "tabbed message", "newlines"}
305+
306+
for _, expected := range expectedMessages {
307+
select {
308+
case log := <-logCh:
309+
assert.Equal(t, expected, log.Message)
310+
case <-time.After(time.Second):
311+
t.Fatalf("timeout waiting for message: %s", expected)
312+
}
313+
}
314+
315+
// Stop the watcher and verify channel closes
316+
w.Stop()
317+
318+
select {
319+
case _, ok := <-logCh:
320+
assert.False(t, ok, "log channel should be closed after Stop()")
321+
case <-time.After(time.Second):
322+
t.Fatal("timeout waiting for log channel to close")
323+
}
324+
}

0 commit comments

Comments
 (0)