Skip to content

Commit 3182d87

Browse files
committed
logs dedup
1 parent 17580cb commit 3182d87

File tree

1 file changed

+101
-7
lines changed

1 file changed

+101
-7
lines changed

internal/metrics/log_exporter.go

Lines changed: 101 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const (
3434
LogCollectionInterval = 30 * time.Second
3535
MaxLogsPerBatch = 500
3636
MaxLogMessageLength = 2048
37-
MaxBufferSize = 10000 // Max logs in memory buffer
37+
MaxBufferSize = 5000 // Max logs in memory buffer (reduced to save memory)
3838

3939
// File tailing
4040
DefaultTailLines = 100
@@ -50,6 +50,10 @@ const (
5050
HTTPTimeout = 30 * time.Second
5151
MaxRetries = 3
5252
RetryBaseDelay = 1 * time.Second
53+
54+
// Deduplication settings
55+
DeduplicationWindow = 5 * time.Minute // Don't send same message within this window
56+
MaxDeduplicationHashes = 10000 // Max hashes to track
5357
)
5458

5559
// =============================================================================
@@ -783,18 +787,29 @@ func (le *LogExporter) addLogEntry(message, service, source, sourcePath string,
783787
le.addEntry(entry)
784788
}
785789

786-
// addEntry adds a log entry to the buffer with deduplication
790+
// addEntry adds a log entry to the buffer with deduplication and filtering
787791
func (le *LogExporter) addEntry(entry LogEntry) {
788-
// Generate hash for deduplication
789-
hash := le.hashEntry(entry)
792+
// 1. Filter by log level - only collect warn, error, fatal
793+
if !le.isImportantLevel(entry.Level) {
794+
return
795+
}
796+
797+
// 2. Filter noise logs (health checks, access logs, etc.)
798+
if le.isNoiseLog(entry.Message) {
799+
return
800+
}
801+
802+
// 3. Generate hash for deduplication (normalize message first)
803+
normalizedMsg := le.normalizeMessage(entry.Message)
804+
hash := le.hashEntryNormalized(entry.Source, entry.Service, string(entry.Level), normalizedMsg)
790805
entry.MessageHash = hash
791806

792807
le.bufferMu.Lock()
793808
defer le.bufferMu.Unlock()
794809

795-
// Check for duplicate (same hash within 1 minute)
810+
// 4. Check for duplicate (same hash within deduplication window)
796811
if lastSeen, ok := le.seenHashes[hash]; ok {
797-
if time.Since(lastSeen) < time.Minute {
812+
if time.Since(lastSeen) < DeduplicationWindow {
798813
return // Skip duplicate
799814
}
800815
}
@@ -812,11 +827,90 @@ func (le *LogExporter) addEntry(entry LogEntry) {
812827
}
813828

814829
// Clean old hashes
815-
if len(le.seenHashes) > MaxBufferSize*2 {
830+
if len(le.seenHashes) > MaxDeduplicationHashes {
816831
le.cleanSeenHashes()
817832
}
818833
}
819834

835+
// isImportantLevel returns true if log level should be collected
836+
func (le *LogExporter) isImportantLevel(level LogLevel) bool {
837+
switch level {
838+
case LogLevelWarn, LogLevelError, LogLevelFatal:
839+
return true
840+
default:
841+
return false
842+
}
843+
}
844+
845+
// isNoiseLog returns true if message is noise that should be filtered
846+
func (le *LogExporter) isNoiseLog(message string) bool {
847+
msgLower := strings.ToLower(message)
848+
849+
// Health check endpoints
850+
noisePatterns := []string{
851+
"/health",
852+
"/healthz",
853+
"/ready",
854+
"/readyz",
855+
"/live",
856+
"/livez",
857+
"/ping",
858+
"/metrics",
859+
"/favicon.ico",
860+
"health check",
861+
"healthcheck",
862+
// Common access log patterns (200 OK responses)
863+
"\" 200 ",
864+
"\" 204 ",
865+
"\" 301 ",
866+
"\" 302 ",
867+
"\" 304 ",
868+
// Routine messages
869+
"request completed",
870+
"request started",
871+
"connection accepted",
872+
"connection closed",
873+
}
874+
875+
for _, pattern := range noisePatterns {
876+
if strings.Contains(msgLower, pattern) {
877+
return true
878+
}
879+
}
880+
881+
return false
882+
}
883+
884+
// normalizeMessage removes variable parts (timestamps, PIDs, IPs) for better deduplication
885+
func (le *LogExporter) normalizeMessage(message string) string {
886+
// Remove timestamps like 2025-01-02T10:30:45
887+
msg := regexp.MustCompile(`\d{4}[-/]\d{2}[-/]\d{2}[T\s]\d{2}:\d{2}:\d{2}(\.\d+)?([+-]\d{2}:?\d{2}|Z)?`).ReplaceAllString(message, "")
888+
889+
// Remove PIDs like [12345] or pid=12345
890+
msg = regexp.MustCompile(`\[\d+\]|\bpid[=:]\d+`).ReplaceAllString(msg, "")
891+
892+
// Remove IP addresses
893+
msg = regexp.MustCompile(`\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}`).ReplaceAllString(msg, "")
894+
895+
// Remove port numbers like :8080
896+
msg = regexp.MustCompile(`:\d{2,5}\b`).ReplaceAllString(msg, "")
897+
898+
// Remove hex IDs (container IDs, request IDs)
899+
msg = regexp.MustCompile(`\b[a-f0-9]{12,}\b`).ReplaceAllString(msg, "")
900+
901+
// Collapse multiple spaces
902+
msg = regexp.MustCompile(`\s+`).ReplaceAllString(msg, " ")
903+
904+
return strings.TrimSpace(msg)
905+
}
906+
907+
// hashEntryNormalized generates hash from normalized components
908+
func (le *LogExporter) hashEntryNormalized(source, service, level, normalizedMsg string) string {
909+
data := fmt.Sprintf("%s|%s|%s|%s", source, service, level, normalizedMsg)
910+
hash := md5.Sum([]byte(data))
911+
return hex.EncodeToString(hash[:8])
912+
}
913+
820914
// hashEntry generates a hash for deduplication
821915
func (le *LogExporter) hashEntry(entry LogEntry) string {
822916
data := fmt.Sprintf("%s|%s|%s|%s", entry.Source, entry.Service, entry.Level, entry.Message)

0 commit comments

Comments
 (0)