Skip to content

Commit be7b387

Browse files
authored
Add index-level-encryption support for snapshots and remote-store (#20095)
Signed-off-by: Uday Bhaskar <[email protected]>
1 parent ddf20f8 commit be7b387

File tree

51 files changed

+2140
-162
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+2140
-162
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1212
- Added public getter method in `SourceFieldMapper` to return included field ([#20290](https://github.com/opensearch-project/OpenSearch/pull/20290))
1313
- Support for HTTP/3 (server side) ([#20017](https://github.com/opensearch-project/OpenSearch/pull/20017))
1414
- Add circuit breaker support for gRPC transport to prevent out-of-memory errors ([#20203](https://github.com/opensearch-project/OpenSearch/pull/20203))
15+
- Add index-level-encryption support for snapshots and remote-store ([#20095](https://github.com/opensearch-project/OpenSearch/pull/20095))
1516

1617
### Changed
1718
- Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157))

plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureBlobContainer.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.action.ActionRunnable;
4141
import org.opensearch.action.support.GroupedActionListener;
4242
import org.opensearch.action.support.PlainActionFuture;
43+
import org.opensearch.cluster.metadata.CryptoMetadata;
4344
import org.opensearch.common.Nullable;
4445
import org.opensearch.common.blobstore.BlobContainer;
4546
import org.opensearch.common.blobstore.BlobMetadata;
@@ -136,6 +137,25 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
136137
}
137138
}
138139

140+
@Override
141+
public void writeBlobWithMetadata(
142+
String blobName,
143+
InputStream inputStream,
144+
long blobSize,
145+
boolean failIfAlreadyExists,
146+
@Nullable Map<String, String> metadata,
147+
@Nullable CryptoMetadata cryptoMetadata
148+
) throws IOException {
149+
if (cryptoMetadata != null) {
150+
throw new UnsupportedOperationException(
151+
"Azure Blob Storage repository does not currently support CryptoMetadata. "
152+
+ "Consider using repository-level encryption settings instead."
153+
);
154+
}
155+
// Azure does not support custom metadata, so we just delegate to writeBlob
156+
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
157+
}
158+
139159
@Override
140160
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
141161
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);

plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageBlobContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.repositories.gcs;
3434

35+
import org.opensearch.common.Nullable;
3536
import org.opensearch.common.blobstore.BlobContainer;
3637
import org.opensearch.common.blobstore.BlobMetadata;
3738
import org.opensearch.common.blobstore.BlobPath;
@@ -95,6 +96,18 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
9596
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
9697
}
9798

99+
@Override
100+
public void writeBlobWithMetadata(
101+
String blobName,
102+
InputStream inputStream,
103+
long blobSize,
104+
boolean failIfAlreadyExists,
105+
@Nullable Map<String, String> metadata
106+
) throws IOException {
107+
// GCS plugin does not currently store custom metadata, so we delegate to writeBlob
108+
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
109+
}
110+
98111
@Override
99112
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
100113
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);

plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsBlobContainer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.hadoop.fs.Options.CreateOpts;
4141
import org.apache.hadoop.fs.Path;
4242
import org.opensearch.common.Nullable;
43+
import org.opensearch.common.annotation.ExperimentalApi;
4344
import org.opensearch.common.blobstore.BlobContainer;
4445
import org.opensearch.common.blobstore.BlobMetadata;
4546
import org.opensearch.common.blobstore.BlobPath;
@@ -163,6 +164,19 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
163164
});
164165
}
165166

167+
@ExperimentalApi
168+
@Override
169+
public void writeBlobWithMetadata(
170+
String blobName,
171+
InputStream inputStream,
172+
long blobSize,
173+
boolean failIfAlreadyExists,
174+
@Nullable Map<String, String> metadata
175+
) throws IOException {
176+
// HDFS does not support custom metadata, so we just delegate to writeBlob
177+
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
178+
}
179+
166180
@Override
167181
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
168182
final String tempBlob = FsBlobContainer.tempBlobName(blobName);

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.logging.log4j.Logger;
6464
import org.apache.logging.log4j.message.ParameterizedMessage;
6565
import org.opensearch.action.support.PlainActionFuture;
66+
import org.opensearch.cluster.metadata.CryptoMetadata;
6667
import org.opensearch.common.Nullable;
6768
import org.opensearch.common.SetOnce;
6869
import org.opensearch.common.StreamContext;
@@ -90,6 +91,7 @@
9091
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
9192
import org.opensearch.repositories.s3.async.UploadRequest;
9293
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
94+
import org.opensearch.repositories.s3.utils.SseKmsUtil;
9395
import org.opensearch.secure_sm.AccessController;
9496

