Skip to content

Commit fc7aa46

Browse files
committed
fix(logwatchers/kmsg): prevent duplicate message replay after restart
1 parent 6411bde commit fc7aa46

File tree

1 file changed

+17
-5
lines changed

1 file changed

+17
-5
lines changed

pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go

Lines changed: 17 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -113,7 +113,8 @@ func (k *kernelLogWatcher) watchLoop() {
113113
klog.Errorf("Failed to close kmsg parser: %v", err)
114114
}
115115

116-
// Try to restart
116+
// Try to restart immediately. Backoff should apply only after a
117+
// failed createParser or SeekEnd attempt inside retryCreateParser().
117118
var restarted bool
118119
kmsgs, restarted = k.retryCreateParser()
119120
if !restarted {
@@ -143,18 +144,29 @@ func (k *kernelLogWatcher) watchLoop() {
143144

144145
// retryCreateParser attempts to create a new kmsg parser.
145146
// It tries immediately first, then waits retryDelay between subsequent failures.
147+
// On success, it seeks the new parser to the end of the kmsg ring buffer to
148+
// avoid replaying messages that were already processed before the restart.
149+
// Any messages written to kmsg between the old parser closing and the new
150+
// parser being seeked are not delivered; this is preferable to replaying an
151+
// entire ring buffer the watcher has already processed, especially when the
152+
// restart was triggered by a kmsg flood.
146153
// It returns the new message channel and true on success, or nil and false if stopping was signaled.
147154
func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
148155
for {
149-
parser, err := kmsgparser.NewParser()
150-
if err == nil {
156+
parser, err := k.createParser()
157+
if err != nil {
158+
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
159+
} else if seekErr := parser.SeekEnd(); seekErr != nil {
160+
klog.Errorf("Failed to seek new kmsg parser to end, retrying in %v: %v", retryDelay, seekErr)
161+
if closeErr := parser.Close(); closeErr != nil {
162+
klog.Errorf("Failed to close kmsg parser after seek failure: %v", closeErr)
163+
}
164+
} else {
151165
k.kmsgParser = parser
152166
klog.Infof("Successfully restarted kmsg parser")
153167
return parser.Parse(), true
154168
}
155169

156-
klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)
157-
158170
select {
159171
case <-k.tomb.Stopping():
160172
klog.Infof("Stop watching kernel log during restart attempt")

0 commit comments

Comments
 (0)