Skip to content

Commit 47e8e8b

Browse files
committed
feat(cache:segment-manifest): Add JMX mbean to clean segment manifest cache
1 parent 2bc2fa9 commit 47e8e8b

File tree

8 files changed

+163
-7
lines changed

8 files changed

+163
-7
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
2424
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
2525
<suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
26+
<suppress checks="ClassFanOutComplexity" files="SegmentManifestProvider.java"/>
2627
<suppress checks="ClassDataAbstractionCoupling" files="CaffeineStatsCounter.java"/>
2728
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
2829
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ public void configure(final Map<String, ?> configs) {
155155
config.segmentManifestCacheRetention(),
156156
fetcher,
157157
mapper,
158-
executor);
158+
executor,
159+
config.enableJmxOperations());
159160
}
160161

161162
// for testing

core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
4444
private static final String OBJECT_KEY_PREFIX_CONFIG = "key.prefix";
4545
private static final String OBJECT_KEY_PREFIX_DOC = "The object storage path prefix";
4646

47+
private static final String ENABLE_JMX_OPERATIONS_CONFIG = "enable.jmx";
48+
private static final String ENABLE_JMX_OPERATIONS_DOC = "Enable JMX MBeans operations to manage caches";
49+
4750
private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache.";
48-
private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
51+
public static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
4952
private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default
5053
private static final String SEGMENT_MANIFEST_CACHE_SIZE_DOC =
5154
"The size in items of the segment manifest cache. "
@@ -107,6 +110,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
107110
OBJECT_KEY_PREFIX_DOC
108111
);
109112

113+
CONFIG.define(
114+
ENABLE_JMX_OPERATIONS_CONFIG,
115+
ConfigDef.Type.BOOLEAN,
116+
false,
117+
ConfigDef.Importance.LOW,
118+
ENABLE_JMX_OPERATIONS_DOC
119+
);
120+
110121
CONFIG.define(
111122
SEGMENT_MANIFEST_CACHE_SIZE_CONFIG,
112123
ConfigDef.Type.LONG,
@@ -314,6 +325,10 @@ public Optional<Duration> segmentManifestCacheRetention() {
314325
return Optional.of(Duration.ofMillis(rawValue));
315326
}
316327

328+
public boolean enableJmxOperations() {
329+
return getBoolean(ENABLE_JMX_OPERATIONS_CONFIG);
330+
}
331+
317332
public String keyPrefix() {
318333
return getString(OBJECT_KEY_PREFIX_CONFIG);
319334
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.manifest;
18+
19+
public interface SegmentManifestCacheManager {
20+
String MBEAN_NAME = "aiven.kafka.server.tieredstorage.cache:type=segment-manifest-cache-manager";
21+
22+
void clean();
23+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.manifest;
18+
19+
public class SegmentManifestCacheManagerMBean implements SegmentManifestCacheManager {
20+
21+
final SegmentManifestProvider provider;
22+
23+
public SegmentManifestCacheManagerMBean(final SegmentManifestProvider segmentManifestProvider) {
24+
this.provider = segmentManifestProvider;
25+
}
26+
27+
@Override
28+
public void clean() {
29+
provider.cache().invalidateAll();
30+
}
31+
}

core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,16 @@
1616

1717
package io.aiven.kafka.tieredstorage.manifest;
1818

19+
import javax.management.InstanceAlreadyExistsException;
20+
import javax.management.MBeanRegistrationException;
21+
import javax.management.MalformedObjectNameException;
22+
import javax.management.NotCompliantMBeanException;
23+
import javax.management.ObjectName;
24+
import javax.management.StandardMBean;
25+
1926
import java.io.IOException;
2027
import java.io.InputStream;
28+
import java.lang.management.ManagementFactory;
2129
import java.time.Duration;
2230
import java.util.Optional;
2331
import java.util.concurrent.ExecutionException;
@@ -31,9 +39,13 @@
3139

3240
import com.fasterxml.jackson.databind.ObjectMapper;
3341
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
42+
import com.github.benmanes.caffeine.cache.Cache;
3443
import com.github.benmanes.caffeine.cache.Caffeine;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
3546

3647
public class SegmentManifestProvider {
48+
private static final Logger LOG = LoggerFactory.getLogger(SegmentManifestProvider.class);
3749
private static final String SEGMENT_MANIFEST_METRIC_GROUP_NAME = "segment-manifest-cache";
3850
private static final long GET_TIMEOUT_SEC = 10;
3951

@@ -47,7 +59,8 @@ public SegmentManifestProvider(final Optional<Long> maxCacheSize,
4759
final Optional<Duration> cacheRetention,
4860
final ObjectFetcher fileFetcher,
4961
final ObjectMapper mapper,
50-
final Executor executor) {
62+
final Executor executor,
63+
final boolean enableJmxOperations) {
5164
final var statsCounter = new CaffeineStatsCounter(SEGMENT_MANIFEST_METRIC_GROUP_NAME);
5265
final var cacheBuilder = Caffeine.newBuilder()
5366
.recordStats(() -> statsCounter)
@@ -60,6 +73,29 @@ public SegmentManifestProvider(final Optional<Long> maxCacheSize,
6073
}
6174
});
6275
statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
76+
if (enableJmxOperations) {
77+
enableJmxMBean();
78+
}
79+
}
80+
81+
private void enableJmxMBean() {
82+
final var mbeanName = SegmentManifestCacheManager.MBEAN_NAME;
83+
try {
84+
final var name = new ObjectName(mbeanName);
85+
final var mbean = new StandardMBean(
86+
new SegmentManifestCacheManagerMBean(this),
87+
SegmentManifestCacheManager.class);
88+
ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, name);
89+
} catch (NotCompliantMBeanException
90+
| MalformedObjectNameException
91+
| InstanceAlreadyExistsException
92+
| MBeanRegistrationException e) {
93+
LOG.warn("Error creating MBean {}", mbeanName, e);
94+
}
95+
}
96+
97+
Cache<String, SegmentManifest> cache() {
98+
return cache.synchronous();
6399
}
64100

65101
public SegmentManifest get(final String manifestKey)

core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ void minimalConfig() {
5050
assertThat(config.encryptionKeyPairId()).isNull();
5151
assertThat(config.encryptionKeyRing()).isNull();
5252
assertThat(config.keyPrefix()).isEmpty();
53+
assertThat(config.enableJmxOperations()).isFalse();
5354
}
5455

5556
@Test
@@ -294,7 +295,6 @@ void invalidChunkSizeRange() {
294295
.hasMessage("Invalid value 2147483648 for configuration chunk.size: Not a number of type INT");
295296
}
296297

297-
298298
@Test
299299
void invalidCompressionConfig() {
300300
assertThatThrownBy(() -> new RemoteStorageManagerConfig(
@@ -307,4 +307,15 @@ void invalidCompressionConfig() {
307307
.isInstanceOf(ConfigException.class)
308308
.hasMessage("compression.enabled must be enabled if compression.heuristic.enabled is");
309309
}
310+
311+
@Test
312+
void enableJmx() {
313+
final HashMap<String, Object> props = new HashMap<>();
314+
props.put("storage.backend.class", NoopStorageBackend.class);
315+
props.put("chunk.size", "123");
316+
props.put("enable.jmx", true);
317+
final RemoteStorageManagerConfig config = new RemoteStorageManagerConfig(props);
318+
assertThat(config.enableJmxOperations()).isTrue();
319+
}
320+
310321
}

core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package io.aiven.kafka.tieredstorage.manifest;
1818

19+
import javax.management.ObjectName;
20+
1921
import java.io.ByteArrayInputStream;
2022
import java.io.IOException;
2123
import java.io.InputStream;
24+
import java.lang.management.ManagementFactory;
2225
import java.time.Duration;
2326
import java.util.Optional;
2427
import java.util.concurrent.ForkJoinPool;
@@ -40,6 +43,7 @@
4043
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4144
import static org.mockito.ArgumentMatchers.anyString;
4245
import static org.mockito.Mockito.doAnswer;
46+
import static org.mockito.Mockito.times;
4347
import static org.mockito.Mockito.verify;
4448
import static org.mockito.Mockito.verifyNoMoreInteractions;
4549
import static org.mockito.Mockito.when;
@@ -68,23 +72,23 @@ class SegmentManifestProviderTest {
6872
void setup() {
6973
provider = new SegmentManifestProvider(
7074
Optional.of(1000L), Optional.empty(), storage, MAPPER,
71-
ForkJoinPool.commonPool());
75+
ForkJoinPool.commonPool(), false);
7276
}
7377

7478
@Test
7579
void unboundedShouldBeCreated() {
7680
assertThatNoException()
7781
.isThrownBy(() -> new SegmentManifestProvider(
7882
Optional.empty(), Optional.of(Duration.ofMillis(1)), storage, MAPPER,
79-
ForkJoinPool.commonPool()));
83+
ForkJoinPool.commonPool(), false));
8084
}
8185

8286
@Test
8387
void withoutRetentionLimitsShouldBeCreated() {
8488
assertThatNoException()
8589
.isThrownBy(() -> new SegmentManifestProvider(
8690
Optional.of(1L), Optional.empty(), storage, MAPPER,
87-
ForkJoinPool.commonPool()));
91+
ForkJoinPool.commonPool(), false));
8892
}
8993

