Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
<suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
<suppress checks="ClassFanOutComplexity" files="SegmentManifestProvider.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="CaffeineStatsCounter.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ public void configure(final Map<String, ?> configs) {
config.segmentManifestCacheRetention(),
fetcher,
mapper,
executor);
executor,
config.enableJmxOperations());
}

// for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
private static final String OBJECT_KEY_PREFIX_CONFIG = "key.prefix";
private static final String OBJECT_KEY_PREFIX_DOC = "The object storage path prefix";

private static final String ENABLE_JMX_OPERATIONS_CONFIG = "enable.jmx";
private static final String ENABLE_JMX_OPERATIONS_DOC = "Enable JMX MBeans operations to manage caches";

private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache.";
private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
public static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default
private static final String SEGMENT_MANIFEST_CACHE_SIZE_DOC =
"The size in items of the segment manifest cache. "
Expand Down Expand Up @@ -107,6 +110,14 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
OBJECT_KEY_PREFIX_DOC
);

CONFIG.define(
ENABLE_JMX_OPERATIONS_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
ENABLE_JMX_OPERATIONS_DOC
);

CONFIG.define(
SEGMENT_MANIFEST_CACHE_SIZE_CONFIG,
ConfigDef.Type.LONG,
Expand Down Expand Up @@ -314,6 +325,10 @@ public Optional<Duration> segmentManifestCacheRetention() {
return Optional.of(Duration.ofMillis(rawValue));
}

public boolean enableJmxOperations() {
return getBoolean(ENABLE_JMX_OPERATIONS_CONFIG);
}

public String keyPrefix() {
return getString(OBJECT_KEY_PREFIX_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.manifest;

public interface SegmentManifestCacheManager {
String MBEAN_NAME = "aiven.kafka.server.tieredstorage.cache:type=segment-manifest-cache-manager";

void clean();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.manifest;

public class SegmentManifestCacheManagerMBean implements SegmentManifestCacheManager {

final SegmentManifestProvider provider;

public SegmentManifestCacheManagerMBean(final SegmentManifestProvider segmentManifestProvider) {
this.provider = segmentManifestProvider;
}

@Override
public void clean() {
provider.cache().invalidateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@

package io.aiven.kafka.tieredstorage.manifest;

import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand All @@ -31,9 +39,13 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentManifestProvider {
private static final Logger LOG = LoggerFactory.getLogger(SegmentManifestProvider.class);
private static final String SEGMENT_MANIFEST_METRIC_GROUP_NAME = "segment-manifest-cache";
private static final long GET_TIMEOUT_SEC = 10;

Expand All @@ -47,7 +59,8 @@ public SegmentManifestProvider(final Optional<Long> maxCacheSize,
final Optional<Duration> cacheRetention,
final ObjectFetcher fileFetcher,
final ObjectMapper mapper,
final Executor executor) {
final Executor executor,
final boolean enableJmxOperations) {
final var statsCounter = new CaffeineStatsCounter(SEGMENT_MANIFEST_METRIC_GROUP_NAME);
final var cacheBuilder = Caffeine.newBuilder()
.recordStats(() -> statsCounter)
Expand All @@ -60,6 +73,29 @@ public SegmentManifestProvider(final Optional<Long> maxCacheSize,
}
});
statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
if (enableJmxOperations) {
enableJmxMBean();
}
}

private void enableJmxMBean() {
final var mbeanName = SegmentManifestCacheManager.MBEAN_NAME;
try {
final var name = new ObjectName(mbeanName);
final var mbean = new StandardMBean(
new SegmentManifestCacheManagerMBean(this),
SegmentManifestCacheManager.class);
ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, name);
} catch (NotCompliantMBeanException
| MalformedObjectNameException
| InstanceAlreadyExistsException
| MBeanRegistrationException e) {
LOG.warn("Error creating MBean {}", mbeanName, e);
}
}

Cache<String, SegmentManifest> cache() {
return cache.synchronous();
}

public SegmentManifest get(final String manifestKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ void minimalConfig() {
assertThat(config.encryptionKeyPairId()).isNull();
assertThat(config.encryptionKeyRing()).isNull();
assertThat(config.keyPrefix()).isEmpty();
assertThat(config.enableJmxOperations()).isFalse();
}

@Test
Expand Down Expand Up @@ -294,7 +295,6 @@ void invalidChunkSizeRange() {
.hasMessage("Invalid value 2147483648 for configuration chunk.size: Not a number of type INT");
}


@Test
void invalidCompressionConfig() {
assertThatThrownBy(() -> new RemoteStorageManagerConfig(
Expand All @@ -307,4 +307,15 @@ void invalidCompressionConfig() {
.isInstanceOf(ConfigException.class)
.hasMessage("compression.enabled must be enabled if compression.heuristic.enabled is");
}

@Test
void enableJmx() {
final HashMap<String, Object> props = new HashMap<>();
props.put("storage.backend.class", NoopStorageBackend.class);
props.put("chunk.size", "123");
props.put("enable.jmx", true);
final RemoteStorageManagerConfig config = new RemoteStorageManagerConfig(props);
assertThat(config.enableJmxOperations()).isTrue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package io.aiven.kafka.tieredstorage.manifest;

import javax.management.ObjectName;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
Expand All @@ -40,6 +43,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -68,23 +72,23 @@ class SegmentManifestProviderTest {
void setup() {
provider = new SegmentManifestProvider(
Optional.of(1000L), Optional.empty(), storage, MAPPER,
ForkJoinPool.commonPool());
ForkJoinPool.commonPool(), false);
}

@Test
void unboundedShouldBeCreated() {
assertThatNoException()
.isThrownBy(() -> new SegmentManifestProvider(
Optional.empty(), Optional.of(Duration.ofMillis(1)), storage, MAPPER,
ForkJoinPool.commonPool()));
ForkJoinPool.commonPool(), false));
}

@Test
void withoutRetentionLimitsShouldBeCreated() {
assertThatNoException()
.isThrownBy(() -> new SegmentManifestProvider(
Optional.of(1L), Optional.empty(), storage, MAPPER,
ForkJoinPool.commonPool()));
ForkJoinPool.commonPool(), false));
}

@Test
Expand All @@ -102,6 +106,40 @@ void shouldReturnAndCache() throws StorageBackendException, IOException {
verifyNoMoreInteractions(storage);
}

@Test
void invalidateCache_jmx() throws Exception {
provider = new SegmentManifestProvider(
Optional.of(1000L), Optional.empty(), storage, MAPPER,
ForkJoinPool.commonPool(), true);

final String key = "topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest";
final SegmentManifestV1 expectedManifest = new SegmentManifestV1(
new FixedSizeChunkIndex(100, 1000, 110, 110),
false, null
);
when(storage.fetch(key))
.thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
assertThat(provider.get(key)).isEqualTo(expectedManifest);
verify(storage).fetch(key);

final var mbeanName = new ObjectName(SegmentManifestCacheManager.MBEAN_NAME);
final var mbeanServer = ManagementFactory.getPlatformMBeanServer();
assertThat(mbeanServer.isRegistered(mbeanName)).isTrue();

final var sizeBefore = provider.cache().estimatedSize();
assertThat(sizeBefore).isEqualTo(1L);

mbeanServer.invoke(mbeanName, "clean", new Object[]{}, new String[]{});

final var sizeAfter = provider.cache().estimatedSize();
assertThat(sizeAfter).isEqualTo(0L);

when(storage.fetch(key))
.thenReturn(new ByteArrayInputStream(MANIFEST.getBytes()));
assertThat(provider.get(key)).isEqualTo(expectedManifest);
verify(storage, times(2)).fetch(key);
}

@Test
void shouldPropagateStorageBackendException() throws StorageBackendException {
when(storage.fetch(anyString()))
Expand Down