@@ -17,15 +17,20 @@ limitations under the License.
17
17
package kernelmonitor
18
18
19
19
import (
20
+ "bufio"
21
+ "bytes"
22
+ "io"
20
23
"os"
24
+ "strings"
21
25
"time"
22
26
23
27
"k8s.io/node-problem-detector/pkg/kernelmonitor/translator"
24
28
"k8s.io/node-problem-detector/pkg/kernelmonitor/types"
25
29
"k8s.io/node-problem-detector/pkg/kernelmonitor/util"
26
30
31
+ "github.com/coreos/go-systemd/sdjournal"
27
32
"github.com/golang/glog"
28
- "github.com/hpcloud /tail"
33
+ "github.com/google/cadvisor/utils /tail"
29
34
utilclock "github.com/pivotal-golang/clock"
30
35
)
31
36
@@ -54,12 +59,12 @@ type KernelLogWatcher interface {
54
59
55
60
type kernelLogWatcher struct {
56
61
// trans is the translator translates the log into internal format.
57
- trans translator.Translator
58
- cfg WatcherConfig
59
- tl * tail. Tail
60
- logCh chan * types.KernelLog
61
- tomb * util.Tomb
62
- clock utilclock.Clock
62
+ trans translator.Translator
63
+ cfg WatcherConfig
64
+ reader * bufio. Reader
65
+ logCh chan * types.KernelLog
66
+ tomb * util.Tomb
67
+ clock utilclock.Clock
63
68
}
64
69
65
70
// NewKernelLogWatcher creates a new kernel log watcher.
@@ -88,25 +93,17 @@ func (k *kernelLogWatcher) Watch() (<-chan *types.KernelLog, error) {
88
93
// To avoid this, we decide to add this temporarily hack. When KernelMonitor can't find the kernel
89
94
// log file, it will print a log and then return nil channel and no error. Since nil channel will
90
95
// always be blocked, the NodeProblemDetector will block forever.
91
- // TODO(random-liu):
92
- // 1. Add journald supports to support GCI.
93
- // 2. Schedule KernelMonitor only on supported node (with node label and selector)
94
96
if _ , err := os .Stat (path ); os .IsNotExist (err ) {
95
97
glog .Infof ("kernel log %q is not found, kernel monitor doesn't support the os distro" , path )
96
98
return nil , nil
97
99
}
98
100
// TODO(random-liu): Rate limit tail file.
99
- // TODO(random-liu): Figure out what happens if log lines are removed.
100
101
// Notice that, kernel log watcher doesn't look back to the rolled out logs.
101
- var err error
102
- k .tl , err = tail .TailFile (path , tail.Config {
103
- Poll : true ,
104
- ReOpen : true ,
105
- Follow : true ,
106
- })
102
+ reader , err := getLogReader (path )
107
103
if err != nil {
108
104
return nil , err
109
105
}
106
+ k .reader = bufio .NewReader (reader )
110
107
glog .Info ("Start watching kernel log" )
111
108
go k .watchLoop ()
112
109
return k .logCh , nil
@@ -126,15 +123,31 @@ func (k *kernelLogWatcher) watchLoop() {
126
123
if err != nil {
127
124
glog .Fatalf ("failed to parse duration %q: %v" , k .cfg .Lookback , err )
128
125
}
126
+ var buffer bytes.Buffer
129
127
for {
128
+
130
129
select {
131
- case line := <- k .tl .Lines :
132
- // Notice that tail has trimmed '\n'
133
- if line .Err != nil {
134
- glog .Errorf ("Tail error: %v" , line .Err )
135
- continue
136
- }
137
- log , err := k .trans .Translate (line .Text )
130
+ case <- k .tomb .Stopping ():
131
+ glog .Infof ("Stop watching kernel log" )
132
+ return
133
+ default :
134
+ }
135
+
136
+ line , err := k .reader .ReadString ('\n' )
137
+ if err != nil && err != io .EOF {
138
+ glog .Errorf ("exiting kernel log watch with error: %v" , err )
139
+ return
140
+ }
141
+ if line == "" {
142
+ time .Sleep (100 * time .Millisecond )
143
+ continue
144
+ }
145
+ if err == nil {
146
+ buffer .WriteString (line )
147
+ // trime `\n`
148
+ line = strings .TrimRight (buffer .String (), "\n " )
149
+ buffer .Reset ()
150
+ log , err := k .trans .Translate (line )
138
151
if err != nil {
139
152
glog .Infof ("Unable to parse line: %q, %v" , line , err )
140
153
continue
@@ -144,14 +157,54 @@ func (k *kernelLogWatcher) watchLoop() {
144
157
continue
145
158
}
146
159
k .logCh <- log
147
- case <- k .tomb .Stopping ():
148
- k .tl .Stop ()
149
- glog .Infof ("Stop watching kernel log" )
150
- return
160
+ } else { // err == io.EOF
161
+ buffer .WriteString (line )
151
162
}
152
163
}
153
164
}
154
165
166
+ // getLogReader gets a kernel log reader.
167
+ func getLogReader (path string ) (io.Reader , error ) {
168
+ reader , err := tryJournal ()
169
+ if err == nil {
170
+ return reader , nil
171
+ }
172
+ reader , err = tryLogFile (path )
173
+ if err == nil {
174
+ return reader , nil
175
+ }
176
+ return nil , err
177
+ }
178
+
179
+ func tryJournal () (io.Reader , error ) {
180
+ r , err := sdjournal .NewJournalReader (sdjournal.JournalReaderConfig {
181
+ NumFromTail : uint64 (0 ),
182
+ Matches : []sdjournal.Match {
183
+ {
184
+ Field : sdjournal .SD_JOURNAL_FIELD_TRANSPORT ,
185
+ Value : "kernel" ,
186
+ },
187
+ },
188
+ })
189
+ if err != nil {
190
+ return nil , fmt .Errorf ("Error opening journal: %v" , err )
191
+ }
192
+ if r == nil {
193
+ return nil , fmt .Errorf ("Got a nil reader" )
194
+ }
195
+ glog .Info ("Kernel log watcher use journal" )
196
+ return r , nil
197
+ }
198
+
199
+ func tryLogFile (path string ) (io.Reader , error ) {
200
+ tail , err := tail .NewTail (path )
201
+ if err != nil {
202
+ return nil , err
203
+ }
204
+ glog .Infof ("Kernel log watcher use log file: %s" , path )
205
+ return tail , nil
206
+ }
207
+
155
208
func parseDuration (s string ) (time.Duration , error ) {
156
209
// If the duration is not configured, just return 0 by default
157
210
if s == "" {
0 commit comments