Commit fb33d09

[+] rewrite ParseLogs() with timers and eliminate extra checks (#770)
Use `time.After()` instead of a blocking `time.Sleep()`. Move preparation to the beginning of the function and do it only once: if the configuration changes, the reaper destroys the current context and starts a new one with the new configuration, so there is no need to recalculate `csvlogRegex` and `logsGlobPath`. Add deferred closing of the latest log file handle. Make the send to the sinks channel cancellable via the context. Add unit tests.
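For context, here is a minimal, self-contained sketch (not pgwatch code; `parseLoop`, `sinkCh` and the message payload are made up for illustration) of the two patterns this commit relies on: driving the retry loop with `time.After()` inside a `select` so that context cancellation interrupts the wait, and wrapping the send to the sink channel in a `select` on `ctx.Done()` so a stalled consumer cannot block shutdown.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// parseLoop mimics the commit's structure: the first iteration fires
// immediately (currInterval starts at zero), later iterations wait a full
// interval, and a cancelled context ends the loop instead of leaving it
// stuck inside a blocking time.Sleep().
func parseLoop(ctx context.Context, interval time.Duration, sinkCh chan<- string) {
	var currInterval time.Duration
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(currInterval):
			if currInterval == 0 {
				currInterval = interval
			}
		}

		msg := fmt.Sprintf("event counts at %s", time.Now().Format(time.RFC3339Nano))

		// Cancellable send: give up immediately if the context is cancelled
		// while the sink is not ready to receive.
		select {
		case <-ctx.Done():
			return
		case sinkCh <- msg:
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	sinkCh := make(chan string)
	go parseLoop(ctx, time.Second, sinkCh)

	for {
		select {
		case <-ctx.Done():
			return
		case m := <-sinkCh:
			fmt.Println("received:", m)
		}
	}
}
```

Note that `time.After()` allocates a fresh timer on every iteration; for a once-per-interval metrics loop that cost is negligible, which is presumably why the commit prefers it over managing a reusable `time.Ticker`.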
1 parent: ba468d3

File tree

2 files changed: +485 -86 lines changed

internal/metrics/logparse.go

Lines changed: 51 additions & 86 deletions
```diff
@@ -2,6 +2,7 @@ package metrics
 
 import (
 	"bufio"
+	"cmp"
 	"context"
 	"io"
 	"os"
@@ -74,7 +75,6 @@ func getFileWithNextModTimestamp(logsGlobPath, currentFile string) (string, erro
 		if err != nil {
 			continue
 		}
-		//log.Debugf("Stat().ModTime() for %s: %v", f, fi.ModTime())
 		if (nextMod.IsZero() || fi.ModTime().Before(nextMod)) && fi.ModTime().After(fiCurrent.ModTime()) {
 			nextMod = fi.ModTime()
 			nextFile = f
@@ -91,13 +91,13 @@ func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]i
 		if ok {
 			allSeverityCounts[strings.ToLower(s)] = parsedCount
 		} else {
-			allSeverityCounts[strings.ToLower(s)] = 0
+			allSeverityCounts[strings.ToLower(s)] = int64(0)
 		}
 		parsedCount, ok = eventCountsTotal[s]
 		if ok {
 			allSeverityCounts[strings.ToLower(s)+"_total"] = parsedCount
 		} else {
-			allSeverityCounts[strings.ToLower(s)+"_total"] = 0
+			allSeverityCounts[strings.ToLower(s)+"_total"] = int64(0)
 		}
 	}
 	return MeasurementEnvelope{
@@ -114,66 +114,54 @@ func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string,
 	var latest, previous, serverMessagesLang string
 	var latestHandle *os.File
 	var reader *bufio.Reader
-	var linesRead = 0 // to skip over already parsed lines on Postgres server restart for example
-	var logsMatchRegex, logsMatchRegexPrev, logsGlobPath string
+	var linesRead int // to skip over already parsed lines on Postgres server restart for example
+	var logsMatchRegex, logsGlobPath string
 	var lastSendTime time.Time // to storage channel
 	var eventCounts = make(map[string]int64) // for the specific DB. [WARNING: 34, ERROR: 10, ...], zeroed on storage send
 	var eventCountsTotal = make(map[string]int64) // for the whole instance
-	var hostConfig sources.HostConfigAttrs
 	var err error
 	var firstRun = true
 	var csvlogRegex *regexp.Regexp
-	logger := log.GetLogger(ctx).WithField("source", mdb.Name)
-	for { // re-try loop. re-start in case of FS errors or just to refresh host config
-		select {
-		case <-ctx.Done():
+	var currInterval time.Duration
+
+	logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts)
+
+	csvlogRegex, err = regexp.Compile(cmp.Or(mdb.HostConfig.LogsMatchRegex, CSVLogDefaultRegEx))
+	if err != nil {
+		logger.WithError(err).Print("Invalid regex: ", logsMatchRegex)
+		return
+	}
+	logger.Debugf("Changing logs parsing regex to: %s", logsMatchRegex)
+
+	if mdb.HostConfig.LogsGlobPath != "" {
+		logsGlobPath = mdb.HostConfig.LogsGlobPath
+	} else {
+		if logsGlobPath, err = tryDetermineLogFolder(ctx, mdb.Conn); err != nil {
+			logger.WithError(err).Print("Could not determine Postgres logs parsing folder in ", logsGlobPath)
 			return
-		default:
 		}
+	}
+	logger.Debugf("Considering log files determined by glob pattern: %s", logsGlobPath)
 
-		if hostConfig.LogsMatchRegex != "" {
-			logsMatchRegex = hostConfig.LogsMatchRegex
-		}
-		if logsMatchRegex == "" {
-			logger.Debug("Log parsing enabled with default CSVLOG regex")
-			logsMatchRegex = CSVLogDefaultRegEx
-		}
-		if hostConfig.LogsGlobPath != "" {
-			logsGlobPath = hostConfig.LogsGlobPath
-		}
-		if logsGlobPath == "" {
-			logsGlobPath, err = tryDetermineLogFolder(ctx, mdb.Conn)
-			if err != nil {
-				logger.WithError(err).Print("Could not determine Postgres logs parsing folder. Configured logs_glob_path = ", logsGlobPath)
-				time.Sleep(60 * time.Second)
-				continue
-			}
-		}
-		serverMessagesLang, err = tryDetermineLogMessagesLanguage(ctx, mdb.Conn)
-		if err != nil {
-			logger.WithError(err).Warning("Could not determine language (lc_collate) used for server logs, cannot parse logs...")
-			time.Sleep(60 * time.Second)
-			continue
-		}
+	if serverMessagesLang, err = tryDetermineLogMessagesLanguage(ctx, mdb.Conn); err != nil {
+		logger.WithError(err).Print("Could not determine language (lc_collate) used for server logs")
+		return
+	}
 
-		if logsMatchRegexPrev != logsMatchRegex { // avoid regex recompile if no changes
-			csvlogRegex, err = regexp.Compile(logsMatchRegex)
-			if err != nil {
-				logger.WithError(err).Print("Invalid regex: ", logsMatchRegex)
-				time.Sleep(60 * time.Second)
-				continue
+	for { // re-try loop. re-start in case of FS errors or just to refresh host config
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(currInterval):
+			if currInterval == 0 {
+				currInterval = time.Second * time.Duration(interval)
 			}
-			logger.Infof("Changing logs parsing regex to: %s", logsMatchRegex)
-			logsMatchRegexPrev = logsMatchRegex
 		}
 
-		logger.Debugf("Considering log files determined by glob pattern: %s", logsGlobPath)
-
 		if latest == "" || firstRun {
 			globMatches, err := filepath.Glob(logsGlobPath)
 			if err != nil || len(globMatches) == 0 {
-				logger.Infof("No logfiles found to parse from glob '%s'. Sleeping 60s...", logsGlobPath)
-				time.Sleep(60 * time.Second)
+				logger.Infof("No logfiles found to parse from glob '%s'", logsGlobPath)
 				continue
 			}
 
@@ -182,8 +170,7 @@ func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string,
 			// find latest timestamp
 			latest, _ = getFileWithLatestTimestamp(globMatches)
 			if latest == "" {
-				logger.Warningf("Could not determine the latest logfile. Sleeping 60s...")
-				time.Sleep(60 * time.Second)
+				logger.Warningf("Could not determine the latest logfile")
 				continue
 			}
 
@@ -196,22 +183,21 @@ func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string,
 		if latestHandle == nil {
 			latestHandle, err = os.Open(latest)
 			if err != nil {
-				logger.Warningf("Failed to open logfile %s: %s. Sleeping 60s...", latest, err)
-				time.Sleep(60 * time.Second)
+				logger.Warningf("Failed to open logfile %s: %s", latest, err)
 				continue
 			}
+			defer latestHandle.Close()
 			reader = bufio.NewReader(latestHandle)
 			if previous == latest && linesRead > 0 { // handle postmaster restarts
 				i := 1
 				for i <= linesRead {
					_, err = reader.ReadString('\n')
					if err == io.EOF && i < linesRead {
-						logger.Warningf("Failed to open logfile %s: %s. Sleeping 60s...", latest, err)
+						logger.Warningf("Failed to open logfile %s: %s", latest, err)
						linesRead = 0
						break
					} else if err != nil {
						logger.Warningf("Failed to skip %d logfile lines for %s, there might be duplicates reported. Error: %s", linesRead, latest, err)
-						time.Sleep(60 * time.Second)
						linesRead = i
						break
					}
@@ -224,62 +210,37 @@ func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string,
 			}
 		}
 
-		var eofSleepMillis = 0
-		// readLoopStart := time.Now()
-
 		for {
-			// if readLoopStart.Add(time.Second * time.Duration(opts.Source.Refresh)).Before(time.Now()) {
-			// break // refresh config
-			// }
 			line, err := reader.ReadString('\n')
 			if err != nil && err != io.EOF {
-				logger.Warningf("Failed to read logfile %s: %s. Sleeping 60s...", latest, err)
-				err = latestHandle.Close()
-				if err != nil {
-					logger.Warningf("Failed to close logfile %s properly: %s", latest, err)
-				}
+				logger.Warningf("Failed to read logfile %s: %s", latest, err)
+				_ = latestHandle.Close()
 				latestHandle = nil
-				time.Sleep(60 * time.Second)
 				break
 			}
 
 			if err == io.EOF {
-				//log.Debugf("EOF reached for logfile %s", latest)
-				if eofSleepMillis < 5000 && float64(eofSleepMillis) < interval*1000 {
-					eofSleepMillis += 100 // progressively sleep more if nothing going on but not more that 5s or metric interval
-				}
-				time.Sleep(time.Millisecond * time.Duration(eofSleepMillis))
-
 				// check for newly opened logfiles
 				file, _ := getFileWithNextModTimestamp(logsGlobPath, latest)
 				if file != "" {
 					previous = latest
 					latest = file
-					err = latestHandle.Close()
+					_ = latestHandle.Close()
 					latestHandle = nil
-					if err != nil {
-						logger.Warningf("Failed to close logfile %s properly: %s", latest, err)
-					}
 					logger.Infof("Switching to new logfile: %s", file)
 					linesRead = 0
 					break
 				}
 			} else {
-				eofSleepMillis = 0
 				linesRead++
 			}
 
 			if err == nil && line != "" {
-
 				matches := csvlogRegex.FindStringSubmatch(line)
 				if len(matches) == 0 {
-					//log.Debugf("No logline regex match for line:") // normal case actually for queries spanning multiple loglines
-					//log.Debugf(line)
 					goto send_to_storage_if_needed
 				}
-
 				result := regexMatchesToMap(csvlogRegex, matches)
-				//log.Debugf("RegexMatchesToMap: %+v", result)
 				errorSeverity, ok := result["error_severity"]
 				if !ok {
 					logger.Error("error_severity group must be defined in parse regex:", csvlogRegex)
@@ -303,12 +264,16 @@ func ParseLogs(ctx context.Context, mdb *sources.SourceConn, realDbname string,
 			}
 
 		send_to_storage_if_needed:
-			if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-1*time.Second*time.Duration(interval))) {
+			if lastSendTime.IsZero() || lastSendTime.Before(time.Now().Add(-time.Second*time.Duration(interval))) {
 				logger.Debugf("Sending log event counts for last interval to storage channel. Local eventcounts: %+v, global eventcounts: %+v", eventCounts, eventCountsTotal)
-				storeCh <- eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb)
-				zeroEventCounts(eventCounts)
-				zeroEventCounts(eventCountsTotal)
-				lastSendTime = time.Now()
+				select {
+				case <-ctx.Done():
+					return
+				case storeCh <- eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal, mdb):
+					zeroEventCounts(eventCounts)
+					zeroEventCounts(eventCountsTotal)
+					lastSendTime = time.Now()
+				}
 			}
 
 		} // file read loop
```