9094
@Test
@@ -102,6 +106,40 @@ void shouldReturnAndCache() throws StorageBackendException, IOException {
102106
verifyNoMoreInteractions(storage);
103107
}
104108

109+
@Test
110+
void invalidateCache_jmx() throws Exception {
111+
provider = new SegmentManifestProvider(
112+
Optional.of(1000L), Optional.empty(), storage, MAPPER,
113+
ForkJoinPool.commonPool(), true);
114+
115+
final String key = "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest";
116+
final SegmentManifestV1 expectedManifest = new SegmentManifestV1(
117+
new FixedSizeChunkIndex(100, 1000, 110, 110),
118+
false, null
119+
);
120+
when(storage.fetch(key))
121+
.thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
122+
assertThat(provider.get(key)).isEqualTo(expectedManifest);
123+
verify(storage).fetch(key);
124+
125+
final var mbeanName = new ObjectName(SegmentManifestCacheManager.MBEAN_NAME);
126+
final var mbeanServer = ManagementFactory.getPlatformMBeanServer();
127+
assertThat(mbeanServer.isRegistered(mbeanName)).isTrue();
128+
129+
final var sizeBefore = provider.cache().estimatedSize();
130+
assertThat(sizeBefore).isEqualTo(1L);
131+
132+
mbeanServer.invoke(mbeanName, "clean", new Object[]{}, new String[]{});
133+
134+
final var sizeAfter = provider.cache().estimatedSize();
135+
assertThat(sizeAfter).isEqualTo(0L);
136+
137+
when(storage.fetch(key))
138+
.thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
139+
assertThat(provider.get(key)).isEqualTo(expectedManifest);
140+
verify(storage, times(2)).fetch(key);
141+
}
142+
105143
@Test
106144
void shouldPropagateStorageBackendException() throws StorageBackendException {
107145
when(storage.fetch(anyString()))

0 commit comments

Comments
 (0)