Skip to content

Commit 8378a54

Browse files
committed
reader/sql: improve status check and implement DaemonReader
1 parent bcace8b commit 8378a54

File tree

23 files changed

+1912
-1464
lines changed

23 files changed

+1912
-1464
lines changed

reader/meta.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"path/filepath"
1010
"strconv"
1111
"strings"
12+
"sync"
1213
"time"
1314

1415
"github.com/json-iterator/go"
@@ -71,7 +72,8 @@ type Meta struct {
7172
RunnerName string
7273
extrainfo map[string]string
7374

74-
subMetas map[string]*Meta //对于tailx模式的情况会有嵌套的meta
75+
subMetaLock sync.RWMutex
76+
subMetas map[string]*Meta //对于tailx模式的情况会有嵌套的meta
7577
}
7678

7779
func getValidDir(dir string) (realPath string, err error) {
@@ -195,6 +197,9 @@ func NewMetaWithConf(conf conf.MapConf) (meta *Meta, err error) {
195197
}
196198

197199
func (m *Meta) AddSubMeta(key string, meta *Meta) error {
200+
m.subMetaLock.Lock()
201+
defer m.subMetaLock.Unlock()
202+
198203
if m.subMetas == nil {
199204
m.subMetas = make(map[string]*Meta)
200205
}
@@ -206,6 +211,9 @@ func (m *Meta) AddSubMeta(key string, meta *Meta) error {
206211
}
207212

208213
func (m *Meta) RemoveSubMeta(key string) {
214+
m.subMetaLock.Lock()
215+
defer m.subMetaLock.Unlock()
216+
209217
delete(m.subMetas, key)
210218
}
211219

@@ -547,7 +555,11 @@ func (m *Meta) GetDoneFiles() ([]File, error) {
547555
if err != nil {
548556
return nil, err
549557
}
558+
550559
//submeta
560+
m.subMetaLock.RLock()
561+
defer m.subMetaLock.RUnlock()
562+
551563
for _, mv := range m.subMetas {
552564
newfiles, err := mv.GetDoneFiles()
553565
if err != nil {
@@ -588,6 +600,10 @@ func (m *Meta) SetEncodingWay(e string) {
588600
if e != "UTF-8" {
589601
m.encodingWay = e
590602
}
603+
604+
m.subMetaLock.RLock()
605+
defer m.subMetaLock.RUnlock()
606+
591607
for _, mv := range m.subMetas {
592608
mv.SetEncodingWay(e)
593609
}
@@ -644,6 +660,10 @@ func (m *Meta) Reset() error {
644660
}
645661
}
646662
}
663+
664+
m.subMetaLock.RLock()
665+
defer m.subMetaLock.RUnlock()
666+
647667
for key, mv := range m.subMetas {
648668
err := mv.Reset()
649669
if err != nil {

0 commit comments

Comments
 (0)