9597
import java.io.BufferedInputStream;
@@ -192,7 +194,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
192194
}
193195

194196
/**
195-
* Write blob with its object metadata.
197+
* Write blob with its object metadata and optional encryption settings.
196198
*/
197199
@ExperimentalApi
198200
@Override
@@ -201,20 +203,51 @@ public void writeBlobWithMetadata(
201203
InputStream inputStream,
202204
long blobSize,
203205
boolean failIfAlreadyExists,
204-
@Nullable Map<String, String> metadata
206+
@Nullable Map<String, String> metadata,
207+
@Nullable CryptoMetadata cryptoMetadata
205208
) throws IOException {
206209
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
207210
AccessController.doPrivilegedChecked(() -> {
208211
if (blobSize <= getLargeBlobThresholdInBytes()) {
209-
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
212+
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata, cryptoMetadata);
210213
} else {
211-
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
214+
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata, cryptoMetadata);
212215
}
213216
});
214217
}
215218

219+
/**
220+
* Write blob with its object metadata.
221+
*/
222+
@ExperimentalApi
223+
@Override
224+
public void writeBlobWithMetadata(
225+
String blobName,
226+
InputStream inputStream,
227+
long blobSize,
228+
boolean failIfAlreadyExists,
229+
@Nullable Map<String, String> metadata
230+
) throws IOException {
231+
// Delegate to crypto-aware version with null CryptoMetadata
232+
writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, metadata, null);
233+
}
234+
216235
@Override
217236
public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException {
237+
CryptoMetadata crypto = writeContext.getCryptoMetadata();
238+
239+
String indexKmsKey = null;
240+
String indexEncContext = null;
241+
242+
if (crypto != null) {
243+
indexKmsKey = crypto.getKeyArn().orElse(null);
244+
indexEncContext = crypto.getEncryptionContext().orElse(null);
245+
}
246+
String mergeEncContext = SseKmsUtil.mergeAndEncodeEncryptionContexts(
247+
indexEncContext,
248+
blobStore.serverSideEncryptionEncryptionContext()
249+
);
250+
218251
UploadRequest uploadRequest = new UploadRequest(
219252
blobStore.bucket(),
220253
buildKey(writeContext.getFileName()),
@@ -226,9 +259,9 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
226259
blobStore.isUploadRetryEnabled(),
227260
writeContext.getMetadata(),
228261
blobStore.serverSideEncryptionType(),
229-
blobStore.serverSideEncryptionKmsKey(),
262+
indexKmsKey != null ? indexKmsKey : blobStore.serverSideEncryptionKmsKey(),
230263
blobStore.serverSideEncryptionBucketKey(),
231-
blobStore.serverSideEncryptionEncryptionContext(),
264+
mergeEncContext,
232265
blobStore.expectedBucketOwner()
233266
);
234267
try {
@@ -250,7 +283,8 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
250283
uploadRequest.getKey(),
251284
inputStream.getInputStream(),
252285
uploadRequest.getContentLength(),
253-
uploadRequest.getMetadata()
286+
uploadRequest.getMetadata(),
287+
crypto
254288
);
255289
completionListener.onResponse(null);
256290
} catch (Exception ex) {
@@ -536,7 +570,8 @@ void executeSingleUpload(
536570
final String blobName,
537571
final InputStream input,
538572
final long blobSize,
539-
final Map<String, String> metadata
573+
final Map<String, String> metadata,
574+
@Nullable CryptoMetadata cryptoMetadata
540575
) throws IOException {
541576

542577
// Extra safety checks
@@ -559,7 +594,8 @@ void executeSingleUpload(
559594
if (CollectionUtils.isNotEmpty(metadata)) {
560595
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
561596
}
562-
configureEncryptionSettings(putObjectRequestBuilder, blobStore);
597+
598+
configureEncryptionSettings(putObjectRequestBuilder, blobStore, cryptoMetadata);
563599

564600
PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
565601
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
@@ -585,7 +621,8 @@ void executeMultipartUpload(
585621
final String blobName,
586622
final InputStream input,
587623
final long blobSize,
588-
final Map<String, String> metadata
624+
final Map<String, String> metadata,
625+
@Nullable CryptoMetadata cryptoMetadata
589626
) throws IOException {
590627

591628
ensureMultiPartUploadSize(blobSize);
@@ -616,7 +653,7 @@ void executeMultipartUpload(
616653
createMultipartUploadRequestBuilder.metadata(metadata);
617654
}
618655

619-
configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore);
656+
configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore, cryptoMetadata);
620657

621658
final InputStream requestInputStream;
622659
if (blobStore.isUploadRetryEnabled()) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/utils/SseKmsUtil.java

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,83 @@
1212
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
1313
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
1414

15+
import org.opensearch.cluster.metadata.CryptoMetadata;
16+
import org.opensearch.common.Nullable;
17+
import org.opensearch.repositories.blobstore.EncryptionContextUtils;
1518
import org.opensearch.repositories.s3.S3BlobStore;
1619
import org.opensearch.repositories.s3.async.UploadRequest;
1720

1821
public class SseKmsUtil {
22+
/**
23+
* Merges index-level and repository-level encryption contexts, converts to JSON format if needed,
24+
* and Base64 encodes for S3.
25+
* <p>
26+
* Delegates to centralized EncryptionContextUtils for consistent behavior.
27+
*
28+
* @param indexEncContext Index-level encryption context - can be cryptofs or JSON format
29+
* @param repoEncContext Repository-level encryption context - already Base64 encoded JSON
30+
* @return Base64 encoded merged JSON encryption context, or null if both are null
31+
*/
32+
public static String mergeAndEncodeEncryptionContexts(@Nullable String indexEncContext, @Nullable String repoEncContext) {
33+
return EncryptionContextUtils.mergeAndEncodeEncryptionContexts(indexEncContext, repoEncContext);
34+
}
1935

20-
public static void configureEncryptionSettings(CreateMultipartUploadRequest.Builder builder, S3BlobStore blobStore) {
36+
public static void configureEncryptionSettings(
37+
CreateMultipartUploadRequest.Builder builder,
38+
S3BlobStore blobStore,
39+
@Nullable CryptoMetadata cryptoMetadata
40+
) {
2141
if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
2242
builder.serverSideEncryption(ServerSideEncryption.AES256);
2343
} else if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AWS_KMS.toString())) {
44+
String indexKmsKey = null;
45+
String indexEncContext = null;
46+
47+
if (cryptoMetadata != null) {
48+
indexKmsKey = cryptoMetadata.getKeyArn().orElse(null);
49+
indexEncContext = cryptoMetadata.getEncryptionContext().orElse(null);
50+
}
51+
52+
String kmsKey = (indexKmsKey != null) ? indexKmsKey : blobStore.serverSideEncryptionKmsKey();
53+
String encContext = mergeAndEncodeEncryptionContexts(indexEncContext, blobStore.serverSideEncryptionEncryptionContext());
54+
2455
builder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
25-
builder.ssekmsKeyId(blobStore.serverSideEncryptionKmsKey());
56+
builder.ssekmsKeyId(kmsKey);
2657
builder.bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey());
27-
builder.ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
58+
builder.ssekmsEncryptionContext(encContext);
2859
}
2960
}
3061

