Skip to content

Commit 5e4bdf8

Browse files
authored
Systemd "container" improvements (#278)
1 parent 1f36c57 commit 5e4bdf8

File tree

4 files changed

+25
-19
lines changed

4 files changed

+25
-19
lines changed

containers/container.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,13 +1071,13 @@ func (c *Container) runLogParser(logPath string) {
10711071
switch c.cgroup.ContainerType {
10721072
case cgroup.ContainerTypeSystemdService:
10731073
ch := make(chan logparser.LogEntry)
1074-
if err := JournaldSubscribe(c.cgroup, ch); err != nil {
1074+
if err := JournaldSubscribe(c.metadata.systemd.Unit, ch); err != nil {
10751075
klog.Warningln(err)
10761076
return
10771077
}
10781078
parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId), multilineCollectorTimeout, *flags.LogPatternsPerContainer)
10791079
stop := func() {
1080-
JournaldUnsubscribe(c.cgroup)
1080+
JournaldUnsubscribe(c.metadata.systemd.Unit)
10811081
}
10821082
klog.InfoS("started journald logparser", "cg", c.cgroup.Id)
10831083
c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}

containers/journald.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package containers
33
import (
44
"fmt"
55

6-
"github.com/coroot/coroot-node-agent/cgroup"
76
"github.com/coroot/coroot-node-agent/logs"
87
"github.com/coroot/coroot-node-agent/proc"
98
"github.com/coroot/logparser"
@@ -25,20 +24,20 @@ func JournaldInit() error {
2524
return nil
2625
}
2726

28-
func JournaldSubscribe(cg *cgroup.Cgroup, ch chan<- logparser.LogEntry) error {
27+
func JournaldSubscribe(unit string, ch chan<- logparser.LogEntry) error {
2928
if journaldReader == nil {
3029
return fmt.Errorf("journald reader not initialized")
3130
}
32-
err := journaldReader.Subscribe(cg.Id, ch)
31+
err := journaldReader.Subscribe(unit, ch)
3332
if err != nil {
3433
return err
3534
}
3635
return nil
3736
}
3837

39-
func JournaldUnsubscribe(cg *cgroup.Cgroup) {
38+
func JournaldUnsubscribe(unit string) {
4039
if journaldReader == nil {
4140
return
4241
}
43-
journaldReader.Unsubscribe(cg.Id)
42+
journaldReader.Unsubscribe(unit)
4443
}

containers/systemd.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,16 @@ func getSystemdProperties(id string) SystemdProperties {
9090
ctx, cancel := context.WithTimeout(context.Background(), dbusTimeout)
9191
defer cancel()
9292
parts := strings.Split(id, "/")
93-
unit := parts[len(parts)-1]
94-
props.Unit = unit
95-
properties, err := dbusConn.GetAllPropertiesContext(ctx, unit)
93+
for _, p := range parts {
94+
if strings.HasSuffix(p, ".service") {
95+
props.Unit = p
96+
break
97+
}
98+
}
99+
if props.Unit == "" {
100+
props.Unit = parts[len(parts)-1]
101+
}
102+
properties, err := dbusConn.GetAllPropertiesContext(ctx, props.Unit)
96103
if err != nil {
97104
klog.Warningln("failed to get systemd properties:", err)
98105
return props

logs/journald_reader.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (r *JournaldReader) follow() {
8686
Level: logparser.LevelByPriority(e.Fields[sdjournal.SD_JOURNAL_FIELD_PRIORITY]),
8787
}
8888
r.lock.Lock()
89-
ch, ok := r.subscribers[e.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP]]
89+
ch, ok := r.subscribers[e.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT]]
9090
r.lock.Unlock()
9191
if !ok {
9292
continue
@@ -95,24 +95,24 @@ func (r *JournaldReader) follow() {
9595
}
9696
}
9797

98-
func (r *JournaldReader) Subscribe(cgroup string, ch chan<- logparser.LogEntry) error {
98+
func (r *JournaldReader) Subscribe(unit string, ch chan<- logparser.LogEntry) error {
9999
r.lock.Lock()
100100
defer r.lock.Unlock()
101-
if _, ok := r.subscribers[cgroup]; ok {
102-
return fmt.Errorf(`duplicate subscriber for cgroup %s`, cgroup)
101+
if _, ok := r.subscribers[unit]; ok {
102+
return fmt.Errorf(`duplicate subscriber for unit %s`, unit)
103103
}
104-
r.subscribers[cgroup] = ch
104+
r.subscribers[unit] = ch
105105
return nil
106106
}
107107

108-
func (r *JournaldReader) Unsubscribe(cgroup string) {
108+
func (r *JournaldReader) Unsubscribe(unit string) {
109109
r.lock.Lock()
110110
defer r.lock.Unlock()
111-
if _, ok := r.subscribers[cgroup]; !ok {
112-
klog.Warning("unknown subscriber for cgroup", cgroup)
111+
if _, ok := r.subscribers[unit]; !ok {
112+
klog.Warning("unknown subscriber for unit", unit)
113113
return
114114
}
115-
delete(r.subscribers, cgroup)
115+
delete(r.subscribers, unit)
116116
}
117117

118118
func (r *JournaldReader) Close() {

0 commit comments

Comments
 (0)