Skip to content

Commit 0c8a302

Browse files
committed
fix log collector
1 parent 92b22ac commit 0c8a302

File tree

2 files changed

+67
-6
lines changed

2 files changed

+67
-6
lines changed

internal/metrics/log_collector.go

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package metrics
33
import (
44
"bufio"
55
"context"
6+
"crypto/md5"
7+
"encoding/hex"
68
"encoding/json"
79
"fmt"
810
"os"
@@ -11,6 +13,7 @@ import (
1113
"regexp"
1214
"strconv"
1315
"strings"
16+
"sync"
1417
"time"
1518
)
1619

@@ -20,6 +23,12 @@ const (
2023
maxLogLineLen = 2000 // Maximum length per log line
2124
)
2225

26+
// Package-level singleton for log collector to maintain deduplication state across collection cycles
27+
var (
28+
globalLogCollector *LogCollector
29+
globalLogCollectorOnce sync.Once
30+
)
31+
2332
// DockerContainer represents a running docker container
2433
type DockerContainer struct {
2534
ID string `json:"Id"`
@@ -39,6 +48,10 @@ type LogCollector struct {
3948
// Cache of running docker containers (container name/id -> DockerContainer)
4049
dockerContainers map[string]DockerContainer
4150
dockerLoaded bool
51+
52+
// Deduplication: track sent log hashes to avoid sending same logs twice
53+
sentLogHashes map[string]time.Time // hash -> when it was sent
54+
sentLogHashesMu sync.Mutex
4255
}
4356

4457
// NewLogCollector creates a new LogCollector
@@ -52,12 +65,24 @@ func NewLogCollector() *LogCollector {
5265
regexp.MustCompile(`(?i)(denied|unauthorized|forbidden|permission)`),
5366
},
5467
dockerContainers: make(map[string]DockerContainer),
68+
sentLogHashes: make(map[string]time.Time),
5569
}
5670
// Pre-load docker containers
5771
lc.loadDockerContainers()
5872
return lc
5973
}
6074

75+
// GetLogCollector returns the global log collector singleton
76+
// This ensures deduplication state is maintained across collection cycles
77+
func GetLogCollector() *LogCollector {
78+
globalLogCollectorOnce.Do(func() {
79+
globalLogCollector = NewLogCollector()
80+
})
81+
// Refresh docker containers list on each call (they might have changed)
82+
globalLogCollector.loadDockerContainers()
83+
return globalLogCollector
84+
}
85+
6186
// loadDockerContainers loads all running docker containers
6287
func (lc *LogCollector) loadDockerContainers() {
6388
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(logTimeout)*time.Second)
@@ -194,19 +219,55 @@ func (lc *LogCollector) CollectServiceLogs(service *ServiceInfo) ([]string, stri
194219
return nil, ""
195220
}
196221

197-
// collectDockerLogs collects recent logs from a Docker container (last 1 minute only)
222+
// collectDockerLogs collects recent logs from a Docker container
198223
func (lc *LogCollector) collectDockerLogs(containerID string) ([]string, error) {
199224
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(logTimeout)*time.Second)
200225
defer cancel()
201226

202-
// Get logs from last 1 minute only (to avoid duplicates on each 30s collection)
203-
cmd := exec.CommandContext(ctx, "docker", "logs", "--since", "1m", "--tail", fmt.Sprintf("%d", maxLogLines), "--timestamps", containerID)
227+
// Get last N lines of logs
228+
cmd := exec.CommandContext(ctx, "docker", "logs", "--tail", fmt.Sprintf("%d", maxLogLines), "--timestamps", containerID)
204229
output, err := cmd.CombinedOutput()
205230
if err != nil {
206231
return nil, err
207232
}
208233

209-
return lc.filterLogLines(string(output)), nil
234+
// Filter for error/warning lines, then deduplicate
235+
filtered := lc.filterLogLines(string(output))
236+
return lc.deduplicateLogs(filtered), nil
237+
}
238+
239+
// hashLogLine creates a hash of a log line for deduplication
240+
func (lc *LogCollector) hashLogLine(line string) string {
241+
hash := md5.Sum([]byte(line))
242+
return hex.EncodeToString(hash[:])
243+
}
244+
245+
// deduplicateLogs filters out logs that have already been sent
246+
func (lc *LogCollector) deduplicateLogs(logs []string) []string {
247+
lc.sentLogHashesMu.Lock()
248+
defer lc.sentLogHashesMu.Unlock()
249+
250+
// Clean up old hashes (older than 10 minutes) to prevent memory growth
251+
cutoff := time.Now().Add(-10 * time.Minute)
252+
for hash, sentAt := range lc.sentLogHashes {
253+
if sentAt.Before(cutoff) {
254+
delete(lc.sentLogHashes, hash)
255+
}
256+
}
257+
258+
var newLogs []string
259+
now := time.Now()
260+
261+
for _, log := range logs {
262+
hash := lc.hashLogLine(log)
263+
if _, alreadySent := lc.sentLogHashes[hash]; !alreadySent {
264+
// This is a new log line, mark it as sent
265+
lc.sentLogHashes[hash] = now
266+
newLogs = append(newLogs, log)
267+
}
268+
}
269+
270+
return newLogs
210271
}
211272

212273
// collectJournaldLogs collects recent error logs from journald

internal/metrics/service_detector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,8 +523,8 @@ func GetServices() ([]ServiceInfo, error) {
523523
return nil, err
524524
}
525525

526-
// Collect logs for each service
527-
logCollector := NewLogCollector()
526+
// Collect logs for each service (using singleton to maintain deduplication state)
527+
logCollector := GetLogCollector()
528528
services = logCollector.GetAllServiceLogs(services)
529529

530530
return services, nil

0 commit comments

Comments
 (0)