Commit 5a04899

Implement Collect-Then-Send pattern to eliminate race conditions between file Create and Remove events that could cause duplicate FileTrackers or reading from already-removed files.
1 parent 77d9848 commit 5a04899
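The Collect-Then-Send idea named in the commit message can be sketched as follows. This is a minimal illustration with hypothetical types (a plain `Mutex<HashMap>` standing in for the real FileTracker registry in `src/watcher.rs`): while the lock is held, tracker state is mutated and the events to emit are only *collected*; they are sent after the lock is released. Because the state change and the decision of what to send happen atomically under one lock, a racing Remove can never interleave half-way through a Create (no duplicate trackers, no reads from already-removed files).

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical event type for illustration only.
#[derive(Debug, PartialEq)]
enum FileEvent {
    Create(String),
    Remove(String),
}

struct Watcher {
    // Stand-in for the real FileTracker registry, keyed by path.
    trackers: Mutex<HashMap<String, ()>>,
}

impl Watcher {
    // Collect-Then-Send: decide what to emit while holding the lock,
    // return the collected events, and let the caller send them after
    // the lock guard has been dropped.
    fn handle_create_event(&self, path: &str) -> Vec<FileEvent> {
        let mut to_send = Vec::new();
        {
            let mut trackers = self.trackers.lock().unwrap();
            // Ignore a duplicate Create for an already-tracked file,
            // so no second FileTracker can be registered.
            if !trackers.contains_key(path) {
                trackers.insert(path.to_string(), ());
                to_send.push(FileEvent::Create(path.to_string()));
            }
        } // lock released here, before any sending happens
        to_send
    }

    fn handle_remove_event(&self, path: &str) -> Vec<FileEvent> {
        let mut to_send = Vec::new();
        {
            let mut trackers = self.trackers.lock().unwrap();
            // Only emit Remove if the file was actually tracked;
            // the tracker is gone before anyone can read from it.
            if trackers.remove(path).is_some() {
                to_send.push(FileEvent::Remove(path.to_string()));
            }
        }
        to_send
    }
}
```

Sending outside the lock matters: if the send were done while the lock was held, a slow or blocking channel would serialize all event handling behind it; collecting first keeps the critical section short while still making Create/Remove ordering atomic.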

File tree

6 files changed: +577 additions, −93 deletions

Makefile

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ docker_down dd:
 	docker-compose down
 
 # Build and push Docker image with version from VERSION file
-docker-build db: increment-version
+docker-build db:
 	@NEW_VERSION=$$(cat VERSION); \
 	echo "Building with version: $$NEW_VERSION"; \
 	docker build . -t soulgarden/logfowd2:$$NEW_VERSION -t soulgarden/logfowd2:latest --platform linux/amd64; \

README.md

Lines changed: 8 additions & 7 deletions
@@ -1,8 +1,8 @@
 # logfowd2
 
 ![Tests and linters](https://github.com/soulgarden/logfowd2/actions/workflows/main.yml/badge.svg)
-[![Version](https://img.shields.io/badge/version-0.1.0-blue.svg)](https://github.com/soulgarden/logfowd2)
-[![Tests](https://img.shields.io/badge/tests-269%20passing-success.svg)](https://github.com/soulgarden/logfowd2)
+[![Version](https://img.shields.io/badge/version-0.4.0-blue.svg)](https://github.com/soulgarden/logfowd2)
+[![Tests](https://img.shields.io/badge/tests-293%20passing-success.svg)](https://github.com/soulgarden/logfowd2)
 [![Code Quality](https://img.shields.io/badge/linter-zero%20warnings-success.svg)](https://github.com/soulgarden/logfowd2)
 
 **High-performance Kubernetes log forwarding tool built with Rust**
@@ -28,7 +28,7 @@ Logfowd2 is a memory-efficient log forwarding daemon designed for Kubernetes env
 ### Advanced System Optimization
 - **MetadataCache System** - High-performance file metadata caching with TTL-based eviction (100ms TTL, LRU)
 - **Intelligent Retry Management** - Universal exponential backoff retry mechanism for all async operations
-- **Lock Optimization** - Drop/reacquire pattern minimizes lock contention and improves concurrency
+- **Atomic Event Handling** - Collect-Then-Send pattern eliminates race conditions in file event processing
 - **Event-Driven File Monitoring** - Uses filesystem events for instant rotation detection
 - **Historical Log Recovery** - Reads existing log content on startup (no data loss)
 - **Symlink Support** - Full support for Kubernetes symlinked log files
@@ -40,7 +40,7 @@ Logfowd2 is a memory-efficient log forwarding daemon designed for Kubernetes env
 
 logfowd2 is built with Domain-Driven Design (DDD) principles:
 - **Modular design** - Clear separation between domain, infrastructure, and transport layers
-- **Comprehensive testing** - 269 tests covering all critical paths
+- **Comprehensive testing** - 293 tests covering all critical paths
 - **Type safety** - Leverages Rust's type system for compile-time guarantees
 - **Extensible architecture** - Ready for parallel file processing and custom extensions
 
@@ -81,6 +81,7 @@ Buffer Management State Persist RetryMana
 #### Watcher (`src/watcher.rs`)
 - **Purpose**: Monitors `/var/log/pods` recursively using filesystem events
 - **NotifyBridge Integration**: Uses NotifyBridge to prevent filesystem notify callback blocking
+- **Atomic Event Handling**: Collect-Then-Send pattern via `handle_create_event`/`handle_remove_event` eliminates race conditions
 - **File Tracking**: Advanced FileTracker with symlink and rapid rotation support, leveraging MetadataCache
 - **Metadata Parsing**: Extracts Kubernetes metadata (namespace, pod, container) from log paths
 - **Initial Sync**: Processes existing files on startup with position restoration
@@ -140,7 +141,7 @@ Buffer Management State Persist RetryMana
 - **Parallel ES Workers**: Concurrent bulk operations with configurable pool sizing
 - **Adaptive Batching**: Size and time-based flushing with backpressure awareness
 - **Memory Streaming**: Bounded buffer architecture prevents memory growth
-- **Advanced Lock Optimization**: Drop/reacquire pattern minimizes lock contention during I/O operations
+- **Atomic Event Handling**: Collect-Then-Send pattern eliminates race conditions while maintaining high throughput
 
 ### Resource Efficiency
 - **Ultra-Low Memory Baseline**: 30-50Mi baseline memory usage
@@ -164,7 +165,7 @@ Buffer Management State Persist RetryMana
 ## 🧪 Code Quality & Testing
 
 ### Test Coverage
-- **263 Comprehensive Tests**: Unit, integration, and edge case coverage
+- **293 Comprehensive Tests**: Unit, integration, and edge case coverage
 - **Domain Testing**: File rotation, symlinks, corrupted files, permission issues
 - **Network Testing**: Circuit breaker, retry logic, timeout behavior
 - **Memory Testing**: Backpressure, channel overflow, cache eviction
@@ -181,7 +182,7 @@ Buffer Management State Persist RetryMana
 
 ### Prerequisites
 - **Platform**: Linux/Unix only (uses `std::os::unix` APIs and Unix signals)
-- **Rust Toolchain**: 1.85+ (required for Rust 2024 edition support)
+- **Rust Toolchain**: 1.91+ (required for Rust 2024 edition support)
 - **Kubernetes**: 1.14+ with `/var/log/pods` access
 - **Elasticsearch**: 7.x+ or ZincSearch compatible target

VERSION

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-0.3.0
+0.4.0

src/infrastructure/elasticsearch/dead_letter_queue.rs

Lines changed: 15 additions & 4 deletions
@@ -172,7 +172,9 @@ impl DeadLetterQueue {
         if returned_count > 0 || permanently_failed_count > 0 {
             debug!(
                 "Returned {} events to DLQ, {} permanently failed (queue size: {})",
-                returned_count, permanently_failed_count, queue.len()
+                returned_count,
+                permanently_failed_count,
+                queue.len()
             );
         }
     }
@@ -1144,7 +1146,10 @@ mod tests {
 
         // Stats should be updated
         let stats = dlq.stats.read().await;
-        assert_eq!(stats.events_in_queue, 2, "Stats should reflect remaining events");
+        assert_eq!(
+            stats.events_in_queue, 2,
+            "Stats should reflect remaining events"
+        );
         assert_eq!(stats.events_retried, 3, "Should track retried events");
     }
 
@@ -1239,7 +1244,10 @@ mod tests {
 
         // Stats should be updated
        let stats = dlq.stats.read().await;
-        assert_eq!(stats.events_in_queue, 3, "Stats should reflect returned events");
+        assert_eq!(
+            stats.events_in_queue, 3,
+            "Stats should reflect returned events"
+        );
     }
 
     #[tokio::test]
@@ -1304,7 +1312,10 @@ mod tests {
     async fn test_max_retry_config_default() {
         // TDD: Test that default config has max_retry_count
         let config = DeadLetterQueueConfig::default();
-        assert_eq!(config.max_retry_count, 5, "Default max_retry_count should be 5");
+        assert_eq!(
+            config.max_retry_count, 5,
+            "Default max_retry_count should be 5"
+        );
     }
 
     #[tokio::test]

src/sender.rs

Lines changed: 27 additions & 10 deletions
@@ -172,7 +172,10 @@ impl Sender {
             }
             Err(crate::transport::channels::SendError::Closed(returned_events)) => {
                 // Channel closed - return events for later retry
-                warn!("ES channel closed, {} events will be retried", returned_events.len());
+                warn!(
+                    "ES channel closed, {} events will be retried",
+                    returned_events.len()
+                );
                 if self.metrics_enabled {
                     metrics()
                         .errors_total
@@ -1203,11 +1206,18 @@ mod tests {
             .await
             .expect("Should receive second batch - events must NOT be lost!")
             .unwrap();
-        assert_eq!(batch2.len(), 5, "Second batch should have 5 events - none lost!");
+        assert_eq!(
+            batch2.len(),
+            5,
+            "Second batch should have 5 events - none lost!"
+        );
 
         // Verify total events
         let total_events = batch1.len() + batch2.len();
-        assert_eq!(total_events, 10, "All 10 events must be delivered, none lost!");
+        assert_eq!(
+            total_events, 10,
+            "All 10 events must be delivered, none lost!"
+        );
 
         shutdown.notify_one();
         let _ = timeout(Duration::from_millis(500), sender_handle).await;
@@ -1227,17 +1237,18 @@ mod tests {
         es_sender.send(batch1).await.unwrap();
 
         // Now channel is full - try_send should return the events, not lose them
-        let batch2 = vec![
-            create_test_event("second_1"),
-            create_test_event("second_2"),
-        ];
+        let batch2 = vec![create_test_event("second_1"), create_test_event("second_2")];
 
         // This should fail with Full error and return our events
         let result = es_sender.try_send(batch2);
 
         match result {
             Err(SendError::Full(returned_events)) => {
-                assert_eq!(returned_events.len(), 2, "Events must be returned, not lost!");
+                assert_eq!(
+                    returned_events.len(),
+                    2,
+                    "Events must be returned, not lost!"
+                );
                 assert_eq!(returned_events[0].message, "second_1");
                 assert_eq!(returned_events[1].message, "second_2");
             }
@@ -1298,7 +1309,10 @@ mod tests {
         })
         .await;
 
-        assert!(sender_result.is_ok(), "Sender should complete within timeout");
+        assert!(
+            sender_result.is_ok(),
+            "Sender should complete within timeout"
+        );
 
         // Wait for sender to finish
         let _ = timeout(Duration::from_secs(1), sender_handle).await;
@@ -1426,7 +1440,10 @@ mod tests {
 
         // Sender should complete (not hang forever) - within retry timeout
         let result = timeout(Duration::from_secs(6), sender_handle).await;
-        assert!(result.is_ok(), "Sender should not hang when ES channel is closed");
+        assert!(
+            result.is_ok(),
+            "Sender should not hang when ES channel is closed"
+        );
     }
 
     #[tokio::test]

0 commit comments
