@@ -17,15 +17,21 @@ limitations under the License.
17
17
package kernelmonitor
18
18
19
19
import (
20
+ "bufio"
21
+ "bytes"
22
+ "fmt"
23
+ "io"
20
24
"os"
25
+ "strings"
21
26
"time"
22
27
23
28
"k8s.io/node-problem-detector/pkg/kernelmonitor/translator"
24
29
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
25
30
"k8s.io/node-problem-detector/pkg/kernelmonitor/util"
26
31
32
+ "github.com/coreos/go-systemd/sdjournal"
27
33
"github.com/golang/glog"
28
- "github.com/hpcloud /tail"
34
+ "github.com/google/cadvisor/utils /tail"
29
35
utilclock "github.com/pivotal-golang/clock"
30
36
)
31
37
@@ -54,12 +60,12 @@ type KernelLogWatcher interface {
54
60
55
61
type kernelLogWatcher struct {
56
62
// trans is the translator translates the log into internal format.
57
- trans translator.Translator
58
- cfg WatcherConfig
59
- tl * tail. Tail
60
- logCh chan * types.KernelLog
61
- tomb * util.Tomb
62
- clock utilclock.Clock
63
+ trans translator.Translator
64
+ cfg WatcherConfig
65
+ reader * bufio. Reader
66
+ logCh chan * types.KernelLog
67
+ tomb * util.Tomb
68
+ clock utilclock.Clock
63
69
}
64
70
65
71
// NewKernelLogWatcher creates a new kernel log watcher.
@@ -88,25 +94,17 @@ func (k *kernelLogWatcher) Watch() (<-chan *types.KernelLog, error) {
88
94
// To avoid this, we decide to add this temporarily hack. When KernelMonitor can't find the kernel
89
95
// log file, it will print a log and then return nil channel and no error. Since nil channel will
90
96
// always be blocked, the NodeProblemDetector will block forever.
91
- // TODO(random-liu):
92
- // 1. Add journald supports to support GCI.
93
- // 2. Schedule KernelMonitor only on supported node (with node label and selector)
94
97
if _ , err := os .Stat (path ); os .IsNotExist (err ) {
95
98
glog .Infof ("kernel log %q is not found, kernel monitor doesn't support the os distro" , path )
96
99
return nil , nil
97
100
}
98
101
// TODO(random-liu): Rate limit tail file.
99
- // TODO(random-liu): Figure out what happens if log lines are removed.
100
102
// Notice that, kernel log watcher doesn't look back to the rolled out logs.
101
- var err error
102
- k .tl , err = tail .TailFile (path , tail.Config {
103
- Poll : true ,
104
- ReOpen : true ,
105
- Follow : true ,
106
- })
103
+ reader , err := getLogReader (path )
107
104
if err != nil {
108
105
return nil , err
109
106
}
107
+ k .reader = bufio .NewReader (reader )
110
108
glog .Info ("Start watching kernel log" )
111
109
go k .watchLoop ()
112
110
return k .logCh , nil
@@ -126,15 +124,35 @@ func (k *kernelLogWatcher) watchLoop() {
126
124
if err != nil {
127
125
glog .Fatalf ("failed to parse duration %q: %v" , k .cfg .Lookback , err )
128
126
}
127
+ var buffer bytes.Buffer
129
128
for {
130
129
select {
131
- case line := <- k .tl .Lines :
132
- // Notice that tail has trimmed '\n'
133
- if line .Err != nil {
134
- glog .Errorf ("Tail error: %v" , line .Err )
135
- continue
136
- }
137
- log , err := k .trans .Translate (line .Text )
130
+ case <- k .tomb .Stopping ():
131
+ glog .Infof ("Stop watching kernel log" )
132
+ return
133
+ default :
134
+ }
135
+
136
+ line , err := k .reader .ReadString ('\n' )
137
+ if err != nil && err != io .EOF {
138
+ glog .Errorf ("exiting kernel log watch with error: %v" , err )
139
+ return
140
+ }
141
+ if err == io .EOF {
142
+ buffer .WriteString (line )
143
+ time .Sleep (100 * time .Millisecond )
144
+ continue
145
+ }
146
+ if line == "" {
147
+ time .Sleep (100 * time .Millisecond )
148
+ continue
149
+ }
150
+ if err == nil {
151
+ buffer .WriteString (line )
152
+ // trim `\n`
153
+ line = strings .TrimRight (buffer .String (), "\n " )
154
+ buffer .Reset ()
155
+ log , err := k .trans .Translate (line )
138
156
if err != nil {
139
157
glog .Infof ("Unable to parse line: %q, %v" , line , err )
140
158
continue
@@ -144,14 +162,48 @@ func (k *kernelLogWatcher) watchLoop() {
144
162
continue
145
163
}
146
164
k .logCh <- log
147
- case <- k .tomb .Stopping ():
148
- k .tl .Stop ()
149
- glog .Infof ("Stop watching kernel log" )
150
- return
151
165
}
152
166
}
153
167
}
154
168
169
+ // getLogReader gets a kernel log reader.
170
+ func getLogReader (path string ) (io.Reader , error ) {
171
+ if len (path ) != 0 {
172
+ return tryLogFile (path )
173
+ }
174
+ return tryJournal ()
175
+ }
176
+
177
+ func tryJournal () (io.Reader , error ) {
178
+ r , err := sdjournal .NewJournalReader (sdjournal.JournalReaderConfig {
179
+ NumFromTail : uint64 (0 ),
180
+ Matches : []sdjournal.Match {
181
+ {
182
+ Field : sdjournal .SD_JOURNAL_FIELD_TRANSPORT ,
183
+ Value : "kernel" ,
184
+ },
185
+ },
186
+ })
187
+ if err != nil {
188
+ return nil , fmt .Errorf ("error opening journal: %v" , err )
189
+ }
190
+ if r == nil {
191
+ return nil , fmt .Errorf ("got a nil reader" )
192
+ }
193
+ glog .Info ("Kernel log watcher use journal" )
194
+ return r , nil
195
+ }
196
+
197
+ func tryLogFile (path string ) (io.Reader , error ) {
198
+ tail , err := tail .NewTail (path )
199
+ if err != nil {
200
+ return nil , err
201
+ }
202
+ glog .Infof ("Kernel log watcher use log file: %s" , path )
203
+ time .Sleep (1000 * time .Millisecond )
204
+ return tail , nil
205
+ }
206
+
155
207
func parseDuration (s string ) (time.Duration , error ) {
156
208
// If the duration is not configured, just return 0 by default
157
209
if s == "" {
0 commit comments