31-
public static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
62+
public static void configureEncryptionSettings(
63+
PutObjectRequest.Builder builder,
64+
S3BlobStore blobStore,
65+
@Nullable CryptoMetadata cryptoMetadata
66+
) {
3267
if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
3368
builder.serverSideEncryption(ServerSideEncryption.AES256);
3469
} else if (blobStore.serverSideEncryptionType().equals(ServerSideEncryption.AWS_KMS.toString())) {
70+
String indexKmsKey = null;
71+
String indexEncContext = null;
72+
73+
if (cryptoMetadata != null) {
74+
indexKmsKey = cryptoMetadata.getKeyArn().orElse(null);
75+
indexEncContext = cryptoMetadata.getEncryptionContext().orElse(null);
76+
}
77+
78+
String kmsKey = (indexKmsKey != null) ? indexKmsKey : blobStore.serverSideEncryptionKmsKey();
79+
String encContext = mergeAndEncodeEncryptionContexts(indexEncContext, blobStore.serverSideEncryptionEncryptionContext());
80+
3581
builder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
36-
builder.ssekmsKeyId(blobStore.serverSideEncryptionKmsKey());
82+
builder.ssekmsKeyId(kmsKey);
3783
builder.bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey());
38-
builder.ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
84+
builder.ssekmsEncryptionContext(encContext);
3985
}
4086
}
4187

