@@ -32,6 +32,8 @@ class LogProcessor {
       summaryTimeout: 900000, // 15 minutes timeout for "stopped" message
       enableStoppedMessages: true // Whether to send "stopped" messages
     };
+    this.lastProcessedLine = new Map(); // Track last processed line for each stream
+    this.lastProcessedTimestamp = new Map(); // Track last processed timestamp for each stream
   }
 
   async initialize() {
@@ -60,7 +62,9 @@ class LogProcessor {
       buffer: [],
       lastSent: 0,
       isActive: true,
-      interval: null
+      interval: null,
+      lastProcessedLine: null, // Track last processed line
+      lastProcessedTimestamp: null // Track last processed timestamp
     };
 
     this.activeStreams.set(streamKey, logStream);
@@ -114,9 +118,31 @@ class LogProcessor {
     }
 
     const lines = chunk.split('\n').filter(line => line.trim());
-
-    for (const line of lines) {
-      if (this.shouldProcessLine(line, logStream.config)) {
+    let newLines = lines;
+
+    // Deduplication: only process new lines since the last processed line
+    if (logStream.lastProcessedLine) {
+      const lastIdx = lines.lastIndexOf(logStream.lastProcessedLine);
+      if (lastIdx !== -1 && lastIdx < lines.length - 1) {
+        newLines = lines.slice(lastIdx + 1);
+      } else if (lastIdx === lines.length - 1) {
+        newLines = [];
+      }
+    }
+
+    for (const line of newLines) {
+      // Timestamp deduplication (if possible)
+      let skip = false;
+      let lineTimestamp = null;
+      const tsMatch = line.match(/\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?/);
+      if (tsMatch) {
+        lineTimestamp = new Date(tsMatch[0]).getTime();
+        if (logStream.lastProcessedTimestamp && lineTimestamp <= logStream.lastProcessedTimestamp) {
+          this.logger.debug(`[Dedup] Skipping old log line (timestamp): ${line}`);
+          skip = true;
+        }
+      }
+      if (!skip && this.shouldProcessLine(line, logStream.config)) {
         // Track the log pattern for consolidation
         const patternData = this.trackLogPattern(streamKey, line, logStream.config, logStream.slackService);
 
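For reference, a quick standalone sketch (not part of the diff) of what the ISO-8601 regex in the hunk above extracts and how the value feeds the `<=` comparison; the sample log line is made up.

```js
// Illustration only: the same timestamp pattern used in the hunk above.
const TS_RE = /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z?/;

const sample = '2024-05-01T12:30:45.123Z INFO worker started'; // hypothetical line
const match = sample.match(TS_RE);
if (match) {
  const ts = new Date(match[0]).getTime(); // epoch millis, compared against lastProcessedTimestamp
  console.log(ts);
}
```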
@@ -135,7 +161,15 @@ class LogProcessor {
             this.sendLogBuffer(streamKey, logStream);
           }
         }
+      } else if (!skip) {
+        this.logger.debug(`[Dedup] Skipping log line (filter/level): ${line}`);
+      }
+      // Update last processed timestamp if available
+      if (lineTimestamp) {
+        logStream.lastProcessedTimestamp = lineTimestamp;
       }
+      // Always update last processed line
+      logStream.lastProcessedLine = line;
     }
 
     // Check for stopped patterns after processing chunk
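As a usage note, here is a minimal sketch (my illustration, not code from this PR; `trimOverlap` and the sample chunks are hypothetical) of the overlap-trimming rule implemented above: when a chunk re-delivers lines that were already seen, only the tail after the last processed line is kept.

```js
// Sketch of the lastIndexOf/slice deduplication used in the chunk handler above.
function trimOverlap(lines, lastProcessedLine) {
  if (!lastProcessedLine) return lines;          // nothing processed yet
  const lastIdx = lines.lastIndexOf(lastProcessedLine);
  if (lastIdx === -1) return lines;              // no overlap detected
  if (lastIdx === lines.length - 1) return [];   // chunk is entirely old
  return lines.slice(lastIdx + 1);               // keep only the new tail
}

// The second chunk re-sends the last two lines of the first one.
const chunk1 = ['a', 'b', 'c'];
const chunk2 = ['b', 'c', 'd', 'e'];
const last = chunk1[chunk1.length - 1];          // 'c'
console.log(trimOverlap(chunk2, last));          // ['d', 'e']
```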
@@ -593,7 +627,7 @@ class LogProcessor {
       this.logger.debug(`Sent stopped message for pattern: ${patternData.pattern}`);
       // Remove the pattern from tracking and allow it to be reported again
       this.logPatterns.delete(patternKey);
-      // (If you want to keep the object for stats, you could instead reset isSuppressed)
+      // Clear suppression for this pattern (if any extra state is needed)
     } catch (error) {
       this.logger.error(`Failed to send stopped message for ${patternKey}`, error);
     }