2323import org .apache .kafka .common .metrics .Metrics ;
2424import org .apache .kafka .common .metrics .MetricsReporter ;
2525import org .apache .kafka .common .utils .Time ;
26+ import org .apache .kafka .common .utils .Utils ;
2627import org .apache .kafka .storage .internals .log .LogConfig ;
2728import org .apache .kafka .storage .log .metrics .BrokerTopicStats ;
2829
2930import java .io .Closeable ;
3031import java .io .IOException ;
3132import java .time .Duration ;
3233import java .util .List ;
34+ import java .util .Optional ;
3335import java .util .function .Supplier ;
3436
3537import io .aiven .inkless .cache .BatchCoordinateCache ;
@@ -52,6 +54,18 @@ public final class SharedState implements Closeable {
5254 private final InklessConfig config ;
5355 private final MetadataView metadata ;
5456 private final ControlPlane controlPlane ;
57+ private final StorageBackend fetchStorage ;
58+ // Separate storage client for lagging consumers to:
59+ // 1. Isolate connection pool usage (lagging consumers shouldn't exhaust connections for hot path)
60+ // 2. Allow independent tuning of timeouts/retries for cold storage access patterns
61+ // (This requires some refactoring on how the storage client is built/configured)
62+ private final Optional <StorageBackend > maybeLaggingFetchStorage ;
63+ private final StorageBackend produceStorage ;
64+ // backgroundStorage is shared by FileCleaner and FileMerger executors which run concurrently.
65+ // Kafka storage backends are required to be thread-safe (they share the same Metrics instance).
66+ // However, these tasks perform high-latency object storage calls and retries. A dedicated backend
67+ // instance guarantees they don't contend with hot-path fetch/produce clients and prevents threading/double-close issues.
68+ private final StorageBackend backgroundStorage ;
5569 private final ObjectKeyCreator objectKeyCreator ;
5670 private final KeyAlignmentStrategy keyAlignmentStrategy ;
5771 private final ObjectCache cache ;
@@ -60,12 +74,17 @@ public final class SharedState implements Closeable {
6074 private final Supplier <LogConfig > defaultTopicConfigs ;
6175 private final Metrics storageMetrics ;
6276
63- public SharedState (
77+ private SharedState (
6478 final Time time ,
6579 final int brokerId ,
6680 final InklessConfig config ,
6781 final MetadataView metadata ,
6882 final ControlPlane controlPlane ,
83+ final StorageBackend fetchStorage ,
84+ final Optional <StorageBackend > maybeLaggingFetchStorage ,
85+ final StorageBackend produceStorage ,
86+ final StorageBackend backgroundStorage ,
87+ final Metrics storageMetrics ,
6988 final ObjectKeyCreator objectKeyCreator ,
7089 final KeyAlignmentStrategy keyAlignmentStrategy ,
7190 final ObjectCache cache ,
@@ -84,12 +103,11 @@ public SharedState(
84103 this .batchCoordinateCache = batchCoordinateCache ;
85104 this .brokerTopicStats = brokerTopicStats ;
86105 this .defaultTopicConfigs = defaultTopicConfigs ;
87-
88- final MetricsReporter reporter = new JmxReporter ();
89- this .storageMetrics = new Metrics (
90- new MetricConfig (), List .of (reporter ), Time .SYSTEM ,
91- new KafkaMetricsContext (STORAGE_METRIC_CONTEXT )
92- );
106+ this .fetchStorage = fetchStorage ;
107+ this .maybeLaggingFetchStorage = maybeLaggingFetchStorage ;
108+ this .produceStorage = produceStorage ;
109+ this .backgroundStorage = backgroundStorage ;
110+ this .storageMetrics = storageMetrics ;
93111 }
94112
95113 public static SharedState initialize (
@@ -107,34 +125,88 @@ public static SharedState initialize(
107125 "Value of consume.batch.coordinate.cache.ttl.ms exceeds file.cleaner.retention.period.ms / 2"
108126 );
109127 }
110- return new SharedState (
111- time ,
112- brokerId ,
113- config ,
114- metadata ,
115- controlPlane ,
116- ObjectKey .creator (config .objectKeyPrefix (), config .objectKeyLogPrefixMasked ()),
117- new FixedBlockAlignment (config .fetchCacheBlockBytes ()),
118- new CaffeineCache (
128+
129+ CaffeineCache objectCache = null ;
130+ BatchCoordinateCache batchCoordinateCache = null ;
131+ StorageBackend fetchStorage = null ;
132+ StorageBackend laggingFetchStorage = null ;
133+ StorageBackend produceStorage = null ;
134+ StorageBackend backgroundStorage = null ;
135+ Metrics storageMetrics = null ;
136+ try {
137+ objectCache = new CaffeineCache (
119138 config .cacheMaxCount (),
120139 config .cacheExpirationLifespanSec (),
121140 config .cacheExpirationMaxIdleSec ()
122- ),
123- config .isBatchCoordinateCacheEnabled () ? new CaffeineBatchCoordinateCache (config .batchCoordinateCacheTtl ()) : new NullBatchCoordinateCache (),
124- brokerTopicStats ,
125- defaultTopicConfigs
126- );
141+ );
142+ batchCoordinateCache = config .isBatchCoordinateCacheEnabled ()
143+ ? new CaffeineBatchCoordinateCache (config .batchCoordinateCacheTtl ())
144+ : new NullBatchCoordinateCache ();
145+
146+ final MetricsReporter reporter = new JmxReporter ();
147+ storageMetrics = new Metrics (
148+ new MetricConfig (), List .of (reporter ), Time .SYSTEM ,
149+ new KafkaMetricsContext (STORAGE_METRIC_CONTEXT )
150+ );
151+ fetchStorage = config .storage (storageMetrics );
152+ // If thread pool size is 0, disabling lagging consumer support, don't create a separate client
153+ //
154+ // NOTE: The client for lagging consumers is created only when this SharedState
155+ // is constructed. If fetchLaggingConsumerThreadPoolSize() is 0 at this time, no separate
156+ // client is created and lagging consumer support is effectively disabled for the lifetime
157+ // of this instance, even if the configuration is later reloaded with a non-zero value.
158+ // Enabling lagging consumer support therefore requires a broker restart (or reconstruction
159+ // of the SharedState) so that a new storage client can be created.
160+ laggingFetchStorage = config .fetchLaggingConsumerThreadPoolSize () > 0 ? config .storage (storageMetrics ) : null ;
161+ produceStorage = config .storage (storageMetrics );
162+ backgroundStorage = config .storage (storageMetrics );
163+ final var objectKeyCreator = ObjectKey .creator (config .objectKeyPrefix (), config .objectKeyLogPrefixMasked ());
164+ final var keyAlignmentStrategy = new FixedBlockAlignment (config .fetchCacheBlockBytes ());
165+ return new SharedState (
166+ time ,
167+ brokerId ,
168+ config ,
169+ metadata ,
170+ controlPlane ,
171+ fetchStorage ,
172+ Optional .ofNullable (laggingFetchStorage ),
173+ produceStorage ,
174+ backgroundStorage ,
175+ storageMetrics ,
176+ objectKeyCreator ,
177+ keyAlignmentStrategy ,
178+ objectCache ,
179+ batchCoordinateCache ,
180+ brokerTopicStats ,
181+ defaultTopicConfigs
182+ );
183+ } catch (Exception e ) {
184+ Utils .closeQuietly (backgroundStorage , "backgroundStorage" );
185+ Utils .closeQuietly (produceStorage , "produceStorage" );
186+ Utils .closeQuietly (fetchStorage , "fetchStorage" );
187+ Utils .closeQuietly (laggingFetchStorage , "laggingFetchStorage" );
188+ Utils .closeQuietly (batchCoordinateCache , "batchCoordinateCache" );
189+ Utils .closeQuietly (objectCache , "objectCache" );
190+ Utils .closeQuietly (storageMetrics , "storageMetrics" );
191+
192+ throw new RuntimeException ("Failed to initialize SharedState" , e );
193+ }
127194 }
128195
129196 @ Override
130197 public void close () throws IOException {
131- try {
132- cache .close ();
133- controlPlane .close ();
134- storageMetrics .close ();
135- } catch (Exception e ) {
136- throw new RuntimeException (e );
137- }
198+ // Closing storage backends
199+ Utils .closeQuietly (backgroundStorage , "backgroundStorage" );
200+ Utils .closeQuietly (produceStorage , "produceStorage" );
201+ maybeLaggingFetchStorage .ifPresent (s -> Utils .closeQuietly (s , "laggingFetchStorage" ));
202+ Utils .closeQuietly (fetchStorage , "fetchStorage" );
203+ // Closing storage metrics
204+ Utils .closeQuietly (storageMetrics , "storageMetrics" );
205+ // Closing caches
206+ Utils .closeQuietly (cache , "objectCache" );
207+ Utils .closeQuietly (batchCoordinateCache , "batchCoordinateCache" );
208+ // Closing control plane
209+ Utils .closeQuietly (controlPlane , "controlPlane" );
138210 }
139211
140212 public Time time () {
@@ -185,7 +257,25 @@ public Supplier<LogConfig> defaultTopicConfigs() {
185257 return defaultTopicConfigs ;
186258 }
187259
188- public StorageBackend buildStorage () {
189- return config .storage (storageMetrics );
260+ public StorageBackend fetchStorage () {
261+ return fetchStorage ;
262+ }
263+
264+ /**
265+ * Optional access to the lagging fetch storage backend.
266+ *
267+ * <p>When {@code fetch.lagging.consumer.thread.pool.size == 0}, the lagging consumer
268+ * path is explicitly disabled and this storage backend is not created.</p>
269+ */
270+ public Optional <StorageBackend > maybeLaggingFetchStorage () {
271+ return maybeLaggingFetchStorage ;
272+ }
273+
274+ public StorageBackend produceStorage () {
275+ return produceStorage ;
276+ }
277+
278+ public StorageBackend backgroundStorage () {
279+ return backgroundStorage ;
190280 }
191281}
0 commit comments