Skip to content

[Fix] Overlapping duration validation#92

Merged
Abdulsametileri merged 2 commits intomainfrom
fix/overlapping-duration-validation
Dec 15, 2025
Merged

[Fix] Overlapping duration validation#92
Abdulsametileri merged 2 commits intomainfrom
fix/overlapping-duration-validation

Conversation

@ugurcanerdogan
Copy link
Member

Description

This PR addresses a critical race condition issue where overlapping Listen() goroutines could be spawned if the configured WorkDuration exceeded the calculated cron interval.

Problem

Previously, if a user configured a WorkDuration that was longer than the interval between cron triggers (e.g., Cron: "Every 1 min", Duration: "2 mins"), the kafka-cronsumer would start a new listener goroutine while the previous one was still running. This led to:

  • Race Conditions: Multiple consumers sharing the same resources.
  • Message Duplication: Concurrent consumers processing the same messages.
  • Potential Panics: Due to non-thread-safe operations in the underlying reader.

Solution

Instead of throwing a panic (which would be a hard breaking change) or allowing the unsafe behavior, we introduced a Smart Auto-Correction mechanism in the configuration validation phase:

  1. Validation Logic: During startup, the Validate() method checks if Duration >= CronInterval.
  2. Auto-Correction: If the duration exceeds the interval, it logs a WARNING and automatically sets the Duration to NonStopWork (0).
  3. Safe Behavior: NonStopWork mode ensures that the consumer runs continuously in a single goroutine, which is the intended behavior when the duration is set to cover the entire interval (and more).

Changes

  • Modified pkg/kafka/config.go: Added logic to parse the cron expression, calculate the interval, and apply the fix.
  • Updated pkg/kafka/config_test.go: Added unit tests to verify that the auto-correction works as expected and invalid configurations are handled gracefully.

Example Scenario

Before:

Consumer: {
    Cron: "*/1 * * * *", // Every 1 minute
    Duration: 2 * time.Minute
}
// Result: Every minute a new goroutine starts. At T+1m, two goroutines run concurrently.

After (With this PR):

// Log: WARN: Consumer duration is greater than cron interval. It is set to NonStopWork (0) automatically.
Consumer: {
    Cron: "*/1 * * * *",
    Duration: 0 // (NonStopWork)
}
// Result: Single goroutine runs continuously without overlap.

@Abdulsametileri Abdulsametileri merged commit 8b1e318 into main Dec 15, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug/Enhancement] Concurrent Listen goroutines when WorkDuration >= Cron interval causes race conditions

2 participants