
Commit 8fbf290

Merge pull request #933 from wonderflow/fix1
support ignoreHidden for extract reader && fix dead lock on dirx close
2 parents 758953a + 0df9f51 commit 8fbf290

14 files changed, +351 -117 lines

reader/autofile/autofile.go

Lines changed: 13 additions & 0 deletions
@@ -14,6 +14,7 @@ import (
 	"github.com/qiniu/logkit/reader"
 	"github.com/qiniu/logkit/reader/bufreader"
 	"github.com/qiniu/logkit/reader/config"
+	"github.com/qiniu/logkit/reader/dirx"
 	"github.com/qiniu/logkit/reader/tailx"
 )
 
@@ -39,6 +40,9 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (r reader.Reader, err error
 		return bufreader.NewFileDirReader(meta, conf)
 	case config.ModeFile:
 		return bufreader.NewSingleFileReader(meta, conf)
+	case config.ModeDirx:
+		conf[config.KeyLogPath] = logpath
+		return dirx.NewReader(meta, conf)
 	default:
 		err = errors.New("can not find property mode for this path " + logpath)
 	}
@@ -51,6 +55,15 @@ func matchMode(logpath string) (path, mode string, err error) {
 		logpath = filepath.Dir(logpath)
 	}
 	path = logpath
+	if strings.HasSuffix(logpath, ".tar.gz") || strings.HasSuffix(logpath, ".tar") || strings.HasSuffix(logpath, ".zip") {
+		mode = config.ModeDirx
+		return
+	}
+	if strings.HasSuffix(logpath, ".gz") {
+		mode = config.ModeTailx
+		return
+	}
+
 	//path with * matching tailx mode
 	matchTailx := strings.Contains(logpath, "*")
 	if matchTailx == true {
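
The new suffix checks run before the existing wildcard detection: tar and zip archives are routed to the dirx reader, which extracts and reads their contents, while a single .gz file goes to tailx. Below is a standalone sketch of that mapping; modeForSuffix is a hypothetical helper written for illustration, the real logic sits inline in matchMode as shown above.

package autofile

import (
	"strings"

	"github.com/qiniu/logkit/reader/config"
)

// modeForSuffix mirrors the suffix checks added to matchMode above: archive
// suffixes select dirx, a single gzip file selects tailx, anything else falls
// through to the existing wildcard/directory/file detection.
func modeForSuffix(logpath string) (mode string, ok bool) {
	switch {
	case strings.HasSuffix(logpath, ".tar.gz"),
		strings.HasSuffix(logpath, ".tar"),
		strings.HasSuffix(logpath, ".zip"):
		return config.ModeDirx, true
	case strings.HasSuffix(logpath, ".gz"):
		return config.ModeTailx, true
	}
	return "", false
}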

reader/bufreader/bufreader.go

Lines changed: 28 additions & 41 deletions
@@ -55,7 +55,7 @@ type LastSync struct {
 type BufReader struct {
 	stopped       int32
 	buf           []byte
-	mutiLineCache []string
+	mutiLineCache *LineCache
 	rd            reader.FileReader // reader provided by the client
 	r, w          int               // buf read and write positions
 	err           error
@@ -136,6 +136,7 @@ func NewReaderSize(rd reader.FileReader, meta *reader.Meta, size int) (*BufReade
 	r := new(BufReader)
 	r.stopped = 0
 	r.reset(make([]byte, size), rd)
+	r.mutiLineCache = NewLineCache()
 
 	r.Meta = meta
 	encodingWay := r.Meta.GetEncodingWay()
@@ -163,7 +164,7 @@ func NewReaderSize(rd reader.FileReader, meta *reader.Meta, size int) (*BufReade
 		}
 	}
 	if len(linesbytes) > 0 {
-		r.mutiLineCache = append(r.mutiLineCache, string(linesbytes))
+		r.mutiLineCache.Append(string(linesbytes))
 	}
 	return r, nil
 }
@@ -183,7 +184,7 @@ func (b *BufReader) reset(buf []byte, r reader.FileReader) {
 		rd:            r,
 		lastByte:      -1,
 		lastRuneSize:  -1,
-		mutiLineCache: make([]string, 0, 16),
+		mutiLineCache: NewLineCache(),
 		lastSync:      LastSync{},
 		mux:           sync.Mutex{},
 		statsLock:     sync.RWMutex{},
@@ -242,18 +243,24 @@ func (b *BufReader) fill() {
 		panic(errNegativeRead)
 	}
 	if b.latestSource != b.rd.Source() {
-		// the data source has changed: the buffer already holds data from 2 sources, so locate where the split happens
-		if rc, ok := b.rd.(seqfile.NewLineBytesRecorder); ok {
-			SIdx := rc.NewLineBytesIndex()
+		// the data source has changed: the buffer already holds data from at least 2 sources, so locate where the split happens
+		if rc, ok := b.rd.(reader.NewSourceRecorder); ok {
+			SIdx := rc.NewSourceIndex()
 			for _, v := range SIdx {
-				// the index returned by NewLineBytesIndex is the amount of data belonging to the previous DataSource in this read batch; adding b.w gives the previous DataSource's total data
+				// the index returned by NewSourceIndex is the amount of data belonging to the previous DataSource in this read batch; adding b.w gives the previous DataSource's total data
				b.lastRdSource = append(b.lastRdSource, reader.SourceIndex{
 					Source: v.Source,
 					Index:  b.w + v.Index,
 				})
 			}
-			b.latestSource = b.rd.Source()
+		} else {
+			// if the interface is not implemented, treat everything read so far as belonging to the previous source
+			b.lastRdSource = append(b.lastRdSource, reader.SourceIndex{
+				Source: b.latestSource,
+				Index:  b.w,
+			})
 		}
+		b.latestSource = b.rd.Source()
 	}
 
 	b.w += n
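
The translated comment above describes the bookkeeping: indices reported by the recorder are relative to the bytes just read, so adding b.w converts them into absolute positions inside the buffer, marking where one data source ends and the next begins. A small sketch of that conversion follows; the interface definition is inferred from its usage above and may not match the real declaration in the reader package.

package sketch

// Assumed shape of the interface asserted in fill() above.
type NewSourceRecorder interface {
	NewSourceIndex() []SourceIndex
}

// Mirrors reader.SourceIndex as used in the diff.
type SourceIndex struct {
	Source string
	Index  int
}

// absoluteBoundaries converts recorder indices (relative to the latest Read)
// into absolute buffer offsets, the same b.w + v.Index arithmetic as above.
// Example: with w=100 and a relative index of 4, the previous source's data
// ends at buffer offset 104.
func absoluteBoundaries(w int, relative []SourceIndex) []SourceIndex {
	out := make([]SourceIndex, 0, len(relative))
	for _, v := range relative {
		out = append(out, SourceIndex{Source: v.Source, Index: w + v.Index})
	}
	return out
}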
@@ -404,24 +411,24 @@ func (b *BufReader) ReadPattern() (string, error) {
 		line, err := b.ReadString('\n')
 		// a line was read
 		if len(line) > 0 {
-			if len(b.mutiLineCache) <= 0 {
-				b.mutiLineCache = []string{line}
+			if b.mutiLineCache.Size() <= 0 {
+				b.mutiLineCache.Set([]string{line})
 				continue
 			}
 			// match the line head: on success return the cached block, otherwise append to the cache and return an empty string
 			if b.multiLineRegexp.Match([]byte(line)) {
 				tmp := line
-				line = string(b.FormMutiLine())
-				b.mutiLineCache = make([]string, 0, 16)
-				b.mutiLineCache = append(b.mutiLineCache, tmp)
+				line = string(b.mutiLineCache.Combine())
+				b.mutiLineCache.Set(make([]string, 0, 16))
+				b.mutiLineCache.Append(tmp)
 				return line, err
 			}
-			b.mutiLineCache = append(b.mutiLineCache, line)
+			b.mutiLineCache.Append(line)
 			maxTimes = 0
 		} else { // no line was read
 			if err != nil {
-				line = string(b.FormMutiLine())
-				b.mutiLineCache = make([]string, 0, 16)
+				line = string(b.mutiLineCache.Combine())
+				b.mutiLineCache.Set(make([]string, 0, 16))
 				return line, err
 			}
 			maxTimes++
@@ -432,36 +439,16 @@ func (b *BufReader) ReadPattern() (string, error) {
 			}
 		}
 		// lines are still cached: keep looping until the maximum size is exceeded
-		if b.calcMutiLineCache() > MaxHeadPatternBufferSize {
-			line = string(b.FormMutiLine())
-			b.mutiLineCache = make([]string, 0, 16)
+		if b.mutiLineCache.TotalLen() > MaxHeadPatternBufferSize {
+			line = string(b.mutiLineCache.Combine())
+			b.mutiLineCache.Set(make([]string, 0, 16))
 			return line, err
 		}
 	}
 }
 
 func (b *BufReader) FormMutiLine() []byte {
-	if len(b.mutiLineCache) <= 0 {
-		return make([]byte, 0)
-	}
-	n := 0
-	for i := 0; i < len(b.mutiLineCache); i++ {
-		n += len(b.mutiLineCache[i])
-	}
-
-	xb := make([]byte, n)
-	bp := copy(xb, b.mutiLineCache[0])
-	for _, s := range b.mutiLineCache[1:] {
-		bp += copy(xb[bp:], s)
-	}
-	return xb
-}
-
-func (b *BufReader) calcMutiLineCache() (ret int) {
-	for _, v := range b.mutiLineCache {
-		ret += len(v)
-	}
-	return
+	return b.mutiLineCache.Combine()
 }
 
 //ReadLine returns a string line as a normal Reader
@@ -561,7 +548,7 @@ func (b *BufReader) ReadDone() bool {
 func (b *BufReader) SyncMeta() {
 	b.mux.Lock()
 	defer b.mux.Unlock()
-	linecache := string(b.FormMutiLine())
+	linecache := string(b.mutiLineCache.Combine())
 	// cache linecache as well
 	if b.lastSync.cache != linecache || b.lastSync.buf != string(b.buf) || b.r != b.lastSync.r || b.w != b.lastSync.w {
 		log.Debugf("Runner[%v] %v sync meta started, linecache [%v] buf [%v] (%v %v)", b.Meta.RunnerName, b.Name(), linecache, string(b.buf), b.r, b.w)
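
For context on the ReadPattern changes: lines that do not match the head-of-entry pattern are buffered in the LineCache, and the buffered block is emitted either when the next head line arrives or when the cache grows past MaxHeadPatternBufferSize. The runnable illustration below reproduces that grouping decision outside of BufReader; the pattern is made up for the example, the real one comes from the runner's head-pattern configuration.

package main

import (
	"fmt"
	"regexp"
)

func main() {
	head := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}`) // hypothetical head pattern
	var cache []string
	for _, line := range []string{
		"2021-01-01 ERROR boom",
		"  at pkg.fn(file.go:10)",
		"2021-01-01 INFO ok",
	} {
		if head.MatchString(line) && len(cache) > 0 {
			// a new head line flushes the cached multi-line entry
			fmt.Printf("emit: %q\n", cache) // emit: ["2021-01-01 ERROR boom" "  at pkg.fn(file.go:10)"]
			cache = cache[:0]
		}
		cache = append(cache, line)
	}
}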

reader/bufreader/linecache.go

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+package bufreader
+
+import (
+	"sync"
+)
+
+type LineCache struct {
+	lines []string
+	lock  *sync.RWMutex
+}
+
+func NewLineCache() *LineCache {
+	return &LineCache{
+		lines: make([]string, 0, 16),
+		lock:  new(sync.RWMutex),
+	}
+}
+
+func (l *LineCache) Size() int {
+	l.lock.RLock()
+	defer l.lock.RUnlock()
+	return len(l.lines)
+}
+
+func (l *LineCache) Set(r []string) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+	l.lines = r
+}
+
+func (l *LineCache) Append(r string) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+	l.lines = append(l.lines, r)
+}
+
+func (l *LineCache) TotalLen() int {
+	l.lock.RLock()
+	defer l.lock.RUnlock()
+	var ret int
+	for _, v := range l.lines {
+		ret += len(v)
+	}
+	return ret
+}
+
+func (l *LineCache) Combine() []byte {
+	if l.Size() <= 0 {
+		return make([]byte, 0)
+	}
+	n := l.TotalLen()
+	xb := make([]byte, n)
+	l.lock.RLock()
+	defer l.lock.RUnlock()
+	bp := copy(xb, l.lines[0])
+	for _, s := range l.lines[1:] {
+		bp += copy(xb[bp:], s)
+	}
+	return xb
+}

reader/bufreader/linecache_test.go

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+package bufreader
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestLinecahe(t *testing.T) {
+	l := NewLineCache()
+	l.Append("abc")
+	assert.Equal(t, 3, l.TotalLen())
+	l.Append("456")
+	assert.Equal(t, 6, l.TotalLen())
+	assert.Equal(t, 2, l.Size())
+	assert.Equal(t, "abc456", string(l.Combine()))
+	l.Set([]string{"bbb"})
+	assert.Equal(t, 3, l.TotalLen())
+	l.Append("haha")
+	assert.Equal(t, 7, l.TotalLen())
+	assert.Equal(t, 2, l.Size())
+	assert.Equal(t, "bbbhaha", string(l.Combine()))
+}

reader/dirx/dir_reader.go

Lines changed: 20 additions & 7 deletions
@@ -24,6 +24,8 @@ import (
 	. "github.com/qiniu/logkit/utils/models"
 )
 
+var ErrAlreadyExist = errors.New("runner already exist")
+
 type dirReader struct {
 	status   int32 // Note: atomic operations
 	inactive int32 // Note: atomic operations; reclaimed by expire only when inactive > 0
@@ -180,11 +182,17 @@ func HasDirExpired(dir string, expire time.Duration) bool {
 	return latestModTime.Add(expire).Before(time.Now())
 }
 
+// a fully read directory is treated as expired, because no new data will be appended to it
+func (dr *dirReader) ReadDone() bool {
+	return dr.br.ReadDone()
+}
+
 func (dr *dirReader) HasExpired(expire time.Duration) bool {
-	if dr.br.ReadDone() {
-		// a fully read directory is treated as expired, because no new data will be appended to it
-		return true
+	// an expire duration of 0 means never expire
+	if expire.Nanoseconds() == 0 {
+		return false
 	}
+
 	// while not inactive, the data has not been fully read yet, so keep processing it first
 	return atomic.LoadInt32(&dr.inactive) > 0 && HasDirExpired(dr.logPath, expire)
 }
@@ -206,6 +214,7 @@ func (dr *dirReader) SyncMeta() string {
 
 func (dr *dirReader) Close() error {
 	defer log.Warnf("Runner[%v] log path[%v] reader has closed", dr.runnerName, dr.originalPath)
+	dr.SyncMeta()
 	err := dr.br.Close()
 	if atomic.CompareAndSwapInt32(&dr.status, StatusRunning, StatusStopping) {
 		log.Warnf("Runner[%v] log path[%v] reader is closing", dr.runnerName, dr.originalPath)
@@ -304,7 +313,7 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool) (*dir
 	}
 	var fri reader.FileReader
 	if reader.CompressedFile(opts.LogPath) {
-		fri, err = extract.NewReader(subMeta, opts.LogPath)
+		fri, err = extract.NewReader(subMeta, opts.LogPath, extract.Opts{IgnoreHidden: opts.IgnoreHidden, NewFileNewLine: opts.NewFileNewLine, IgnoreFileSuffixes: opts.IgnoreFileSuffixes, ValidFilesRegex: opts.ValidFilesRegex})
 		if err != nil {
 			return nil, fmt.Errorf("new extract reader: %v", err)
 		}
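
extract.NewReader now takes an options struct instead of only the archive path. The commit message confirms IgnoreHidden; the semantics of the other fields are inferred from their names. The hypothetical filter below shows what the extract reader is presumably doing with these options (field types are assumptions, and NewFileNewLine, which likely controls separation between concatenated files, is not modeled here).

package sketch

import (
	"path/filepath"
	"regexp"
	"strings"
)

// Opts mirrors the fields passed to extract.NewReader above; types assumed.
type Opts struct {
	IgnoreHidden       bool
	NewFileNewLine     bool     // assumed: insert a newline between concatenated files
	IgnoreFileSuffixes []string
	ValidFilesRegex    string
}

// keepFile sketches the per-file filtering an archive reader would apply.
func keepFile(name string, o Opts) bool {
	base := filepath.Base(name)
	if o.IgnoreHidden && strings.HasPrefix(base, ".") {
		return false // skip hidden files inside the archive
	}
	for _, suffix := range o.IgnoreFileSuffixes {
		if strings.HasSuffix(base, suffix) {
			return false
		}
	}
	if o.ValidFilesRegex != "" {
		if matched, err := regexp.MatchString(o.ValidFilesRegex, base); err != nil || !matched {
			return false
		}
	}
	return true
}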
@@ -338,6 +347,11 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool) (*dir
 	drs.lock.Lock()
 	defer drs.lock.Unlock()
 
+	// double check
+	if _, ok := drs.readers[opts.LogPath]; ok {
+		return nil, ErrAlreadyExist
+	}
+
 	dr.readcache = drs.cachedLines[opts.LogPath]
 	opts.Meta.AddSubMeta(opts.LogPath, subMeta)
 	drs.readers[opts.LogPath] = dr
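
The double check covers the gap between an earlier existence check (done before the reader is constructed, outside this hunk) and the write lock taken here: two stat cycles could otherwise both build a reader for the same path, and the second would silently replace the first. Returning ErrAlreadyExist lets the caller treat the duplicate as a non-error, as the dirx.go hunk below shows. A sketch of the general pattern with hypothetical names:

package sketch

import (
	"errors"
	"sync"
)

var ErrAlreadyExist = errors.New("runner already exist")

type entry struct{} // stands in for *dirReader

// register builds and registers an entry for key exactly once.
func register(mu *sync.RWMutex, m map[string]*entry, key string, build func() *entry) (*entry, error) {
	mu.RLock()
	_, exists := m[key]
	mu.RUnlock()
	if exists {
		return nil, ErrAlreadyExist // cheap early exit
	}

	e := build() // potentially slow: opens sub-meta, files, ...

	mu.Lock()
	defer mu.Unlock()
	if _, ok := m[key]; ok { // double check: registered while we were building
		return nil, ErrAlreadyExist
	}
	m[key] = e
	return e, nil
}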
@@ -358,7 +372,7 @@ func (drs *dirReaders) checkExpiredDirs() {
 
 	var expiredDirs []string
 	for logPath, dr := range drs.readers {
-		if dr.HasExpired(drs.expire) {
+		if dr.HasExpired(drs.expire) || (drs.expireDelete && dr.ReadDone()) {
 			if err := dr.Close(); err != nil {
 				log.Errorf("Failed to close log path[%v] reader: %v", logPath, err)
 			}
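
Taken together with the HasExpired change earlier in this file, the reclaim rules now read: a reader whose archive has been fully read is reclaimed when expireDelete is set, age-based expiry is skipped entirely when expire is 0, and otherwise a reader must be inactive and past its expiry window. A compact restatement, written as if inside the package and using the names from these hunks (sketch only, not the actual implementation):

// reclaimable mirrors dr.HasExpired(drs.expire) || (drs.expireDelete && dr.ReadDone())
// with HasExpired expanded.
func reclaimable(dr *dirReader, expire time.Duration, expireDelete bool) bool {
	if expireDelete && dr.ReadDone() { // fully read archives never receive new data
		return true
	}
	if expire == 0 { // expire of 0 means never expire by age
		return false
	}
	return atomic.LoadInt32(&dr.inactive) > 0 && HasDirExpired(dr.logPath, expire)
}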
@@ -380,10 +394,9 @@
 }
 
 func (drs *dirReaders) getReaders() []*dirReader {
+	readers := make([]*dirReader, 0, drs.Num())
 	drs.lock.RLock()
 	defer drs.lock.RUnlock()
-
-	readers := make([]*dirReader, 0, drs.Num())
 	for _, dr := range drs.readers {
 		readers = append(readers, dr)
 	}
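
Moving the drs.Num() call ahead of the read lock is most likely the "dead lock on dirx close" fix from the commit title: assuming Num() also acquires the read lock (consistent with this change, though not shown in the diff), the nested RLock can hang as soon as a writer, for example Close or NewReader taking drs.lock.Lock(), queues up between the two acquisitions, because sync.RWMutex blocks new readers while a writer is waiting. A minimal reproduction of that hazard:

package main

import (
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex

	num := func() int { // stand-in for drs.Num(), assumed to take the read lock itself
		mu.RLock()
		defer mu.RUnlock()
		return 0
	}

	mu.RLock()
	go func() {
		mu.Lock() // a writer (e.g. a reader being registered or closed) arrives now
		mu.Unlock()
	}()
	time.Sleep(100 * time.Millisecond) // give the writer time to start waiting
	_ = num()                          // nested RLock blocks behind the waiting writer: deadlock
	mu.RUnlock()
}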

reader/dirx/dirx.go

Lines changed: 3 additions & 0 deletions
@@ -302,6 +302,9 @@ func (r *Reader) statLogPath() {
 			expireMap:   r.expireMap,
 		}, r.notFirstTime)
 		if err != nil {
+			if err == ErrAlreadyExist {
+				continue
+			}
 			err = fmt.Errorf("create new reader for log path %q failed: %v", logPath, err)
 			r.sendError(err)
 			log.Errorf("Runner[%v] %v, ignored this path", r.meta.RunnerName, err)
