Skip to content

Commit 748fecd

Browse files
committed
Remove opt-in knob for restarting kmsg parser and simplify retry loop
1 parent c530d1f commit 748fecd

File tree

3 files changed

+31
-181
lines changed

3 files changed

+31
-181
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,3 +9,5 @@ debug.test
99
/output/
1010
coverage.out
1111
.idea/
12+
*.DS_Store
13+
*.iml

pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go

Lines changed: 12 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ import (
2222
"time"
2323

2424
"github.com/euank/go-kmsg-parser/kmsgparser"
25-
klog "k8s.io/klog/v2"
25+
"k8s.io/klog/v2"
2626

2727
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
2828
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
@@ -33,10 +33,6 @@ import (
3333
const (
3434
// retryDelay is the time to wait before attempting to restart the kmsg parser.
3535
retryDelay = 5 * time.Second
36-
37-
// RestartOnErrorKey is the configuration key to enable restarting
38-
// the kmsg parser when the channel closes due to an error.
39-
RestartOnErrorKey = "restartOnError"
4036
)
4137

4238
type kernelLogWatcher struct {
@@ -92,12 +88,6 @@ func (k *kernelLogWatcher) Stop() {
9288
k.tomb.Stop()
9389
}
9490

95-
// restartOnError checks if the restart on error configuration is enabled.
96-
func (k *kernelLogWatcher) restartOnError() bool {
97-
value, exists := k.cfg.PluginConfig[RestartOnErrorKey]
98-
return exists && value == "true"
99-
}
100-
10191
// watchLoop is the main watch loop of kernel log watcher.
10292
func (k *kernelLogWatcher) watchLoop() {
10393
kmsgs := k.kmsgParser.Parse()
@@ -116,14 +106,7 @@ func (k *kernelLogWatcher) watchLoop() {
116106
return
117107
case msg, ok := <-kmsgs:
118108
if !ok {
119-
klog.Error("Kmsg channel closed")
120-
121-
// Only attempt to restart if configured to do so
122-
if !k.restartOnError() {
123-
return
124-
}
125-
126-
klog.Infof("Attempting to restart kmsg parser")
109+
klog.Error("Kmsg channel closed, attempting to restart kmsg parser")
127110

128111
// Close the old parser
129112
if err := k.kmsgParser.Close(); err != nil {
@@ -159,24 +142,24 @@ func (k *kernelLogWatcher) watchLoop() {
159142
}
160143

161144
// retryCreateParser attempts to create a new kmsg parser.
145+
// It tries immediately first, then waits retryDelay between subsequent failures.
162146
// It returns the new message channel and true on success, or nil and false if stopping was signaled.
163147
func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
164148
for {
149+
parser, err := kmsgparser.NewParser()
150+
if err == nil {
151+
k.kmsgParser = parser
152+
klog.Infof("Successfully restarted kmsg parser")
153+
return parser.Parse(), true
154+
}
155+
156+
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
157+
165158
select {
166159
case <-k.tomb.Stopping():
167160
klog.Infof("Stop watching kernel log during restart attempt")
168161
return nil, false
169162
case <-time.After(retryDelay):
170163
}
171-
172-
parser, err := kmsgparser.NewParser()
173-
if err != nil {
174-
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
175-
continue
176-
}
177-
178-
k.kmsgParser = parser
179-
klog.Infof("Successfully restarted kmsg parser")
180-
return parser.Parse(), true
181164
}
182165
}

pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go

Lines changed: 17 additions & 152 deletions
Original file line number | Diff line number | Diff line change
@@ -192,149 +192,6 @@ func TestWatch(t *testing.T) {
192192
}
193193
}
194194

195-
func TestRestartOnErrorConfig(t *testing.T) {
196-
testCases := []struct {
197-
name string
198-
pluginConfig map[string]string
199-
expected bool
200-
}{
201-
{
202-
name: "nil config returns false",
203-
pluginConfig: nil,
204-
expected: false,
205-
},
206-
{
207-
name: "empty config returns false",
208-
pluginConfig: map[string]string{},
209-
expected: false,
210-
},
211-
{
212-
name: "key not present returns false",
213-
pluginConfig: map[string]string{"otherKey": "true"},
214-
expected: false,
215-
},
216-
{
217-
name: "key present but set to false returns false",
218-
pluginConfig: map[string]string{RestartOnErrorKey: "false"},
219-
expected: false,
220-
},
221-
{
222-
name: "key present and set to true returns true",
223-
pluginConfig: map[string]string{RestartOnErrorKey: "true"},
224-
expected: true,
225-
},
226-
{
227-
name: "key present but uppercase TRUE returns false",
228-
pluginConfig: map[string]string{RestartOnErrorKey: "TRUE"},
229-
expected: false,
230-
},
231-
{
232-
name: "key present but mixed case True returns false",
233-
pluginConfig: map[string]string{RestartOnErrorKey: "True"},
234-
expected: false,
235-
},
236-
}
237-
238-
for _, tc := range testCases {
239-
t.Run(tc.name, func(t *testing.T) {
240-
w := &kernelLogWatcher{
241-
cfg: types.WatcherConfig{
242-
PluginConfig: tc.pluginConfig,
243-
},
244-
}
245-
assert.Equal(t, tc.expected, w.restartOnError())
246-
})
247-
}
248-
}
249-
250-
func TestWatcherStopsOnChannelCloseWhenRestartDisabled(t *testing.T) {
251-
now := time.Now()
252-
253-
mock := &mockKmsgParser{
254-
kmsgs: []kmsgparser.Message{
255-
{Message: "test message", Timestamp: now},
256-
},
257-
closeAfterSend: true,
258-
}
259-
260-
w := &kernelLogWatcher{
261-
cfg: types.WatcherConfig{
262-
PluginConfig: map[string]string{
263-
RestartOnErrorKey: "false",
264-
},
265-
},
266-
startTime: now.Add(-time.Second),
267-
tomb: tomb.NewTomb(),
268-
logCh: make(chan *logtypes.Log, 100),
269-
kmsgParser: mock,
270-
}
271-
272-
logCh, err := w.Watch()
273-
assert.NoError(t, err)
274-
275-
// Should receive the message
276-
select {
277-
case log := <-logCh:
278-
assert.Equal(t, "test message", log.Message)
279-
case <-time.After(time.Second):
280-
t.Fatal("timeout waiting for log message")
281-
}
282-
283-
// Log channel should be closed since restart is disabled
284-
select {
285-
case _, ok := <-logCh:
286-
assert.False(t, ok, "log channel should be closed")
287-
case <-time.After(time.Second):
288-
t.Fatal("timeout waiting for log channel to close")
289-
}
290-
291-
// Verify parser was closed
292-
assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
293-
}
294-
295-
func TestWatcherStopsOnChannelCloseWhenRestartNotConfigured(t *testing.T) {
296-
now := time.Now()
297-
298-
mock := &mockKmsgParser{
299-
kmsgs: []kmsgparser.Message{
300-
{Message: "test message", Timestamp: now},
301-
},
302-
closeAfterSend: true,
303-
}
304-
305-
w := &kernelLogWatcher{
306-
cfg: types.WatcherConfig{
307-
// No PluginConfig set
308-
},
309-
startTime: now.Add(-time.Second),
310-
tomb: tomb.NewTomb(),
311-
logCh: make(chan *logtypes.Log, 100),
312-
kmsgParser: mock,
313-
}
314-
315-
logCh, err := w.Watch()
316-
assert.NoError(t, err)
317-
318-
// Should receive the message
319-
select {
320-
case log := <-logCh:
321-
assert.Equal(t, "test message", log.Message)
322-
case <-time.After(time.Second):
323-
t.Fatal("timeout waiting for log message")
324-
}
325-
326-
// Log channel should be closed since restart is not configured
327-
select {
328-
case _, ok := <-logCh:
329-
assert.False(t, ok, "log channel should be closed")
330-
case <-time.After(time.Second):
331-
t.Fatal("timeout waiting for log channel to close")
332-
}
333-
334-
// Verify parser was closed
335-
assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
336-
}
337-
338195
func TestWatcherStopsGracefullyOnTombStop(t *testing.T) {
339196
now := time.Now()
340197

@@ -346,11 +203,7 @@ func TestWatcherStopsGracefullyOnTombStop(t *testing.T) {
346203
}
347204

348205
w := &kernelLogWatcher{
349-
cfg: types.WatcherConfig{
350-
PluginConfig: map[string]string{
351-
RestartOnErrorKey: "true",
352-
},
353-
},
206+
cfg: types.WatcherConfig{},
354207
startTime: now.Add(-time.Second),
355208
tomb: tomb.NewTomb(),
356209
logCh: make(chan *logtypes.Log, 100),
@@ -392,7 +245,7 @@ func TestWatcherProcessesEmptyMessages(t *testing.T) {
392245
{Message: "valid message", Timestamp: now.Add(time.Second)},
393246
{Message: "", Timestamp: now.Add(2 * time.Second)},
394247
},
395-
closeAfterSend: true,
248+
closeAfterSend: false,
396249
}
397250

398251
w := &kernelLogWatcher{
@@ -414,10 +267,12 @@ func TestWatcherProcessesEmptyMessages(t *testing.T) {
414267
t.Fatal("timeout waiting for log message")
415268
}
416269

417-
// Channel should close, no more messages
270+
// Stop the watcher and verify channel closes
271+
w.Stop()
272+
418273
select {
419274
case _, ok := <-logCh:
420-
assert.False(t, ok, "log channel should be closed")
275+
assert.False(t, ok, "log channel should be closed after Stop()")
421276
case <-time.After(time.Second):
422277
t.Fatal("timeout waiting for log channel to close")
423278
}
@@ -432,7 +287,7 @@ func TestWatcherTrimsMessageWhitespace(t *testing.T) {
432287
{Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)},
433288
{Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)},
434289
},
435-
closeAfterSend: true,
290+
closeAfterSend: false,
436291
}
437292

438293
w := &kernelLogWatcher{
@@ -456,4 +311,14 @@ func TestWatcherTrimsMessageWhitespace(t *testing.T) {
456311
t.Fatalf("timeout waiting for message: %s", expected)
457312
}
458313
}
314+
315+
// Stop the watcher and verify channel closes
316+
w.Stop()
317+
318+
select {
319+
case _, ok := <-logCh:
320+
assert.False(t, ok, "log channel should be closed after Stop()")
321+
case <-time.After(time.Second):
322+
t.Fatal("timeout waiting for log channel to close")
323+
}
459324
}

0 commit comments

Comments
 (0)