Skip to content

Commit 6700ae0

Browse files
authored
Merge pull request #324 from aiven/ivanyu/multiple-rsa-keypairs
Provide keyring in configuration
2 parents 5fe998f + 7a107ff commit 6700ae0

File tree

8 files changed

+258
-80
lines changed

8 files changed

+258
-80
lines changed

core/src/integration-test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
6161
import io.aiven.kafka.tieredstorage.security.EncryptedDataKey;
6262
import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
63-
import io.aiven.kafka.tieredstorage.security.RsaKeyReader;
6463

6564
import com.fasterxml.jackson.databind.JsonNode;
6665
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -108,8 +107,6 @@ class RemoteStorageManagerTest extends RsaKeyAwareTest {
108107
static final String TARGET_MANIFEST_FILE =
109108
"test/topic-AAAAAAAAAAAAAAAAAAAAAQ/7/00000000000000000023-AAAAAAAAAAAAAAAAAAAAAA.rsm-manifest";
110109

111-
static final String KEY_ENCRYPTION_KEY_ID = "static-key-id";
112-
113110
private static List<Arguments> provideEndToEnd() {
114111
final List<Arguments> result = new ArrayList<>();
115112
for (final int chunkSize : List.of(1024 * 1024 - 1, 1024 * 1024 * 1024 - 1, Integer.MAX_VALUE / 2)) {
@@ -128,9 +125,7 @@ private static List<Arguments> provideEndToEnd() {
128125
void init() throws IOException {
129126
rsm = new RemoteStorageManager();
130127

131-
rsaEncryptionProvider = new RsaEncryptionProvider(
132-
KEY_ENCRYPTION_KEY_ID,
133-
Map.of(KEY_ENCRYPTION_KEY_ID, RsaKeyReader.read(publicKeyPem, privateKeyPem)));
128+
rsaEncryptionProvider = new RsaEncryptionProvider(KEY_ENCRYPTION_KEY_ID, keyRing);
134129
aesEncryptionProvider = new AesEncryptionProvider();
135130

136131
sourceDir = Path.of(tmpDir.toString(), "source");
@@ -194,8 +189,10 @@ void endToEnd(final int chunkSize,
194189
"chunk.cache.size", Integer.toString(100 * 1024 * 1024)
195190
));
196191
if (encryption) {
197-
config.put("encryption.public.key.file", publicKeyPem.toString());
198-
config.put("encryption.private.key.file", privateKeyPem.toString());
192+
config.put("encryption.key.pair.id", KEY_ENCRYPTION_KEY_ID);
193+
config.put("encryption.key.pairs", KEY_ENCRYPTION_KEY_ID);
194+
config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".public.key.file", publicKeyPem.toString());
195+
config.put("encryption.key.pairs." + KEY_ENCRYPTION_KEY_ID + ".private.key.file", privateKeyPem.toString());
199196
}
200197
rsm.configure(config);
201198

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage;
18+
19+
import java.nio.file.Path;
20+
import java.util.Objects;
21+
22+
public class KeyPairPaths {
23+
public final Path publicKey;
24+
public final Path privateKey;
25+
26+
public KeyPairPaths(final Path publicKey, final Path privateKey) {
27+
this.publicKey = Objects.requireNonNull(publicKey, "publicKey cannot be null");
28+
this.privateKey = Objects.requireNonNull(privateKey, "privateKey cannot be null");
29+
}
30+
31+
@Override
32+
public boolean equals(final Object o) {
33+
if (this == o) {
34+
return true;
35+
}
36+
if (o == null || getClass() != o.getClass()) {
37+
return false;
38+
}
39+
final KeyPairPaths that = (KeyPairPaths) o;
40+
return Objects.equals(publicKey, that.publicKey) && Objects.equals(privateKey, that.privateKey);
41+
}
42+
43+
@Override
44+
public int hashCode() {
45+
return Objects.hash(publicKey, privateKey);
46+
}
47+
48+
@Override
49+
public String toString() {
50+
return "KeyPairPaths("
51+
+ "publicKey=" + publicKey
52+
+ ", privateKey=" + privateKey
53+
+ ")";
54+
}
55+
}

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.InputStream;
2525
import java.nio.file.Files;
2626
import java.security.KeyPair;
27+
import java.util.HashMap;
2728
import java.util.Map;
2829
import java.util.Objects;
2930
import java.util.Optional;
@@ -110,8 +111,6 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
110111

111112
private SegmentManifestProvider segmentManifestProvider;
112113

113-
private static final String KEY_ENCRYPTION_KEY_ID = "static-key-id";
114-
115114
public RemoteStorageManager() {
116115
this(Time.SYSTEM);
117116
}
@@ -134,11 +133,11 @@ public void configure(final Map<String, ?> configs) {
134133
objectKey = new ObjectKey(config.keyPrefix());
135134
encryptionEnabled = config.encryptionEnabled();
136135
if (encryptionEnabled) {
137-
final KeyPair keyPair =
138-
RsaKeyReader.read(config.encryptionPublicKeyFile(), config.encryptionPrivateKeyFile());
139-
rsaEncryptionProvider = new RsaEncryptionProvider(
140-
KEY_ENCRYPTION_KEY_ID,
141-
Map.of(KEY_ENCRYPTION_KEY_ID, keyPair));
136+
final Map<String, KeyPair> keyRing = new HashMap<>();
137+
config.encryptionKeyRing().forEach((keyId, keyPaths) ->
138+
keyRing.put(keyId, RsaKeyReader.read(keyPaths.publicKey, keyPaths.privateKey)
139+
));
140+
rsaEncryptionProvider = new RsaEncryptionProvider(config.encryptionKeyPairId(), keyRing);
142141
aesEncryptionProvider = new AesEncryptionProvider();
143142
}
144143
final ChunkManagerFactory chunkManagerFactory = new ChunkManagerFactory();

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerConfig.java

Lines changed: 110 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.nio.file.Path;
2020
import java.time.Duration;
21+
import java.util.HashMap;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.Optional;
2325

@@ -68,10 +70,6 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
6870

6971
private static final String ENCRYPTION_CONFIG = "encryption.enabled";
7072
private static final String ENCRYPTION_DOC = "Whether to enable encryption";
71-
private static final String ENCRYPTION_PUBLIC_KEY_FILE_CONFIG = "encryption.public.key.file";
72-
private static final String ENCRYPTION_PUBLIC_KEY_FILE_DOC = "The path to the RSA public key file";
73-
private static final String ENCRYPTION_PRIVATE_KEY_FILE_CONFIG = "encryption.private.key.file";
74-
private static final String ENCRYPTION_PRIVATE_KEY_FILE_DOC = "The path to the RSA private key file";
7573
// TODO add possibility to pass keys as strings
7674

7775

@@ -158,20 +156,6 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
158156
ConfigDef.Importance.HIGH,
159157
ENCRYPTION_DOC
160158
);
161-
CONFIG.define(
162-
ENCRYPTION_PUBLIC_KEY_FILE_CONFIG,
163-
ConfigDef.Type.STRING,
164-
null,
165-
ConfigDef.Importance.HIGH,
166-
ENCRYPTION_PUBLIC_KEY_FILE_DOC
167-
);
168-
CONFIG.define(
169-
ENCRYPTION_PRIVATE_KEY_FILE_CONFIG,
170-
ConfigDef.Type.STRING,
171-
null,
172-
ConfigDef.Importance.HIGH,
173-
ENCRYPTION_PRIVATE_KEY_FILE_DOC
174-
);
175159

176160
CONFIG
177161
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
@@ -196,14 +180,107 @@ public class RemoteStorageManagerConfig extends AbstractConfig {
196180
METRICS_RECORDING_LEVEL_DOC);
197181
}
198182

183+
/**
184+
* Internal config for encryption.
185+
*
186+
* <p>It's needed for more convenient dynamic config definition.
187+
*/
188+
private static class EncryptionConfig extends AbstractConfig {
189+
private static final String ENCRYPTION_KEY_PAIR_ID_CONFIG = "encryption.key.pair.id";
190+
private static final String ENCRYPTION_KEY_PAIR_ID_DOC =
191+
"The ID of the key pair to be used for encryption";
192+
193+
private static final String ENCRYPTION_KEY_PAIRS_CONFIG = "encryption.key.pairs";
194+
private static final String ENCRYPTION_KEY_PAIRS_DOC = "The list of encryption key pair IDs";
195+
196+
private static final String ENCRYPTION_PUBLIC_KEY_FILE_DOC = "The path to the RSA public key file";
197+
private static final String ENCRYPTION_PRIVATE_KEY_FILE_DOC = "The path to the RSA private key file";
198+
199+
private EncryptionConfig(final ConfigDef configDef, final Map<String, ?> props) {
200+
super(configDef, props);
201+
}
202+
203+
Path encryptionPublicKeyFile(final String keyPairId) {
204+
return Path.of(getString(publicKeyFileConfig(keyPairId)));
205+
}
206+
207+
Path encryptionPrivateKeyFile(final String keyPairId) {
208+
return Path.of(getString(privateKeyFileConfig(keyPairId)));
209+
}
210+
211+
public static EncryptionConfig create(final Map<String, ?> props) {
212+
final ConfigDef configDef = new ConfigDef();
213+
// First, define the active key ID and key ID list fields, they are required always.
214+
configDef.define(
215+
ENCRYPTION_KEY_PAIR_ID_CONFIG,
216+
ConfigDef.Type.STRING,
217+
ConfigDef.NO_DEFAULT_VALUE,
218+
ConfigDef.Importance.HIGH,
219+
ENCRYPTION_KEY_PAIR_ID_DOC
220+
);
221+
configDef.define(
222+
ENCRYPTION_KEY_PAIRS_CONFIG,
223+
ConfigDef.Type.LIST,
224+
ConfigDef.NO_DEFAULT_VALUE,
225+
ConfigDef.Importance.HIGH,
226+
ENCRYPTION_KEY_PAIRS_DOC
227+
);
228+
final EncryptionConfig interimEncryptionConfig = new EncryptionConfig(configDef, props);
229+
230+
// Check that the active ID is present in the list.
231+
if (!interimEncryptionConfig.keyPairIds().contains(interimEncryptionConfig.activeKeyPairId())) {
232+
throw new ConfigException(
233+
"Encryption key '" + interimEncryptionConfig.activeKeyPairId() + "' must be provided");
234+
}
235+
236+
// Then, define key fields dynamically based on the key pair IDs provided above.
237+
// See e.g. the ConnectorConfig.enrich in the Kafka code.
238+
for (final String keyPairId : interimEncryptionConfig.keyPairIds()) {
239+
configDef.define(
240+
publicKeyFileConfig(keyPairId),
241+
ConfigDef.Type.STRING,
242+
ConfigDef.NO_DEFAULT_VALUE,
243+
ConfigDef.Importance.HIGH,
244+
ENCRYPTION_PUBLIC_KEY_FILE_DOC
245+
);
246+
configDef.define(
247+
privateKeyFileConfig(keyPairId),
248+
ConfigDef.Type.STRING,
249+
ConfigDef.NO_DEFAULT_VALUE,
250+
ConfigDef.Importance.HIGH,
251+
ENCRYPTION_PRIVATE_KEY_FILE_DOC
252+
);
253+
}
254+
255+
return new EncryptionConfig(configDef, props);
256+
}
257+
258+
String activeKeyPairId() {
259+
return getString(ENCRYPTION_KEY_PAIR_ID_CONFIG);
260+
}
261+
262+
List<String> keyPairIds() {
263+
return getList(ENCRYPTION_KEY_PAIRS_CONFIG);
264+
}
265+
266+
private static String publicKeyFileConfig(final String keyPairId) {
267+
return "encryption.key.pairs." + keyPairId + ".public.key.file";
268+
}
269+
270+
private static String privateKeyFileConfig(final String keyPairId) {
271+
return "encryption.key.pairs." + keyPairId + ".private.key.file";
272+
}
273+
}
274+
275+
private final EncryptionConfig encryptionConfig;
199276

200277
RemoteStorageManagerConfig(final Map<String, ?> props) {
201278
super(CONFIG, props);
279+
encryptionConfig = encryptionEnabled() ? EncryptionConfig.create(props) : null;
202280
validate();
203281
}
204282

205283
private void validate() {
206-
validateEncryption();
207284
validateCompression();
208285
}
209286

@@ -214,17 +291,6 @@ private void validateCompression() {
214291
}
215292
}
216293

217-
private void validateEncryption() {
218-
if (getBoolean(ENCRYPTION_CONFIG) && getString(ENCRYPTION_PUBLIC_KEY_FILE_CONFIG) == null) {
219-
throw new ConfigException(
220-
ENCRYPTION_PUBLIC_KEY_FILE_CONFIG + " must be provided if encryption is enabled");
221-
}
222-
if (getBoolean(ENCRYPTION_CONFIG) && getString(ENCRYPTION_PRIVATE_KEY_FILE_CONFIG) == null) {
223-
throw new ConfigException(
224-
ENCRYPTION_PRIVATE_KEY_FILE_CONFIG + " must be provided if encryption is enabled");
225-
}
226-
}
227-
228294
StorageBackend storage() {
229295
final Class<?> storageClass = getClass(STORAGE_BACKEND_CLASS_CONFIG);
230296
final StorageBackend storage = Utils.newInstance(storageClass, StorageBackend.class);
@@ -268,19 +334,25 @@ boolean encryptionEnabled() {
268334
return getBoolean(ENCRYPTION_CONFIG);
269335
}
270336

271-
Path encryptionPublicKeyFile() {
272-
final String value = getString(ENCRYPTION_PUBLIC_KEY_FILE_CONFIG);
273-
if (value == null) {
337+
String encryptionKeyPairId() {
338+
if (!encryptionEnabled()) {
274339
return null;
275340
}
276-
return Path.of(value);
341+
return encryptionConfig.activeKeyPairId();
277342
}
278343

279-
Path encryptionPrivateKeyFile() {
280-
final String value = getString(ENCRYPTION_PRIVATE_KEY_FILE_CONFIG);
281-
if (value == null) {
344+
Map<String, KeyPairPaths> encryptionKeyRing() {
345+
if (!encryptionEnabled()) {
282346
return null;
283347
}
284-
return Path.of(value);
348+
349+
final Map<String, KeyPairPaths> result = new HashMap<>();
350+
for (final String keyPairId : encryptionConfig.keyPairIds()) {
351+
final KeyPairPaths keyPair = new KeyPairPaths(
352+
encryptionConfig.encryptionPublicKeyFile(keyPairId),
353+
encryptionConfig.encryptionPrivateKeyFile(keyPairId));
354+
result.put(keyPairId, keyPair);
355+
}
356+
return result;
285357
}
286358
}

0 commit comments

Comments
 (0)