Skip to content

Commit 63b7ab0

Browse files
authored
JAMES-4131 Configure a fallback bucket for S3 (#2719)
1 parent a77e116 commit 63b7ab0

File tree

12 files changed

+160
-27
lines changed

12 files changed

+160
-27
lines changed

docs/modules/servers/partials/configure/blobstore.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ BucketPrefix is the prefix of bucket names in James BlobStore
124124
| objectstorage.namespace
125125
| BlobStore default bucket name. Most of blobs storing in BlobStore are inside the default bucket.
126126
Unless a special case like storing blobs of deleted messages.
127+
128+
| objectstorage.namespace.read.fallback
129+
| BlobStore fallback bucket name. Allows to fallback to a previous used bucket when blob is missing from the default one.
130+
It can be useful when migrating blobs to a new bucket for example.
127131
|===
128132

129133
==== SSE-C Configuration

server/apps/distributed-app/helm-chart/james/configs/blob.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ cache.sizeThresholdInBytes=16 KiB
4141
# Optional, default is bucketPrefix + `default`
4242
objectstorage.namespace=james-${env:JAMES_BUCKET_SUFFIX}
4343

44+
# Fallback bucket name
45+
# Optional, read this bucket when default bukcket reads fails if configured
46+
# objectstorage.namespace.read.fallback=james-fallback
47+
4448
# ========================================= ObjectStorage on S3 =============================================
4549
# Mandatory if you choose aws-s3 storage service, S3 authentication endpoint
4650
objectstorage.s3.endPoint=${env:OS_S3_ENDPOINT}

server/apps/distributed-app/sample-configuration/blob.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ cache.enable=false
6969
# Optional, default is bucketPrefix + `default`
7070
# objectstorage.namespace=james
7171

72+
# Fallback bucket name
73+
# Optional, read this bucket when default bukcket reads fails if configured
74+
# objectstorage.namespace.read.fallback=james-fallback
75+
7276
# ========================================= ObjectStorage on S3 =============================================
7377
# Mandatory if you choose s3 storage service, S3 authentication endpoint
7478
objectstorage.s3.endPoint=http://s3.docker.test:8000/

server/apps/distributed-pop3-app/sample-configuration/blob.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ cache.enable=false
5757
# Optional, default is bucketPrefix + `default`
5858
# objectstorage.namespace=james
5959

60+
# Fallback bucket name
61+
# Optional, read this bucket when default bukcket reads fails if configured
62+
# objectstorage.namespace.read.fallback=james-fallback
63+
6064
# ========================================= ObjectStorage on S3 =============================================
6165
# Mandatory if you choose s3 storage service, S3 authentication endpoint
6266
objectstorage.s3.endPoint=http://s3.docker.test:8000/

server/apps/postgres-app/sample-configuration-distributed/blob.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ encryption.aes.enable=false
4949
# Optional, default is bucketPrefix + `default`
5050
# objectstorage.namespace=james
5151

52+
# Fallback bucket name
53+
# Optional, read this bucket when default bukcket reads fails if configured
54+
# objectstorage.namespace.read.fallback=james-fallback
55+
5256
# ========================================= ObjectStorage on S3 =============================================
5357
# Mandatory if you choose s3 storage service, S3 authentication endpoint
5458
objectstorage.s3.endPoint=http://s3.docker.test:8000/

server/apps/scaling-pulsar-smtp/sample-configuration/blob.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ cache.enable=false
6767
# Optional, default is bucketPrefix + `default`
6868
# objectstorage.namespace=james
6969

70+
# Fallback bucket name
71+
# Optional, read this bucket when default bukcket reads fails if configured
72+
# objectstorage.namespace.read.fallback=james-fallback
73+
7074
# ========================================= ObjectStorage on S3 =============================================
7175
# Mandatory if you choose s3 storage service, S3 authentication endpoint
7276
#objectstorage.s3.endPoint=http://s3.docker.test:8000/

server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStoreDAO.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828

2929
import com.google.common.io.ByteSource;
3030

31-
import reactor.core.publisher.Mono;
32-
3331
public interface BlobStoreDAO {
3432
class ReactiveByteSource {
3533
private final long size;
@@ -58,11 +56,6 @@ public Publisher<ByteBuffer> getContent() {
5856
*/
5957
InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException;
6058

61-
default Publisher<ReactiveByteSource> readAsByteSource(BucketName bucketName, BlobId blobId) {
62-
return Mono.from(readBytes(bucketName, blobId))
63-
.map(bytes -> new ReactiveByteSource(bytes.length, Mono.just(ByteBuffer.wrap(bytes))));
64-
}
65-
6659
Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId);
6760

6861
/**

server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/BucketNameResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ Optional<BucketName> unresolve(BucketName bucketName) {
108108
}).orElse(Optional.of(bucketName));
109109
}
110110

111-
private boolean isNameSpace(BucketName bucketName) {
111+
public boolean isNameSpace(BucketName bucketName) {
112112
return namespace
113113
.map(existingNamespace -> existingNamespace.equals(bucketName))
114114
.orElse(false);

server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreConfiguration.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ class ReadyToBuild {
7777
private Optional<Retry> uploadRetrySpec;
7878
private boolean ssecEnabled;
7979
private Optional<S3SSECConfiguration> ssecConfiguration = Optional.empty();
80+
private Optional<BucketName> fallbackBucketName;
8081

8182
public ReadyToBuild(AwsS3AuthConfiguration specificAuthConfiguration, Region region) {
8283
this.specificAuthConfiguration = specificAuthConfiguration;
@@ -89,6 +90,7 @@ public ReadyToBuild(AwsS3AuthConfiguration specificAuthConfiguration, Region reg
8990
this.connectionTimeout = Optional.empty();
9091
this.inMemoryReadLimit = Optional.empty();
9192
this.uploadRetrySpec = Optional.empty();
93+
this.fallbackBucketName = Optional.empty();
9294
}
9395

9496
public ReadyToBuild defaultBucketName(Optional<BucketName> defaultBucketName) {
@@ -155,11 +157,16 @@ public ReadyToBuild ssecDisabled() {
155157
return this;
156158
}
157159

160+
public ReadyToBuild fallbackBucketName(Optional<BucketName> fallbackBucketName) {
161+
this.fallbackBucketName = fallbackBucketName;
162+
return this;
163+
}
164+
158165
public S3BlobStoreConfiguration build() {
159166
return new S3BlobStoreConfiguration(bucketPrefix, defaultBucketName, region,
160167
specificAuthConfiguration, httpConcurrency.orElse(DEFAULT_HTTP_CONCURRENCY),
161168
inMemoryReadLimit, readTimeout, writeTimeout, connectionTimeout, uploadRetrySpec.orElse(DEFAULT_UPLOAD_RETRY_SPEC),
162-
ssecEnabled, ssecConfiguration);
169+
ssecEnabled, ssecConfiguration, fallbackBucketName);
163170
}
164171
}
165172

@@ -179,6 +186,7 @@ public S3BlobStoreConfiguration build() {
179186
private final Retry uploadRetrySpec;
180187
private final boolean ssecEnabled;
181188
private final Optional<S3SSECConfiguration> ssecConfiguration;
189+
private final Optional<BucketName> fallbackNamespace;
182190

183191
private final Optional<Duration> readTimeout;
184192
private final Optional<Duration> writeTimeout;
@@ -196,7 +204,8 @@ public S3BlobStoreConfiguration build() {
196204
Optional<Duration> connectionTimeout,
197205
Retry uploadRetrySpec,
198206
boolean ssecEnabled,
199-
Optional<S3SSECConfiguration> ssecConfiguration) {
207+
Optional<S3SSECConfiguration> ssecConfiguration,
208+
Optional<BucketName> fallbackNamespace) {
200209
this.bucketPrefix = bucketPrefix;
201210
this.namespace = namespace;
202211
this.region = region;
@@ -209,6 +218,7 @@ public S3BlobStoreConfiguration build() {
209218
this.uploadRetrySpec = uploadRetrySpec;
210219
this.ssecEnabled = ssecEnabled;
211220
this.ssecConfiguration = ssecConfiguration;
221+
this.fallbackNamespace = fallbackNamespace;
212222
}
213223

214224
public Optional<Long> getInMemoryReadLimit() {
@@ -259,6 +269,10 @@ public Optional<S3SSECConfiguration> getSSECConfiguration() {
259269
return ssecConfiguration;
260270
}
261271

272+
public Optional<BucketName> getFallbackNamespace() {
273+
return fallbackNamespace;
274+
}
275+
262276
@Override
263277
public final boolean equals(Object o) {
264278
if (o instanceof S3BlobStoreConfiguration that) {
@@ -273,7 +287,8 @@ public final boolean equals(Object o) {
273287
&& Objects.equals(this.uploadRetrySpec, that.uploadRetrySpec)
274288
&& Objects.equals(this.specificAuthConfiguration, that.specificAuthConfiguration)
275289
&& Objects.equals(this.ssecEnabled, that.ssecEnabled)
276-
&& Objects.equals(this.ssecConfiguration, that.ssecConfiguration);
290+
&& Objects.equals(this.ssecConfiguration, that.ssecConfiguration)
291+
&& Objects.equals(this.fallbackNamespace, that.fallbackNamespace);
277292
}
278293
return false;
279294
}
@@ -282,7 +297,7 @@ public final boolean equals(Object o) {
282297
public final int hashCode() {
283298
return Objects.hash(namespace, bucketPrefix, httpConcurrency, specificAuthConfiguration,
284299
readTimeout, writeTimeout, connectionTimeout, uploadRetrySpec, ssecConfiguration, region,
285-
inMemoryReadLimit, ssecEnabled);
300+
inMemoryReadLimit, ssecEnabled, fallbackNamespace);
286301
}
287302

288303
@Override
@@ -300,6 +315,7 @@ public String toString() {
300315
.add("uploadRetrySpec", uploadRetrySpec)
301316
.add("ssecEnabled", ssecEnabled)
302317
.add("ssecConfiguration", ssecConfiguration)
318+
.add("fallbackNamespace", fallbackNamespace)
303319
.toString();
304320
}
305321
}

server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import reactor.core.scheduler.Schedulers;
5656
import reactor.util.retry.RetryBackoffSpec;
5757
import software.amazon.awssdk.core.BytesWrapper;
58+
import software.amazon.awssdk.core.ResponseBytes;
5859
import software.amazon.awssdk.core.async.AsyncRequestBody;
5960
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
6061
import software.amazon.awssdk.core.async.SdkPublisher;
@@ -115,6 +116,7 @@ public long size() {
115116
private final S3BlobStoreConfiguration configuration;
116117
private final BlobId.Factory blobIdFactory;
117118
private final S3RequestOption s3RequestOption;
119+
private final java.util.Optional<BucketName> fallbackNamespace;
118120

119121
@Inject
120122
public S3BlobStoreDAO(S3ClientFactory s3ClientFactory,
@@ -125,6 +127,7 @@ public S3BlobStoreDAO(S3ClientFactory s3ClientFactory,
125127
this.client = s3ClientFactory.get();
126128
this.blobIdFactory = blobIdFactory;
127129
this.s3RequestOption = s3RequestOption;
130+
this.fallbackNamespace = configuration.getFallbackNamespace();
128131

129132
bucketNameResolver = BucketNameResolver.builder()
130133
.prefix(configuration.getBucketPrefix())
@@ -154,23 +157,24 @@ public Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId)
154157
.map(res -> ReactorUtils.toInputStream(res.flux));
155158
}
156159

157-
@Override
158-
public Publisher<ReactiveByteSource> readAsByteSource(BucketName bucketName, BlobId blobId) {
159-
BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
160-
161-
return getObject(resolvedBucketName, blobId)
162-
.onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
163-
.onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e))
164-
.map(res -> new ReactiveByteSource(res.sdkResponse.contentLength(), res.flux));
165-
}
166-
167160
private static class FluxResponse {
168161
final CompletableFuture<FluxResponse> supportingCompletableFuture = new CompletableFuture<>();
169162
GetObjectResponse sdkResponse;
170163
Flux<ByteBuffer> flux;
171164
}
172165

173166
private Mono<FluxResponse> getObject(BucketName bucketName, BlobId blobId) {
167+
return getObjectFromStore(bucketName, blobId)
168+
.onErrorResume(e -> e instanceof NoSuchKeyException || e instanceof NoSuchBucketException, e -> {
169+
if (fallbackNamespace.isPresent() && bucketNameResolver.isNameSpace(bucketName)) {
170+
BucketName resolvedFallbackBucketName = bucketNameResolver.resolve(fallbackNamespace.get());
171+
return getObjectFromStore(resolvedFallbackBucketName, blobId);
172+
}
173+
return Mono.error(e);
174+
});
175+
}
176+
177+
private Mono<FluxResponse> getObjectFromStore(BucketName bucketName, BlobId blobId) {
174178
return buildGetObjectRequestBuilder(bucketName, blobId)
175179
.flatMap(getObjectRequestBuilder -> Mono.fromFuture(() ->
176180
client.getObject(getObjectRequestBuilder.build(),
@@ -208,14 +212,29 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
208212
public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
209213
BucketName resolvedBucketName = bucketNameResolver.resolve(bucketName);
210214

211-
return buildGetObjectRequestBuilder(resolvedBucketName, blobId)
212-
.flatMap(putObjectRequest -> Mono.fromFuture(() ->
213-
client.getObject(putObjectRequest.build(), new MinimalCopyBytesResponseTransformer(configuration, blobId)))
215+
return getObjectBytes(resolvedBucketName, blobId)
214216
.onErrorMap(NoSuchBucketException.class, e -> new ObjectNotFoundException("Bucket not found " + resolvedBucketName.asString(), e))
215217
.onErrorMap(NoSuchKeyException.class, e -> new ObjectNotFoundException("Blob not found " + blobId.asString() + " in bucket " + resolvedBucketName.asString(), e))
216218
.publishOn(Schedulers.parallel())
217219
.map(BytesWrapper::asByteArrayUnsafe)
218-
.onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause));
220+
.onErrorMap(e -> e.getCause() instanceof OutOfMemoryError, Throwable::getCause);
221+
}
222+
223+
private Mono<ResponseBytes<GetObjectResponse>> getObjectBytes(BucketName bucketName, BlobId blobId) {
224+
return getObjectBytesFromStore(bucketName, blobId)
225+
.onErrorResume(e -> e instanceof NoSuchKeyException || e instanceof NoSuchBucketException, e -> {
226+
if (fallbackNamespace.isPresent() && bucketNameResolver.isNameSpace(bucketName)) {
227+
BucketName resolvedFallbackBucketName = bucketNameResolver.resolve(fallbackNamespace.get());
228+
return getObjectBytesFromStore(resolvedFallbackBucketName, blobId);
229+
}
230+
return Mono.error(e);
231+
});
232+
}
233+
234+
private Mono<ResponseBytes<GetObjectResponse>> getObjectBytesFromStore(BucketName bucketName, BlobId blobId) {
235+
return buildGetObjectRequestBuilder(bucketName, blobId)
236+
.flatMap(putObjectRequest -> Mono.fromFuture(() ->
237+
client.getObject(putObjectRequest.build(), new MinimalCopyBytesResponseTransformer(configuration, blobId))));
219238
}
220239

221240
private Mono<GetObjectRequest.Builder> buildGetObjectRequestBuilder(BucketName bucketName, BlobId blobId) {

0 commit comments

Comments
 (0)