Skip to content

Commit 3e88c96

Browse files
committed
feat(cache:segment-manifest): Add JMX mbean to clean segment manifest cache
1 parent 9990341 commit 3e88c96

File tree

8 files changed

+163
-7
lines changed

8 files changed

+163
-7
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
2424
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
2525
<suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
26+
<suppress checks="ClassFanOutComplexity" files="SegmentManifestProvider.java"/>
2627
<suppress checks="ClassDataAbstractionCoupling" files="CaffeineStatsCounter.java"/>
2728
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
2829
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ public void configure(final Map<String, ?> configs) {
158158
config.segmentManifestCacheRetention(),
159159
fetcher,
160160
mapper,
161-
executor);
161+
executor,
162+
config.enableJmxOperations());
162163
}
163164

164165
// for testing

core/src/main/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
4444
private static final String OBJECT_KEY_PREFIX_CONFIG = "key.prefix";
4545
private static final String OBJECT_KEY_PREFIX_DOC = "The object storage path prefix";
4646

47+
private static final String ENABLE_JMX_OPERATIONS_CONFIG = "enable.jmx";
48+
private static final String ENABLE_JMX_OPERATIONS_DOC = "Enable JMX MBeans operations to manage caches";
49+
4750
private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache.";
48-
private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
51+
public static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
4952
private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default
5053
private static final String SEGMENT_MANIFEST_CACHE_SIZE_DOC =
5154
"The size in items of the segment manifest cache. "
@@ -107,6 +110,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
107110
OBJECT_KEY_PREFIX_DOC
108111
);
109112

