Skip to content

Commit b372f21

Browse files
author
Mohit Tambi
committed
Offset handling, data loss, fault tolerance added
1 parent 33cd114 commit b372f21

File tree

8 files changed

+995
-5
lines changed

8 files changed

+995
-5
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,39 @@ public class ConsumerConfig extends AbstractConfig {
180180
"<p>Note that altering partition numbers while setting this config to latest may cause message delivery loss since " +
181181
"producers could start to send messages to newly added partitions (i.e. no initial offsets exist yet) before consumers reset their offsets.";
182182

183+
/**
184+
* <code>enable.data.loss.detection</code>
185+
*/
186+
public static final String ENABLE_DATA_LOSS_DETECTION_CONFIG = "enable.data.loss.detection";
187+
public static final String ENABLE_DATA_LOSS_DETECTION_DOC = "Enables data loss detection for the consumer. When enabled, the consumer will detect and handle scenarios " +
188+
"that could lead to data loss, including offset gaps, topic recreation, and out-of-range offsets. The behavior when data loss is detected depends on the " +
189+
"auto.offset.reset strategy: 'none' will throw a DataLossException, while 'earliest' and 'latest' will log warnings and attempt recovery.";
190+
public static final boolean DEFAULT_ENABLE_DATA_LOSS_DETECTION = false;
191+
192+
/**
193+
* <code>data.loss.detection.gap.threshold</code>
194+
*/
195+
public static final String DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG = "data.loss.detection.gap.threshold";
196+
public static final String DATA_LOSS_DETECTION_GAP_THRESHOLD_DOC = "The maximum allowed offset gap before considering it a potential data loss scenario. " +
197+
"Smaller values provide stricter detection but may cause false positives during normal retention. Larger values are more lenient but may miss actual data loss.";
198+
public static final long DEFAULT_DATA_LOSS_DETECTION_GAP_THRESHOLD = 1000L;
199+
200+
/**
201+
* <code>data.loss.detection.validation.interval.ms</code>
202+
*/
203+
public static final String DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG = "data.loss.detection.validation.interval.ms";
204+
public static final String DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_DOC = "The interval in milliseconds between continuous data loss validation checks during normal consumption. " +
205+
"This helps detect silent data loss due to retention policies between normal poll operations.";
206+
public static final long DEFAULT_DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS = 30000L; // 30 seconds
207+
208+
/**
209+
* <code>data.loss.detection.grace.period.ms</code>
210+
*/
211+
public static final String DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG = "data.loss.detection.grace.period.ms";
212+
public static final String DATA_LOSS_DETECTION_GRACE_PERIOD_MS_DOC = "Grace period in milliseconds to avoid false positives during topic recreation or broker maintenance. " +
213+
"Suspected data loss events within this period after initialization will be logged as warnings instead of throwing exceptions.";
214+
public static final long DEFAULT_DATA_LOSS_DETECTION_GRACE_PERIOD_MS = 5000L; // 5 seconds
215+
183216
/**
184217
* <code>fetch.min.bytes</code>
185218
*/
@@ -546,6 +579,29 @@ public class ConsumerConfig extends AbstractConfig {
546579
new AutoOffsetResetStrategy.Validator(),
547580
Importance.MEDIUM,
548581
AUTO_OFFSET_RESET_DOC)
582+
.define(ENABLE_DATA_LOSS_DETECTION_CONFIG,
583+
Type.BOOLEAN,
584+
DEFAULT_ENABLE_DATA_LOSS_DETECTION,
585+
Importance.MEDIUM,
586+
ENABLE_DATA_LOSS_DETECTION_DOC)
587+
.define(DATA_LOSS_DETECTION_GAP_THRESHOLD_CONFIG,
588+
Type.LONG,
589+
DEFAULT_DATA_LOSS_DETECTION_GAP_THRESHOLD,
590+
atLeast(1),
591+
Importance.LOW,
592+
DATA_LOSS_DETECTION_GAP_THRESHOLD_DOC)
593+
.define(DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_CONFIG,
594+
Type.LONG,
595+
DEFAULT_DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS,
596+
atLeast(1000),
597+
Importance.LOW,
598+
DATA_LOSS_DETECTION_VALIDATION_INTERVAL_MS_DOC)
599+
.define(DATA_LOSS_DETECTION_GRACE_PERIOD_MS_CONFIG,
600+
Type.LONG,
601+
DEFAULT_DATA_LOSS_DETECTION_GRACE_PERIOD_MS,
602+
atLeast(0),
603+
Importance.LOW,
604+
DATA_LOSS_DETECTION_GRACE_PERIOD_MS_DOC)
549605
.define(CHECK_CRCS_CONFIG,
550606
Type.BOOLEAN,
551607
true,

0 commit comments

Comments
 (0)