Skip to content

Commit 05ff0ac

Browse files
authored
refactor(inkless:consume): only remote fetches to run on data executor (#466)
* refactor(inkless:consume): only remote fetches to run on data executor Refactor fetch planning by changing how fetch operations are scheduled on the data executor. It only assigns the data executor for remote fetch operations and let the calling thread to run the cache.get(), avoiding the scenario where calls to cache are blocked by all data executor threads being used. By doing this, the CacheFetchJob component becomes irrelevant as most of the logic is already on the planning, even the scheduling decision. Changes: - Delete CacheFetchJob - Extend ObjectCache to allow passing the executor to run the load function - Refactor the FetchPlanner to use the new approach - Update documentation on FetchPlanner and Reader to clarify how stages are ran - Move CacheFetchJobTest to FetchPlannerTest to keep and improve coverage * fixup! refactor(inkless:consume): only remote fetches to run on data executor Remove the temporal merged as this PR does not use bytes or timestamp. Let that for the following PRs to define * fixup! refactor(inkless:consume): only remote fetches to run on data executor Apply suggestion on simplified test * fixup! refactor(inkless:consume): only remote fetches to run on data executor Improve error handling on cache async get * fixup! refactor(inkless:consume): only remote fetches to run on data executor Improve comment as suggested * fixup! refactor(inkless:consume): only remote fetches to run on data executor Further refactoring: Use same constructor as existing methods * fixup! refactor(inkless:consume): only remote fetches to run on data executor * fixup! refactor(inkless:consume): only remote fetches to run on data executor * fixup! refactor(inkless:consume): only remote fetches to run on data executor * fixup! refactor(inkless:consume): only remote fetches to run on data executor * fixup! refactor(inkless:consume): only remote fetches to run on data executor
1 parent 3218448 commit 05ff0ac

File tree

10 files changed

+624
-361
lines changed

10 files changed

+624
-361
lines changed

storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCache.java

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Inkless
3+
* Copyright (C) 2024 - 2025 Aiven OY
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
*/
118
package io.aiven.inkless.cache;
219

320
import com.github.benmanes.caffeine.cache.AsyncCache;
@@ -7,6 +24,7 @@
724
import java.io.IOException;
825
import java.time.Duration;
926
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Executor;
1028
import java.util.function.Function;
1129

1230
import io.aiven.inkless.generated.CacheKey;
@@ -26,7 +44,8 @@ public final class CaffeineCache implements ObjectCache {
2644
public CaffeineCache(
2745
final long maxCacheSize,
2846
final long lifespanSeconds,
29-
final int maxIdleSeconds) {
47+
final int maxIdleSeconds
48+
) {
3049
cache = Caffeine.newBuilder()
3150
.maximumSize(maxCacheSize)
3251
.expireAfterWrite(Duration.ofSeconds(lifespanSeconds))
@@ -42,20 +61,30 @@ public void close() throws IOException {
4261
}
4362

4463
@Override
45-
public FileExtent computeIfAbsent(final CacheKey key, final Function<CacheKey, FileExtent> mappingFunction) {
46-
final CompletableFuture<FileExtent> future = new CompletableFuture<>();
47-
final CompletableFuture<FileExtent> existingFuture = cache.asMap().computeIfAbsent(key, (cacheKey) -> {
48-
return future;
64+
public CompletableFuture<FileExtent> computeIfAbsent(
65+
final CacheKey key,
66+
final Function<CacheKey, FileExtent> load,
67+
final Executor loadExecutor
68+
) {
69+
// Caffeine's AsyncCache.get() provides atomic cache population per key.
70+
// When multiple threads concurrently request the same uncached key, the mapping function
71+
// is invoked only once, and all waiting threads receive the same CompletableFuture.
72+
// This guarantees that the load function is called at most once per key for successful operations,
73+
// preventing duplicate fetch operations from object storage. Failed loads are invalidated and may be retried.
74+
return cache.get(key, (k, defaultExecutor) -> {
75+
// Use the provided executor instead of Caffeine's default executor.
76+
// This allows us to control which thread pool handles the fetch and blocks there,
77+
// while Caffeine's internal threads remain unblocked, so cache operations can continue to be served.
78+
return CompletableFuture.supplyAsync(() -> load.apply(k), loadExecutor)
79+
.whenComplete((result, throwable) -> {
80+
// Evict the entry if the future completed exceptionally.
81+
// While Caffeine has built-in failed future cleanup, it happens asynchronously.
82+
// Explicit invalidation ensures immediate removal for faster retry on subsequent requests.
83+
if (throwable != null) {
84+
cache.synchronous().invalidate(k);
85+
}
86+
});
4987
});
50-
// If existing future is not the same object as created in this function
51-
// there was a pending cache load and this call is required to join the existing future
52-
// and discard the created one.
53-
if (future != existingFuture) {
54-
return existingFuture.join();
55-
}
56-
final FileExtent fileExtent = mappingFunction.apply(key);
57-
future.complete(fileExtent);
58-
return fileExtent;
5988
}
6089

6190
@Override

storage/inkless/src/main/java/io/aiven/inkless/cache/CaffeineCacheMetrics.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Inkless
3+
* Copyright (C) 2024 - 2025 Aiven OY
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
*/
118
package io.aiven.inkless.cache;
219

320
import org.apache.kafka.common.MetricNameTemplate;

storage/inkless/src/main/java/io/aiven/inkless/cache/MemoryCache.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

storage/inkless/src/main/java/io/aiven/inkless/cache/ObjectCache.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,29 @@
2020
import org.apache.kafka.common.cache.Cache;
2121

2222
import java.io.Closeable;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.Executor;
2325
import java.util.function.Function;
2426

2527
import io.aiven.inkless.generated.CacheKey;
2628
import io.aiven.inkless.generated.FileExtent;
2729

2830
public interface ObjectCache extends Cache<CacheKey, FileExtent>, Closeable {
29-
FileExtent computeIfAbsent(CacheKey key, Function<CacheKey, FileExtent> mappingFunction);
31+
/**
32+
* Asynchronously computes the value if absent, using the provided executor for the computation.
33+
* This method always returns a {@link CompletableFuture} promptly, regardless of whether the key
34+
* is already present in the cache. The cache lookup and bookkeeping may still perform synchronous
35+
* work on the calling thread, but on a cache miss the actual load computation is executed on the
36+
* supplied {@code loadExecutor}.
37+
*
38+
* @param key the cache key
39+
* @param load the function to compute the value if absent
40+
* @param loadExecutor the executor to use for async computation on cache miss
41+
* @return a CompletableFuture that will complete with the cached or computed value
42+
*/
43+
CompletableFuture<FileExtent> computeIfAbsent(
44+
CacheKey key,
45+
Function<CacheKey, FileExtent> load,
46+
Executor loadExecutor
47+
);
3048
}

storage/inkless/src/main/java/io/aiven/inkless/consume/CacheFetchJob.java

Lines changed: 0 additions & 111 deletions
This file was deleted.

0 commit comments

Comments
 (0)