113+
CONFIG.define(
114+
ENABLE_JMX_OPERATIONS_CONFIG,
115+
ConfigDef.Type.BOOLEAN,
116+
false,
117+
ConfigDef.Importance.LOW,
118+
ENABLE_JMX_OPERATIONS_DOC
119+
);
120+
110121
CONFIG.define(
111122
SEGMENT_MANIFEST_CACHE_SIZE_CONFIG,
112123
ConfigDef.Type.LONG,
@@ -314,6 +325,10 @@ public Optional<Duration> segmentManifestCacheRetention() {
314325
return Optional.of(Duration.ofMillis(rawValue));
315326
}
316327

328+
public boolean enableJmxOperations() {
329+
return getBoolean(ENABLE_JMX_OPERATIONS_CONFIG);
330+
}
331+
317332
public String keyPrefix() {
318333
return getString(OBJECT_KEY_PREFIX_CONFIG);
319334
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.manifest;
18+
19+
public interface SegmentManifestCacheManager {
20+
String MBEAN_NAME = "aiven.kafka.server.tieredstorage.cache:type=segment-manifest-cache-manager";
21+
22+
void clean();
23+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.manifest;
18+
19+
public class SegmentManifestCacheManagerMBean implements SegmentManifestCacheManager {
20+
21+
final SegmentManifestProvider provider;
22+
23+
public SegmentManifestCacheManagerMBean(final SegmentManifestProvider segmentManifestProvider) {
24+
this.provider = segmentManifestProvider;
25+
}
26+
27+
@Override
28+
public void clean() {
29+
provider.cache().invalidateAll();
30+
}
31+
}

core/src/main/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProvider.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,16 @@
1616

1717
package io.aiven.kafka.tieredstorage.manifest;
1818

19+
import javax.management.InstanceAlreadyExistsException;
20+
import javax.management.MBeanRegistrationException;
21+
import javax.management.MalformedObjectNameException;
22+
import javax.management.NotCompliantMBeanException;
23+
import javax.management.ObjectName;
24+
import javax.management.StandardMBean;
25+
1926
import java.io.IOException;
2027
import java.io.InputStream;
28+
import java.lang.management.ManagementFactory;
2129
import java.time.Duration;
2230
import java.util.Optional;
2331
import java.util.concurrent.ExecutionException;
@@ -34,9 +42,13 @@
3442

3543
import com.fasterxml.jackson.databind.ObjectMapper;
3644
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
45+
import com.github.benmanes.caffeine.cache.Cache;
3746
import com.github.benmanes.caffeine.cache.Caffeine;
47+
import org.slf4j.Logger;
48+
import org.slf4j.LoggerFactory;
3849

3950
public class SegmentManifestProvider {
51+
private static final Logger LOG = LoggerFactory.getLogger(SegmentManifestProvider.class);
4052
private static final String SEGMENT_MANIFEST_METRIC_GROUP_NAME = "segment-manifest-cache";
4153
private static final long GET_TIMEOUT_SEC = 10;
4254

@@ -52,7 +64,8 @@ public SegmentManifestProvider(final ObjectKey objectKey,
5264
final Optional<Duration> cacheRetention,
5365
final ObjectFetcher fileFetcher,
5466
final ObjectMapper mapper,
55-
final Executor executor) {
67+
final Executor executor,
68+
final boolean enableJmxOperations) {
5669
this.objectKey = objectKey;
5770
final var cacheBuilder = Caffeine.newBuilder()
5871
.recordStats(() -> new CaffeineStatsCounter(SEGMENT_MANIFEST_METRIC_GROUP_NAME))
@@ -64,6 +77,29 @@ public SegmentManifestProvider(final ObjectKey objectKey,
6477
return mapper.readValue(is, SegmentManifest.class);
6578
}
6679
});
80+
if (enableJmxOperations) {
81+
enableJmxMBean();
82+
}
83+
}
84+
85+
private void enableJmxMBean() {
86+
final var mbeanName = SegmentManifestCacheManager.MBEAN_NAME;
87+
try {
88+
final var name = new ObjectName(mbeanName);
89+
final var mbean = new StandardMBean(
90+
new SegmentManifestCacheManagerMBean(this),
91+
SegmentManifestCacheManager.class);
92+
ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, name);
93+
} catch (NotCompliantMBeanException
94+
| MalformedObjectNameException
95+
| InstanceAlreadyExistsException
96+
| MBeanRegistrationException e) {
97+
LOG.warn("Error creating MBean {}", mbeanName, e);
98+
}
99+
}
100+
101+
Cache<String, SegmentManifest> cache() {
102+
return cache.synchronous();
67103
}
68104

69105
public SegmentManifest get(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)

core/src/test/java/io/aiven/kafka/tieredstorage/config/RemoteStorageManagerConfigTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ void minimalConfig() {
5050
assertThat(config.encryptionKeyPairId()).isNull();
5151
assertThat(config.encryptionKeyRing()).isNull();
5252
assertThat(config.keyPrefix()).isEmpty();
53+
assertThat(config.enableJmxOperations()).isFalse();
5354
}
5455

5556
@Test
@@ -294,7 +295,6 @@ void invalidChunkSizeRange() {
294295
.hasMessage("Invalid value 2147483648 for configuration chunk.size: Not a number of type INT");
295296
}
296297

297-
298298
@Test
299299
void invalidCompressionConfig() {
300300
assertThatThrownBy(() -> new RemoteStorageManagerConfig(
@@ -307,4 +307,15 @@ void invalidCompressionConfig() {
307307
.isInstanceOf(ConfigException.class)
308308
.hasMessage("compression.enabled must be enabled if compression.heuristic.enabled is");
309309
}
310+
311+
@Test
312+
void enableJmx() {
313+
final HashMap<String, Object> props = new HashMap<>();
314+
props.put("storage.backend.class", NoopStorageBackend.class);
315+
props.put("chunk.size", "123");
316+
props.put("enable.jmx", true);
317+
final RemoteStorageManagerConfig config = new RemoteStorageManagerConfig(props);
318+
assertThat(config.enableJmxOperations()).isTrue();
319+
}
320+
310321
}

core/src/test/java/io/aiven/kafka/tieredstorage/manifest/SegmentManifestProviderTest.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package io.aiven.kafka.tieredstorage.manifest;
1818

19+
import javax.management.ObjectName;
20+
1921
import java.io.ByteArrayInputStream;
2022
import java.io.IOException;
2123
import java.io.InputStream;
24+
import java.lang.management.ManagementFactory;
2225
import java.time.Duration;
2326
import java.util.Map;
2427
import java.util.Optional;
@@ -48,6 +51,7 @@
4851
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4952
import static org.mockito.ArgumentMatchers.anyString;
5053
import static org.mockito.Mockito.doAnswer;
54+
import static org.mockito.Mockito.times;
5155
import static org.mockito.Mockito.verify;
5256
import static org.mockito.Mockito.verifyNoMoreInteractions;
5357
import static org.mockito.Mockito.when;
@@ -86,23 +90,23 @@ class SegmentManifestProviderTest {
8690
void setup() {
8791
provider = new SegmentManifestProvider(
8892
OBJECT_KEY, Optional.of(1000L), Optional.empty(), storage, MAPPER,
89-
ForkJoinPool.commonPool());
93+
ForkJoinPool.commonPool(), false);
9094
}
9195

9296
@Test
9397
void unboundedShouldBeCreated() {
9498
assertThatNoException()
9599
.isThrownBy(() -> new SegmentManifestProvider(
96100
OBJECT_KEY, Optional.empty(), Optional.of(Duration.ofMillis(1)), storage, MAPPER,
97-
ForkJoinPool.commonPool()));
101+
ForkJoinPool.commonPool(), false));
98102
}
99103

100104
@Test
101105
void withoutRetentionLimitsShouldBeCreated() {
102106
assertThatNoException()
103107
.isThrownBy(() -> new SegmentManifestProvider(
104108
OBJECT_KEY, Optional.of(1L), Optional.empty(), storage, MAPPER,
105-
ForkJoinPool.commonPool()));
109+
ForkJoinPool.commonPool(), false));
106110
}
107111

