
Commit d6c4996

feat(inkless:consume): add rate-limited fetch for lagging consumers (#467)
* feat(inkless): rate-limited hot/cold path for fetch operations

  Implements hot/cold path separation to protect Inkless from load spikes caused by lagging consumers while maintaining performance for recent consumers.

  Hot Path (Recent Data):
  - Uses object cache for fast repeated access
  - Dedicated executor pool
  - Prioritizes low latency for up-to-date consumers

  Cold Path (Lagging Consumers):
  - Bypasses cache to avoid evicting hot data
  - Dedicated executor pool (default: half of hot-path pool) with bounded queue
  - Optional rate limiting (default 200 req/s) to control S3 costs and CPU usage
  - Separate storage client for resource isolation
  - AbortPolicy for backpressure: queue full → RejectedExecutionException → Kafka error handler → consumer backs off via fetch purgatory

  Path Selection:
  - Age threshold: data older than the cache TTL (default 60s) uses the cold path
  - Configurable via fetch.lagging.consumer.threshold.ms (-1 = auto)
  - Based on batch timestamp (max across partitions in the same object)
  - Handles future timestamps (clock skew) by treating them as recent data

  Backpressure Mechanism:
  - Bounded queue prevents OOM (1600 tasks ≈ 3.2 MB, ~8 seconds of buffer)
  - AbortPolicy rejects tasks when the queue is full (no request handler blocking)
  - Only RejectedExecutionException is tracked as a rejection (capacity/backpressure)
  - InterruptedException is treated as a fetch operation failure, not a rejection
  - Exceptions propagate to the Kafka error handler (no custom empty responses)
  - Consumers receive an error → fetch purgatory backoff → natural retry

  Fixes a partial failure scenario: Reader#allOfFileExtents now handles partial failures gracefully and ensures that successful partitions are returned.

  Robustness Improvements:
  - Null safety: validates laggingObjectFetcher is non-null when the executor exists
  - Validation: FindBatchesJob validates that the response count matches the request count
  - Edge cases: handles future timestamps (clock skew) and empty batch coordinates
  - Resource management: the rate limiter is only created when the lagging consumer feature is enabled

  Monitoring Metrics:
  - LaggingConsumerRequestRate: cold path request rate
  - LaggingConsumerRejectedRate: rejection events (queue full/shutdown)
  - LaggingRateLimitWaitTime: rate limiting latency histogram
  - RecentDataRequestRate: hot path request rate

  Configuration:
  - fetch.lagging.consumer.thread.pool.size (default: 16): controls cold path concurrency
  - fetch.lagging.consumer.request.rate.limit (default: 200 req/s): controls the S3 request rate (set 0 to disable)
  - fetch.lagging.consumer.threshold.ms (default: -1 = cache TTL): age threshold for hot/cold path selection; -1 uses the cache TTL automatically (validated to always be >= the cache lifespan)

  Design Trade-offs:
  - A separate storage client doubles connection pools but prevents hot path degradation from cold path bursts
  - Cache bypass for the cold path means no deduplication across lagging consumers; acceptable because objects are multi-partition blobs (future: splitting objects by partition may make caching worth reconsidering)

* fixup! feat(inkless): rate-limited hot/cold path for fetch operations

  Reformat to align style and reorder methods for clarity.

* fixup! feat(inkless): rate-limited hot/cold path for fetch operations

* fixup! feat(inkless): rate-limited hot/cold path for fetch operations

  Cover a potential bug where multiple file extents are found for the same object and the first fails but the second succeeds, potentially returning incomplete data. This fixup carries the additional data needed to build an ordered response in the FetchCompleter and to properly serve up to the available data in order.

* fixup! feat(inkless): rate-limited hot/cold path for fetch operations

* fixup! feat(inkless): rate-limited hot/cold path for fetch operations

  Adds a CRC check after reading from remote to ensure the batch is valid.
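The hot/cold path selection rule described in the message (age threshold with clock-skew handling) can be sketched as follows. This is a hypothetical standalone helper, not the actual Inkless class; names are illustrative only.

```java
// Sketch of hot/cold path selection: data strictly older than the threshold
// goes to the cold path; a negative age (batch timestamp in the future due to
// clock skew) is treated as recent data and stays on the hot (cached) path.
public class PathSelector {
    public enum FetchPath { HOT, COLD }

    public static FetchPath select(long batchTimestampMs, long nowMs, long thresholdMs) {
        final long dataAge = nowMs - batchTimestampMs;
        // Runtime check is dataAge > threshold (strictly greater), so
        // dataAge == threshold and negative ages both use the hot path.
        return dataAge > thresholdMs ? FetchPath.COLD : FetchPath.HOT;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(select(now - 1_000, now, 60_000));   // recent data
        System.out.println(select(now - 120_000, now, 60_000)); // lagging data
        System.out.println(select(now + 5_000, now, 60_000));   // clock skew
    }
}
```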
1 parent 05ff0ac commit d6c4996

File tree

17 files changed: +3841 −652 lines


build.gradle

Lines changed: 1 addition & 0 deletions
@@ -2453,6 +2453,7 @@ project(':storage:inkless') {
     }
     implementation libs.metrics
     implementation libs.caffeine
+    implementation libs.bucket4jCore

     testImplementation project(':clients').sourceSets.test.output.classesDirs
     testImplementation project(':test-common')

docs/inkless/configs.rst

Lines changed: 25 additions & 1 deletion
@@ -39,6 +39,22 @@ Under ``inkless.``
   * Default: io.aiven.inkless.storage_backend.in_memory.InMemoryStorage
   * Importance: high

+``fetch.lagging.consumer.request.rate.limit``
+  Maximum requests per second for lagging consumer data fetches. Set to 0 to disable rate limiting. The upper bound of 10000 req/s is a safety limit to prevent misconfiguration. For high-throughput systems, consider the relationship between this rate limit, thread pool size, and storage backend capacity. At the default rate of 200 req/s with ~50ms per request latency, this allows ~10 concurrent requests.
+
+  * Type: int
+  * Default: 200
+  * Valid Values: [0,...,10000]
+  * Importance: medium
+
+``fetch.lagging.consumer.threshold.ms``
+  The time threshold in milliseconds to distinguish between recent and lagging consumers. Fetch requests for data strictly older than this threshold (dataAge > threshold, based on batch timestamp) will use the lagging consumer path. Set to -1 to use the default heuristic: the cache expiration lifespan. This provides a grace period ensuring data remains in cache before being considered 'lagging', accounting for cache warm-up and typical consumer lag variations. Must be >= cache expiration lifespan (see consume.cache.expiration.lifespan.sec). This is a startup-only configuration (no dynamic reconfiguration support). Both threshold and cache lifespan must be set together at startup to maintain the constraint.
+
+  * Type: long
+  * Default: -1
+  * Valid Values: [-1,...]
+  * Importance: medium
+
 ``object.key.prefix``
   The object storage key prefix. It cannot start or finish with a slash.

@@ -125,8 +141,16 @@ Under ``inkless.``
   * Valid Values: [0,...]
   * Importance: low

+``fetch.lagging.consumer.thread.pool.size``
+  Thread pool size for lagging consumer fetch requests (consumers reading old data). Set to 0 to disable the lagging consumer feature (all requests will use the recent data path). The default value of 16 is designed as approximately half of the default fetch.data.thread.pool.size (32), providing sufficient capacity for typical cold storage access patterns while leaving headroom for the hot path. The queue capacity is automatically set to thread.pool.size * 100, providing burst buffering (e.g., 16 threads = 1600 queue capacity ≈ 8 seconds buffer at 200 req/s). Tune based on lagging consumer SLA and expected load patterns.
+
+  * Type: int
+  * Default: 16
+  * Valid Values: [0,...]
+  * Importance: low
+
 ``fetch.metadata.thread.pool.size``
-  Thread pool size to concurrently fetch metadata from batch coordinator
+  Thread pool size to concurrently fetch metadata from batch coordinator. Note: This executor is shared between hot and cold path requests. The hot/cold path separation only applies to data fetching (after metadata is retrieved). A burst of lagging consumer requests can still compete with recent consumer requests at the metadata layer. For workloads with significant lagging consumer traffic, consider increasing this value proportionally to the combined fetch.data.thread.pool.size + fetch.lagging.consumer.thread.pool.size to prevent metadata fetching from becoming a bottleneck in mixed hot/cold workloads.

   * Type: int
   * Default: 8
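The bounded-queue behaviour behind ``fetch.lagging.consumer.thread.pool.size`` can be sketched with a standard ``ThreadPoolExecutor``. This is an assumed shape, not the actual Inkless executor wiring: a fixed pool whose queue capacity is ``poolSize * 100``, with ``AbortPolicy`` so a full queue raises ``RejectedExecutionException`` instead of blocking the request handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ColdPathExecutor {
    public static ThreadPoolExecutor create(int poolSize) {
        // Queue capacity = poolSize * 100, e.g. 16 threads -> 1600 queued tasks,
        // roughly 8 seconds of buffer at the default 200 req/s rate limit.
        int queueCapacity = poolSize * 100;
        return new ThreadPoolExecutor(
            poolSize, poolSize,
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            // Reject rather than block when the queue is full; per the commit,
            // the exception propagates to the Kafka error handler and the
            // consumer backs off via fetch purgatory.
            new ThreadPoolExecutor.AbortPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create(16);
        try {
            pool.submit(() -> "fetch");
        } catch (RejectedExecutionException e) {
            // Only reached once the bounded queue is saturated.
        } finally {
            pool.shutdownNow();
        }
    }
}
```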

gradle/dependencies.gradle

Lines changed: 3 additions & 1 deletion
@@ -58,8 +58,9 @@ versions += [
     awsSdk: "2.29.6",
     azureSdk: "1.2.28",
     bcpkix: "1.80",
-    caffeine: "3.2.0",
     bndlib: "7.1.0",
+    bucket4j: "8.14.0",
+    caffeine: "3.2.0",
     checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
     commonsBeanutils: "1.11.0",
     commonsValidator: "1.9.0",
@@ -164,6 +165,7 @@ libs += [
     azureSdkBom: "com.azure:azure-sdk-bom:$versions.azureSdk",
     bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix",
     bndlib: "biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib",
+    bucket4jCore: "com.bucket4j:bucket4j_jdk11-core:$versions.bucket4j",
     caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
     classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
     commonsBeanutils: "commons-beanutils:commons-beanutils:$versions.commonsBeanutils",
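The bucket4jCore dependency added above supplies the cold-path rate limiter. As an illustration only (not the bucket4j API itself), the default 200 req/s limit behaves like a token bucket: tokens refill continuously at the configured rate, and a request proceeds only when a token is available.

```java
// Minimal token bucket equivalent in behaviour to a "200 req/s" limit.
public class TokenBucket {
    private final double capacity;       // burst size = permits per second
    private final double refillPerNanos; // continuous refill rate
    private double tokens;
    private long lastRefillNanos;

    public TokenBucket(double permitsPerSecond) {
        this.capacity = permitsPerSecond;
        this.refillPerNanos = permitsPerSecond / 1_000_000_000.0;
        this.tokens = permitsPerSecond; // start full
        this.lastRefillNanos = System.nanoTime();
    }

    // Non-blocking: returns false instead of waiting when no token is available.
    public synchronized boolean tryConsume() {
        long now = System.nanoTime();
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * refillPerNanos);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucket limiter = new TokenBucket(200); // default 200 req/s
        int admitted = 0;
        for (int i = 0; i < 1000; i++) {
            if (limiter.tryConsume()) admitted++;
        }
        // Roughly the initial burst capacity (~200) is admitted immediately;
        // the rest would have to wait for refill.
        System.out.println(admitted);
    }
}
```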

storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java

Lines changed: 164 additions & 2 deletions
@@ -19,6 +19,7 @@
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Metrics;
 
 import java.lang.reflect.InvocationTargetException;
@@ -136,9 +137,54 @@ public class InklessConfig extends AbstractConfig {
     private static final int FETCH_DATA_THREAD_POOL_SIZE_DEFAULT = 32;
 
     public static final String FETCH_METADATA_THREAD_POOL_SIZE_CONFIG = "fetch.metadata.thread.pool.size";
-    public static final String FETCH_METADATA_THREAD_POOL_SIZE_DOC = "Thread pool size to concurrently fetch metadata from batch coordinator";
+    public static final String FETCH_METADATA_THREAD_POOL_SIZE_DOC = "Thread pool size to concurrently fetch metadata from batch coordinator. "
+        + "Note: This executor is shared between hot and cold path requests. The hot/cold path separation "
+        + "only applies to data fetching (after metadata is retrieved). A burst of lagging consumer requests "
+        + "can still compete with recent consumer requests at the metadata layer. For workloads with significant "
+        + "lagging consumer traffic, consider increasing this value proportionally to the combined "
+        + "fetch.data.thread.pool.size + fetch.lagging.consumer.thread.pool.size to prevent metadata fetching "
+        + "from becoming a bottleneck in mixed hot/cold workloads.";
     private static final int FETCH_METADATA_THREAD_POOL_SIZE_DEFAULT = 8;
 
+    public static final String FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_CONFIG = "fetch.lagging.consumer.thread.pool.size";
+    public static final String FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_DOC = "Thread pool size for lagging consumer fetch requests (consumers reading old data). "
+        + "Set to 0 to disable the lagging consumer feature (all requests will use the recent data path). "
+        + "The default value of 16 is designed as approximately half of the default fetch.data.thread.pool.size (32), "
+        + "providing sufficient capacity for typical cold storage access patterns while leaving headroom for the hot path. "
+        + "The queue capacity is automatically set to thread.pool.size * 100, providing burst buffering "
+        + "(e.g., 16 threads = 1600 queue capacity ≈ 8 seconds buffer at 200 req/s). "
+        + "Tune based on lagging consumer SLA and expected load patterns.";
+    // Default 16: Designed as half of default fetch.data.thread.pool.size (32), sufficient for typical
+    // cold storage access patterns while leaving headroom for hot path. Tune based on lagging consumer SLA.
+    private static final int FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_DEFAULT = 16;
+
+    public static final String FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG = "fetch.lagging.consumer.threshold.ms";
+    public static final String FETCH_LAGGING_CONSUMER_THRESHOLD_MS_DOC = "The time threshold in milliseconds to distinguish between recent and lagging consumers. "
+        + "Fetch requests for data strictly older than this threshold (dataAge > threshold, based on batch timestamp) will use the lagging consumer path. "
+        + "Set to -1 to use the default heuristic: the cache expiration lifespan. "
+        + "This provides a grace period ensuring data remains in cache before being considered 'lagging', "
+        + "accounting for cache warm-up and typical consumer lag variations. "
+        + "Must be >= cache expiration lifespan (see " + CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG + "). "
+        + "This is a startup-only configuration (no dynamic reconfiguration support). "
+        + "Both threshold and cache lifespan must be set together at startup to maintain the constraint.";
+    /**
+     * Default value for {@link #FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG}.
+     * A value of -1 means "auto-detect from cache TTL" - the {@link #fetchLaggingConsumerThresholdMs()} method
+     * will automatically use the cache expiration lifespan as the effective threshold.
+     */
+    private static final int FETCH_LAGGING_CONSUMER_THRESHOLD_MS_DEFAULT = -1;
+
+    public static final String FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_CONFIG = "fetch.lagging.consumer.request.rate.limit";
+    public static final String FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DOC = "Maximum requests per second for lagging consumer data fetches. "
+        + "Set to 0 to disable rate limiting. "
+        + "The upper bound of 10000 req/s is a safety limit to prevent misconfiguration. For high-throughput systems, "
+        + "consider the relationship between this rate limit, thread pool size, and storage backend capacity. "
+        + "At the default rate of 200 req/s with ~50ms per request latency, this allows ~10 concurrent requests.";
+    // Default 200 req/s: Conservative limit based on typical object storage GET request costs and latency.
+    // At ~50ms per request, 200 req/s = ~10 concurrent requests, balancing throughput with cost control.
+    // Tune based on storage backend capacity and budget constraints.
+    private static final int FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DEFAULT = 200;
+
     public static final String FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG = "fetch.find.batches.max.per.partition";
     public static final String FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_DOC = "The maximum number of batches to find per partition when processing a fetch request. "
         + "A value of 0 means all available batches are fetched. "
@@ -322,6 +368,32 @@ public static ConfigDef configDef() {
             ConfigDef.Importance.LOW,
             FETCH_METADATA_THREAD_POOL_SIZE_DOC
         );
+        configDef.define(
+            FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_CONFIG,
+            ConfigDef.Type.INT,
+            FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_DEFAULT,
+            ConfigDef.Range.atLeast(0),
+            ConfigDef.Importance.LOW,
+            FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_DOC
+        );
+        configDef.define(
+            FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG,
+            ConfigDef.Type.LONG,
+            FETCH_LAGGING_CONSUMER_THRESHOLD_MS_DEFAULT,
+            ConfigDef.Range.atLeast(-1),
+            ConfigDef.Importance.MEDIUM,
+            FETCH_LAGGING_CONSUMER_THRESHOLD_MS_DOC
+        );
+        configDef.define(
+            FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_CONFIG,
+            ConfigDef.Type.INT,
+            FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DEFAULT,
+            // Safety limit to prevent misconfiguration. For high-throughput systems,
+            // consider the relationship between this rate limit, thread pool size, and storage backend capacity.
+            ConfigDef.Range.between(0, 10000),
+            ConfigDef.Importance.MEDIUM,
+            FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DOC
+        );
         configDef.define(
             FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG,
             ConfigDef.Type.INT,
@@ -362,7 +434,69 @@ public InklessConfig(final AbstractConfig config) {
     }
 
     public InklessConfig(final Map<String, ?> props) {
-        super(configDef(), props);
+        super(validate(props), props);
+    }
+
+    private static ConfigDef validate(final Map<String, ?> props) {
+        final ConfigDef configDef = configDef();
+        // Parse the properties using ConfigDef directly for validation. This avoids creating a
+        // temporary AbstractConfig instance while still leveraging the same parsing and defaulting
+        // logic that AbstractConfig would use. Note: We still parse twice (once here, once in super()),
+        // but this avoids the overhead of creating an AbstractConfig instance. This is necessary to
+        // avoid 'this-escape' warnings in JDK 23+ and ensure super() is the first statement for JDK 17.
+        // The performance impact is minimal since config parsing only happens at startup.
+        final Map<String, Object> parsedProps = configDef.parse(props);
+
+        final long thresholdMs =
+            ((Number) parsedProps.get(FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG)).longValue();
+        final int cacheLifespanSec =
+            ((Number) parsedProps.get(CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG)).intValue();
+        final long lifespanMs = Duration.ofSeconds(cacheLifespanSec).toMillis();
+
+        // Validate threshold is not less than cache lifespan (unless using default heuristic).
+        // If threshold < cache lifespan, we'd route requests for potentially cached data to the
+        // cold path, defeating the cache and unnecessarily loading the cold path/storage backend.
+        //
+        // Note: This validation occurs at construction time. These configurations are startup-only
+        // and do not support dynamic reconfiguration. Both threshold and cache lifespan must be set
+        // together at startup to maintain the constraint that threshold >= cache lifespan.
+        //
+        // Explicitly reject threshold=0 with a clear error message: While threshold=0 would always fail
+        // the cache lifespan validation below (since minimum cache lifespan is 10 seconds = 10000ms),
+        // we check it explicitly here to provide a more specific error message that explains why 0 is
+        // invalid. With threshold=0, the runtime check (dataAge > threshold) would route almost all cached
+        // data (anything with dataAge > 0) to the cold path, defeating the cache. Only data with
+        // dataAge == 0 would use the hot path, which is negligible.
+        if (thresholdMs == 0) {
+            throw new ConfigException(
+                FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG,
+                thresholdMs,
+                "Lagging consumer threshold cannot be 0. Use -1 to auto-detect from cache TTL, or set a value >= cache lifespan ("
+                    + lifespanMs + "ms). Threshold=0 would route almost all cached data to the cold path, defeating the cache."
+            );
+        }
+        //
+        // Minimum allowed value: threshold == cache lifespan (>=, not >) is valid because:
+        // - The runtime check uses dataAge > threshold (strictly greater), so dataAge == threshold uses hot path
+        // - Data can still be in cache at exactly TTL seconds old (cache expiration runs periodically)
+        // - With threshold == cache lifespan, when dataAge == cache lifespan, data might still be cached
+        //   and correctly uses hot path. When dataAge > cache lifespan, data is expired and uses cold path.
+        //   This ensures we only route data to cold path after it's guaranteed to be expired from cache.
+        //
+        // Special case: thresholdMs == -1 is explicitly excluded from validation (condition checks != -1)
+        // because fetchLaggingConsumerThresholdMs() will automatically use cache lifespan as the effective
+        // runtime value, which is always >= cache lifespan by definition. This design allows operators
+        // to use -1 as a "use cache TTL" heuristic without needing to know the exact cache lifespan value.
+        if (thresholdMs != -1 && thresholdMs < lifespanMs) {
+            throw new ConfigException(
+                FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG,
+                thresholdMs,
+                "Lagging consumer threshold (" + thresholdMs + "ms) must be >= cache lifespan ("
+                    + lifespanMs + "ms) to avoid routing requests for cached data to the lagging path."
+            );
+        }
+
+        return configDef;
     }
 
     @SuppressWarnings("unchecked")
@@ -459,6 +593,34 @@ public int fetchMetadataThreadPoolSize() {
         return getInt(FETCH_METADATA_THREAD_POOL_SIZE_CONFIG);
     }
 
+    public int fetchLaggingConsumerThreadPoolSize() {
+        return getInt(FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_CONFIG);
+    }
+
+    /**
+     * Returns the effective lagging consumer threshold in milliseconds.
+     * <p>
+     * If the configured value is -1 (auto), this method returns the cache expiration lifespan,
+     * which serves as the default heuristic. This ensures the effective threshold is always >= cache
+     * lifespan, which is why validation in the constructor skips threshold=-1 (it will automatically
+     * use cache lifespan at runtime).
+     * </p>
+     *
+     * @return the effective threshold in milliseconds (cache lifespan if configured as -1, otherwise the configured value)
+     */
+    public long fetchLaggingConsumerThresholdMs() {
+        final long configuredValue = getLong(FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG);
+        if (configuredValue == -1) {
+            // Use heuristic: cache TTL (provides grace period for recent data)
+            return Duration.ofSeconds(getInt(CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG)).toMillis();
+        }
+        return configuredValue;
+    }
+
+    public int fetchLaggingConsumerRequestRateLimit() {
+        return getInt(FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_CONFIG);
+    }
+
     public int maxBatchesPerPartitionToFind() {
         return getInt(FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG);
     }
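The threshold resolution and validation rules in the diff above condense to a single function. This standalone helper mirrors the constraints (hypothetical name, not part of InklessConfig): -1 resolves to the cache lifespan; 0 and any value below the cache lifespan are rejected.

```java
import java.time.Duration;

public class ThresholdRules {
    // Returns the effective lagging-consumer threshold, applying the same rules
    // as InklessConfig: -1 means "use cache TTL"; anything else must be >= the
    // cache lifespan (which also rules out 0).
    public static long effectiveThresholdMs(long configuredMs, int cacheLifespanSec) {
        final long lifespanMs = Duration.ofSeconds(cacheLifespanSec).toMillis();
        if (configuredMs == -1) {
            return lifespanMs; // auto-detect from cache TTL
        }
        if (configuredMs < lifespanMs) { // covers configuredMs == 0 as well
            throw new IllegalArgumentException(
                "threshold must be -1 or >= cache lifespan (" + lifespanMs + "ms)");
        }
        return configuredMs;
    }

    public static void main(String[] args) {
        System.out.println(effectiveThresholdMs(-1, 60));      // cache TTL: 60000
        System.out.println(effectiveThresholdMs(120_000, 60)); // explicit: 120000
    }
}
```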