88+
public static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
89+
configureEncryptionSettings(builder, blobStore, null);
90+
}
91+
4292
public static void configureEncryptionSettings(CreateMultipartUploadRequest.Builder builder, UploadRequest uploadRequest) {
4393
if (uploadRequest.getServerSideEncryptionType().equals(ServerSideEncryption.AES256.toString())) {
4494
builder.serverSideEncryption(ServerSideEncryption.AES256);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.s3;
10+
11+
import org.opensearch.cluster.metadata.CryptoMetadata;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.repositories.s3.utils.SseKmsUtil;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
import java.nio.charset.StandardCharsets;
17+
import java.util.Base64;
18+
19+
public class S3BlobContainerEncryptionTests extends OpenSearchTestCase {
20+
21+
public void testEncryptionContextMerging() {
22+
String indexContext = "tenant=acme,classification=confidential";
23+
String repoContext = Base64.getEncoder().encodeToString("{\"repo\":\"test\",\"env\":\"prod\"}".getBytes(StandardCharsets.UTF_8));
24+
25+
String merged = SseKmsUtil.mergeAndEncodeEncryptionContexts(indexContext, repoContext);
26+
assertNotNull(merged);
27+
28+
String decoded = new String(Base64.getDecoder().decode(merged), StandardCharsets.UTF_8);
29+
assertTrue(decoded.contains("\"tenant\":\"acme\""));
30+
assertTrue(decoded.contains("\"classification\":\"confidential\""));
31+
assertTrue(decoded.contains("\"repo\":\"test\""));
32+
assertTrue(decoded.contains("\"env\":\"prod\""));
33+
}
34+
35+
public void testCryptoMetadataExtraction() {
36+
Settings cryptoSettings = Settings.builder()
37+
.put("kms.key_arn", "arn:aws:kms:us-east-1:123456789:key/index-key")
38+
.put("kms.encryption_context", "tenant=acme,env=staging")
39+
.build();
40+
CryptoMetadata cryptoMetadata = new CryptoMetadata("index-provider", "aws-kms", cryptoSettings);
41+
42+
assertEquals("arn:aws:kms:us-east-1:123456789:key/index-key", cryptoMetadata.settings().get("kms.key_arn"));
43+
assertEquals("tenant=acme,env=staging", cryptoMetadata.settings().get("kms.encryption_context"));
44+
}
45+
}

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import static org.mockito.ArgumentMatchers.anyLong;
8989
import static org.mockito.ArgumentMatchers.anyMap;
9090
import static org.mockito.ArgumentMatchers.anyString;
91+
import static org.mockito.ArgumentMatchers.isNull;
9192
import static org.mockito.Mockito.any;
9293
import static org.mockito.Mockito.doAnswer;
9394
import static org.mockito.Mockito.doNothing;
@@ -757,7 +758,8 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
757758
anyString(),
758759
any(InputStream.class),
759760
anyLong(),
760-
anyMap()
761+
anyMap(),
762+
isNull()
761763
);
762764

763765
if (expectException) {

0 commit comments

Comments
 (0)