Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
import io.aiven.kafka.tieredstorage.storage.BytesRange;
import io.aiven.kafka.tieredstorage.storage.ObjectDeleter;
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
import io.aiven.kafka.tieredstorage.storage.ObjectUploader;
import io.aiven.kafka.tieredstorage.storage.StorageBackend;
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration;
import io.aiven.kafka.tieredstorage.transform.CompressionChunkEnumeration;
Expand Down Expand Up @@ -78,9 +76,6 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.

private final Executor executor = new ForkJoinPool();

private ObjectFetcher fetcher;
private ObjectUploader uploader;
private ObjectDeleter deleter;
Comment on lines -81 to -83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From our discussions, we explicitly wanted these to be separate, primarily to have separate S3 clients for different purposes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this then be documented? I vaguely remember that, but it's not obvious.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusting the PR accordingly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, I still think that we would then need to create separate clients inside the S3 implementation instead.

private boolean compressionEnabled;
private boolean compressionHeuristic;
private boolean encryptionEnabled;
Expand All @@ -92,6 +87,7 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
private ObjectKey objectKey;

private SegmentManifestProvider segmentManifestProvider;
private StorageBackend storage;

public RemoteStorageManager() {
this(Time.SYSTEM);
Expand All @@ -107,9 +103,7 @@ public RemoteStorageManager() {
public void configure(final Map<String, ?> configs) {
Objects.requireNonNull(configs, "configs must not be null");
final RemoteStorageManagerConfig config = new RemoteStorageManagerConfig(configs);
fetcher = config.storage();
uploader = config.storage();
deleter = config.storage();
storage = config.storage();
objectKey = new ObjectKey(config.keyPrefix());
encryptionEnabled = config.encryptionEnabled();
if (encryptionEnabled) {
Expand All @@ -120,7 +114,7 @@ public void configure(final Map<String, ?> configs) {
aesEncryptionProvider = new AesEncryptionProvider();
}
chunkManager = new ChunkManager(
fetcher,
storage,
objectKey,
aesEncryptionProvider,
config.chunkCache()
Expand All @@ -136,7 +130,7 @@ public void configure(final Map<String, ?> configs) {
objectKey,
config.segmentManifestCacheSize(),
config.segmentManifestCacheRetention(),
fetcher,
storage,
mapper,
executor);
}
Expand Down Expand Up @@ -231,7 +225,7 @@ private void uploadSegmentLog(final RemoteLogSegmentMetadata remoteLogSegmentMet
throws IOException, StorageBackendException {
final String fileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG);
try (final var sis = transformFinisher.toInputStream()) {
uploader.upload(sis, fileKey);
storage.upload(sis, fileKey);
}
}

Expand All @@ -241,7 +235,7 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
throws StorageBackendException, IOException {
final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
try (index) {
uploader.upload(index, key);
storage.upload(index, key);
}
}

Expand All @@ -252,7 +246,7 @@ private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetad
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);

try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
uploader.upload(manifestContent, manifestFileKey);
storage.upload(manifestContent, manifestFileKey);
}
}

Expand Down Expand Up @@ -291,7 +285,7 @@ public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMet
final IndexType indexType) throws RemoteStorageException {
try {
final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
return fetcher.fetch(key);
return storage.fetch(key);
} catch (final StorageBackendException e) {
// TODO: should be aligned with upstream implementation
if (indexType == TRANSACTION) {
Expand All @@ -309,7 +303,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment
try {
for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
final String key = objectKey.key(remoteLogSegmentMetadata, suffix);
deleter.delete(key);
storage.delete(key);
}
} catch (final StorageBackendException e) {
throw new RemoteStorageException(e);
Expand Down