108112
@Test
@@ -120,6 +124,40 @@ void shouldReturnAndCache() throws StorageBackendException, IOException {
120124
verifyNoMoreInteractions(storage);
121125
}
122126

127+
@Test
128+
void invalidateCache_jmx() throws Exception {
129+
provider = new SegmentManifestProvider(
130+
OBJECT_KEY, Optional.of(1000L), Optional.empty(), storage, MAPPER,
131+
ForkJoinPool.commonPool(), true);
132+
133+
final String key = "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest";
134+
final SegmentManifestV1 expectedManifest = new SegmentManifestV1(
135+
new FixedSizeChunkIndex(100, 1000, 110, 110),
136+
false, null
137+
);
138+
when(storage.fetch(key))
139+
.thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
140+
assertThat(provider.get(REMOTE_LOG_METADATA)).isEqualTo(expectedManifest);
141+
verify(storage).fetch(key);
142+
143+
final var mbeanName = new ObjectName(SegmentManifestCacheManager.MBEAN_NAME);
144+
final var mbeanServer = ManagementFactory.getPlatformMBeanServer();
145+
assertThat(mbeanServer.isRegistered(mbeanName)).isTrue();
146+
147+
final var sizeBefore = provider.cache().estimatedSize();
148+
assertThat(sizeBefore).isEqualTo(1L);
149+
150+
mbeanServer.invoke(mbeanName, "clean", new Object[]{}, new String[]{});
151+
152+
final var sizeAfter = provider.cache().estimatedSize();
153+
assertThat(sizeAfter).isEqualTo(0L);
154+
155+
when(storage.fetch(key))
156+
.thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
157+
assertThat(provider.get(REMOTE_LOG_METADATA)).isEqualTo(expectedManifest);
158+
verify(storage, times(2)).fetch(key);
159+
}
160+
123161
@Test
124162
void shouldPropagateStorageBackendException() throws StorageBackendException {
125163
when(storage.fetch(anyString()))

0 commit comments

Comments
 (0)