Skip to content

Commit 06ae2b9

Browse files
committed
add onMsgCallback
1 parent 96e6d80 commit 06ae2b9

File tree

4 files changed

+47
-38
lines changed

4 files changed

+47
-38
lines changed

cmd/logparser.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func main() {
2222

2323
reader := bufio.NewReader(os.Stdin)
2424
ch := make(chan logparser.LogEntry)
25-
parser := logparser.NewParser(ch, nil)
25+
parser := logparser.NewParser(ch, nil, nil)
2626
t := time.Now()
2727
for {
2828
line, err := reader.ReadString('\n')
@@ -32,7 +32,7 @@ func main() {
3232
}
3333
break
3434
}
35-
ch <- logparser.LogEntry{Content: strings.TrimSuffix(line, "\n"), Level: logparser.LevelUnknown}
35+
ch <- logparser.LogEntry{Timestamp: time.Now(), Content: strings.TrimSuffix(line, "\n"), Level: logparser.LevelUnknown}
3636
}
3737
d := time.Since(t)
3838
defer parser.Stop()

multiline.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ var (
1313
)
1414

1515
type Message struct {
16-
Content string
17-
Level Level
16+
Timestamp time.Time
17+
Content string
18+
Level Level
1819
}
1920

2021
type MultilineCollector struct {
2122
Messages chan Message
23+
ts time.Time
2224
level Level
2325
lines []string
2426
isFirstLineContainsTimestamp bool
@@ -80,26 +82,18 @@ func (m *MultilineCollector) Add(entry LogEntry) {
8082

8183
func (m *MultilineCollector) add(entry LogEntry) {
8284
if len(m.lines) == 0 {
85+
m.ts = entry.Timestamp
8386
m.level = GuessLevel(entry.Content)
8487
if m.level == LevelUnknown && entry.Level != LevelUnknown {
8588
m.level = entry.Level
8689
}
8790
m.isFirstLineContainsTimestamp = containsTimestamp(entry.Content)
8891
}
89-
switch m.level {
90-
case LevelCritical, LevelError, LevelWarning:
91-
if len(m.lines) == 0 {
92-
m.appendLine(clean(entry.Content), entry.Content)
93-
} else {
94-
m.appendLine(entry.Content, entry.Content)
95-
}
96-
default:
97-
m.appendLine("", entry.Content)
98-
}
92+
m.appendLine(entry.Content)
9993
m.lastReceiveTime = time.Now()
10094
}
10195

102-
func (m *MultilineCollector) appendLine(value, rawValue string) {
96+
func (m *MultilineCollector) appendLine(value string) {
10397
m.lines = append(m.lines, value)
10498
}
10599

@@ -145,14 +139,16 @@ func (m *MultilineCollector) flushMessage() {
145139
}
146140
content := strings.TrimSpace(strings.Join(m.lines, "\n"))
147141
msg := Message{
148-
Content: content,
149-
Level: m.level,
142+
Timestamp: m.ts,
143+
Content: content,
144+
Level: m.level,
150145
}
151146
m.reset()
152147
m.Messages <- msg
153148
}
154149

155150
func (m *MultilineCollector) reset() {
151+
m.ts = time.Time{}
156152
m.lines = m.lines[:0]
157153
m.isFirstLineContainsTimestamp = false
158154
m.level = LevelUnknown

multiline_test.go

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88
"time"
99
)
1010

11-
func writeByLine(m *MultilineCollector, data string) {
12-
for _, line := range strings.Split(data, "\n") {
13-
m.Add(LogEntry{Content: line, Level: LevelUnknown})
11+
func writeByLine(m *MultilineCollector, data string, ts time.Time) {
12+
for i, line := range strings.Split(data, "\n") {
13+
m.Add(LogEntry{Timestamp: ts.Add(time.Millisecond * time.Duration(i)), Content: line, Level: LevelUnknown})
1414
}
1515
}
1616

@@ -19,8 +19,7 @@ func TestMultilineCollector(t *testing.T) {
1919
m := NewMultilineCollector(ctx, 10*time.Millisecond)
2020
defer cancel()
2121

22-
dateStr := "2020-03-20 08:48:57,067 "
23-
tracebackStr := `ERROR [django.request:222] log 46 140452532862280 Internal Server Error: /article
22+
tracebackStr := `2020-03-20 08:48:57,067 ERROR [django.request:222] log 46 140452532862280 Internal Server Error: /article
2423
Traceback (most recent call last):
2524
File "/usr/local/lib/python3.8/site-packages/django/db/backends/base/base.py", line 220, in ensure_connection
2625
self.connect()
@@ -79,28 +78,28 @@ Traceback (most recent call last):
7978
super(Connection, self).__init__(*args, **kwargs2)
8079
django.db.utils.OperationalError: (1040, 'Too many connections')`
8180

82-
writeByLine(m, dateStr+tracebackStr)
81+
writeByLine(m, tracebackStr, time.Unix(100500, 0))
8382
msg := <-m.Messages
8483
assert.Equal(t, tracebackStr, msg.Content)
84+
assert.Equal(t, int64(100500), msg.Timestamp.Unix())
8585

86-
tracebackStr = `ERROR:__main__:Traceback (most recent call last):
86+
tracebackStr = `2020-03-20 08:48:57,067 ERROR:__main__:Traceback (most recent call last):
8787
File "<stdin>", line 2, in <module>
8888
File "<stdin>", line 2, in do_something_that_might_error
8989
File "<stdin>", line 2, in raise_error
9090
RuntimeError: something bad happened!`
91-
writeByLine(m, dateStr+tracebackStr)
91+
writeByLine(m, tracebackStr, time.Unix(0, 0))
9292
msg = <-m.Messages
9393
assert.Equal(t, tracebackStr, msg.Content)
9494

9595
m.Add(LogEntry{Content: "E0504 07:38:36.184861 1 replica_set.go:450] starting worker #224", Level: LevelUnknown})
9696
m.Add(LogEntry{Content: "E0504 07:38:36.184861 1 replica_set.go:450] starting worker #225", Level: LevelUnknown})
9797
msg = <-m.Messages
98-
assert.Equal(t, "E0504 1 replica_set.go:450] starting worker #224", msg.Content)
98+
assert.Equal(t, "E0504 07:38:36.184861 1 replica_set.go:450] starting worker #224", msg.Content)
9999
msg = <-m.Messages
100-
assert.Equal(t, "E0504 1 replica_set.go:450] starting worker #225", msg.Content)
100+
assert.Equal(t, "E0504 07:38:36.184861 1 replica_set.go:450] starting worker #225", msg.Content)
101101

102-
dateStr = "2020-03-31 11:35:06.158 "
103-
javaStackTraceStr := `[ERROR] javax.servlet.ServletException: Something bad happened
102+
javaStackTraceStr := `2020-03-31 11:35:06.158 [ERROR] javax.servlet.ServletException: Something bad happened
104103
at com.example.myproject.OpenSessionInViewFilter.doFilter(OpenSessionInViewFilter.java:60)
105104
at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1157)
106105
at com.example.myproject.ExceptionHandlerFilter.doFilter(ExceptionHandlerFilter.java:28)
@@ -163,16 +162,14 @@ Caused by: java.sql.SQLException: Violation of unique constraint MY_ENTITY_UK_1:
163162
at org.hibernate.id.insert.AbstractSelectingDelegate.performInsert(AbstractSelectingDelegate.java:57)
164163
... 54 more`
165164

166-
writeByLine(m, dateStr+javaStackTraceStr)
165+
writeByLine(m, javaStackTraceStr, time.Unix(0, 0))
167166
msg = <-m.Messages
168167
assert.Equal(t, javaStackTraceStr, msg.Content)
169168

170169
data := `Order response: {"statusCode":406,"body":{"timestamp":1648205755430,"status":406,"error":"Not Acceptable","exception":"works.weave.socks.orders.controllers.OrdersController$PaymentDeclinedException","message":"Payment declined: amount exceeds 100.00","path":"/orders"},"headers":{"x-application-context":"orders:80","content-type":"application/json;charset=UTF-8","transfer-encoding":"chunked","date":"Fri, 25 Mar 2022 10:55:55 GMT","connection":"close"},"request":{"uri":{"protocol":"http:","slashes":true,"auth":null,"host":"orders","port":80,"hostname":"orders","hash":null,"search":null,"query":null,"pathname":"/orders","path":"/orders","href":"http://orders/orders"},"method":"POST","headers":{"accept":"application/json","content-type":"application/json","content-length":232}}}
171170
Order response: {"timestamp":1648205755430,"status":406,"error":"Not Acceptable","exception":"works.weave.socks.orders.controllers.OrdersController$PaymentDeclinedException","message":"Payment declined: amount exceeds 100.00","path":"/orders"}
172171
`
173-
writeByLine(m, data)
172+
writeByLine(m, data, time.Unix(0, 0))
174173
msg = <-m.Messages
175-
assert.Equal(t,
176-
clean(strings.Split(data, "\n")[0]),
177-
msg.Content)
174+
assert.Equal(t, strings.Split(data, "\n")[0], msg.Content)
178175
}

parser.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package logparser
33
import (
44
"context"
55
"sync"
6+
"time"
67
)
78

89
type LogEntry struct {
9-
Content string
10-
Level Level
10+
Timestamp time.Time
11+
Content string
12+
Level Level
1113
}
1214

1315
type LogCounter struct {
@@ -26,10 +28,18 @@ type Parser struct {
2628
multilineCollector *MultilineCollector
2729

2830
stop func()
31+
32+
onMsgCb OnMsgCallbackF
2933
}
3034

31-
func NewParser(ch <-chan LogEntry, decoder Decoder) *Parser {
32-
p := &Parser{decoder: decoder, patterns: map[patternKey]*patternStat{}}
35+
type OnMsgCallbackF func(ts time.Time, level Level, patternHash string, msg string)
36+
37+
func NewParser(ch <-chan LogEntry, decoder Decoder, onMsgCallback OnMsgCallbackF) *Parser {
38+
p := &Parser{
39+
decoder: decoder,
40+
patterns: map[patternKey]*patternStat{},
41+
onMsgCb: onMsgCallback,
42+
}
3343
ctx, stop := context.WithCancel(context.Background())
3444
p.stop = stop
3545
p.multilineCollector = NewMultilineCollector(ctx, multilineCollectorTimeout)
@@ -79,6 +89,9 @@ func (p *Parser) inc(msg Message) {
7989
p.patterns[key] = &patternStat{}
8090
}
8191
p.patterns[key].messages++
92+
if p.onMsgCb != nil {
93+
p.onMsgCb(msg.Timestamp, msg.Level, "", msg.Content)
94+
}
8295
return
8396
}
8497

@@ -97,6 +110,9 @@ func (p *Parser) inc(msg Message) {
97110
p.patterns[key] = stat
98111
}
99112
}
113+
if p.onMsgCb != nil {
114+
p.onMsgCb(msg.Timestamp, msg.Level, key.hash, msg.Content)
115+
}
100116
stat.messages++
101117
}
102118

0 commit comments

Comments
 (0)