Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.apache.pekko.event.DiagnosticLoggingAdapter;
import org.apache.pekko.cluster.singleton.ClusterSingletonProxy;
import org.apache.pekko.cluster.singleton.ClusterSingletonProxySettings;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.eclipse.ditto.base.service.RootChildActorStarter;
import org.eclipse.ditto.base.service.actors.DittoRootActor;
Expand All @@ -33,6 +35,7 @@
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceOperationsActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceStreamingActorCreator;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionSupervisorActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.migration.EncryptionMigrationActor;
import org.eclipse.ditto.edge.service.dispatching.EdgeCommandForwarderActor;
import org.eclipse.ditto.edge.service.dispatching.ShardRegions;
import org.eclipse.ditto.internal.utils.cluster.ClusterUtil;
Expand All @@ -46,6 +49,7 @@
import org.eclipse.ditto.internal.utils.health.config.PersistenceConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoHealthChecker;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistentactors.PersistencePingActor;
Expand Down Expand Up @@ -112,10 +116,15 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
startChildActor(ConnectionIdsRetrievalActor.ACTOR_NAME, ConnectionIdsRetrievalActor.props(mongoReadJournal,
connectionIdsRetrievalConfig));

final MongoClientWrapper mongoClientWrapper =
MongoClientWrapper.newInstance(connectivityConfig.getMongoDbConfig());

startChildActor(ConnectionPersistenceOperationsActor.ACTOR_NAME,
ConnectionPersistenceOperationsActor.props(pubSubMediator, connectivityConfig.getMongoDbConfig(),
ConnectionPersistenceOperationsActor.props(pubSubMediator, mongoClientWrapper,
config, connectivityConfig.getPersistenceOperationsConfig()));

optionallyStartEncryptionMigrationSingleton(actorSystem, connectivityConfig, mongoClientWrapper);

RootChildActorStarter.get(actorSystem, ScopedConfig.dittoExtension(config)).execute(getContext());


Expand Down Expand Up @@ -150,6 +159,22 @@ protected PartialFunction<Throwable, SupervisorStrategy.Directive> getSupervisio
}).build().orElse(super.getSupervisionDecider());
}

private void optionallyStartEncryptionMigrationSingleton(final ActorSystem actorSystem,
final ConnectivityConfig connectivityConfig, final MongoClientWrapper mongoClient) {
final var encryptionConfig = connectivityConfig.getConnectionConfig().getFieldsEncryptionConfig();
if (encryptionConfig.isEncryptionEnabled() || encryptionConfig.getOldSymmetricalKey().isPresent()) {
final String managerName = EncryptionMigrationActor.ACTOR_NAME + "Singleton";
final ActorRef singletonManager = startClusterSingletonActor(
EncryptionMigrationActor.props(connectivityConfig, mongoClient), managerName);

final ClusterSingletonProxySettings proxySettings =
ClusterSingletonProxySettings.create(actorSystem).withRole(CLUSTER_ROLE);
final Props proxyProps = ClusterSingletonProxy.props(
singletonManager.path().toStringWithoutAddress(), proxySettings);
getContext().actorOf(proxyProps, EncryptionMigrationActor.ACTOR_NAME);
}
}

private ActorRef startClusterSingletonActor(final Props props, final String name) {
return ClusterUtil.startSingleton(getContext(), CLUSTER_ROLE, name, props);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,48 @@ public final class DefaultFieldsEncryptionConfig implements FieldsEncryptionConf
private static final String CONFIG_PATH = "encryption";
private final boolean isEncryptionEnabled;
private final String symmetricalKey;
private final String oldSymmetricalKey;
private final List<String> jsonPointers;
private final int migrationBatchSize;
private final int migrationMaxDocumentsPerMinute;


private DefaultFieldsEncryptionConfig(final ConfigWithFallback config) {
this.isEncryptionEnabled = config.getBoolean(ConfigValue.ENCRYPTION_ENABLED.getConfigPath());
this.symmetricalKey = config.getString(ConfigValue.SYMMETRICAL_KEY.getConfigPath());
this.oldSymmetricalKey = config.getString(ConfigValue.OLD_SYMMETRICAL_KEY.getConfigPath());
this.jsonPointers = Collections.unmodifiableList(
new ArrayList<>(config.getStringList(ConfigValue.JSON_POINTERS.getConfigPath())));
if (isEncryptionEnabled && symmetricalKey.trim().isEmpty()) {
throw new DittoConfigError("Missing Symmetric key. It is mandatory when encryption is enabled for connections!");
this.migrationBatchSize = config.getInt(ConfigValue.MIGRATION_BATCH_SIZE.getConfigPath());
this.migrationMaxDocumentsPerMinute = config.getInt(ConfigValue.MIGRATION_MAX_DOCUMENTS_PER_MINUTE.getConfigPath());

validateConfiguration();
}

private void validateConfiguration() {
final boolean hasSymmetricalKey = !symmetricalKey.trim().isEmpty();
final boolean hasOldKey = !oldSymmetricalKey.trim().isEmpty();

// When encryption is enabled, we must have a current encryption key
if (isEncryptionEnabled && !hasSymmetricalKey) {
throw new DittoConfigError(
"Missing 'symmetrical-key'. It is mandatory when encryption is enabled for connections!");
}

if (migrationBatchSize <= 0) {
throw new DittoConfigError(
"'migration.batch-size' must be greater than 0, was: " + migrationBatchSize);
}
if (migrationMaxDocumentsPerMinute < 0) {
throw new DittoConfigError(
"'migration.max-documents-per-minute' must be >= 0, was: " + migrationMaxDocumentsPerMinute);
}

// If both keys are set, they must be different
if (hasSymmetricalKey && hasOldKey && symmetricalKey.equals(oldSymmetricalKey)) {
throw new DittoConfigError(
"Configuration error: 'symmetrical-key' and 'old-symmetrical-key' must be different! " +
"If you're not rotating keys, remove 'old-symmetrical-key'.");
}
}

Expand All @@ -57,15 +89,30 @@ public boolean isEncryptionEnabled() {
}

@Override
public String getSymmetricalKey() {
return this.symmetricalKey;
public Optional<String> getSymmetricalKey() {
return symmetricalKey.trim().isEmpty() ? Optional.empty() : Optional.of(symmetricalKey);
}

@Override
public Optional<String> getOldSymmetricalKey() {
return oldSymmetricalKey.trim().isEmpty() ? Optional.empty() : Optional.of(oldSymmetricalKey);
}

@Override
public List<String> getJsonPointers() {
return this.jsonPointers;
}

@Override
public int getMigrationBatchSize() {
return migrationBatchSize;
}

@Override
public int getMigrationMaxDocumentsPerMinute() {
return migrationMaxDocumentsPerMinute;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -77,20 +124,27 @@ public boolean equals(final Object o) {
final DefaultFieldsEncryptionConfig that = (DefaultFieldsEncryptionConfig) o;
return isEncryptionEnabled == that.isEncryptionEnabled &&
Objects.equals(symmetricalKey, that.symmetricalKey) &&
Objects.equals(jsonPointers, that.jsonPointers);
Objects.equals(oldSymmetricalKey, that.oldSymmetricalKey) &&
Objects.equals(jsonPointers, that.jsonPointers) &&
migrationBatchSize == that.migrationBatchSize &&
migrationMaxDocumentsPerMinute == that.migrationMaxDocumentsPerMinute;
}

@Override
public int hashCode() {
return Objects.hash(isEncryptionEnabled, symmetricalKey, jsonPointers);
return Objects.hash(isEncryptionEnabled, symmetricalKey, oldSymmetricalKey, jsonPointers,
migrationBatchSize, migrationMaxDocumentsPerMinute);
}

@Override
public String toString() {
return getClass().getSimpleName() + "[" +
"enabled=" + isEncryptionEnabled +
", symmetricalKey='***'" +
", oldSymmetricalKey='" + (oldSymmetricalKey.trim().isEmpty() ? "not set" : "***") + "'" +
", jsonPointers=" + jsonPointers +
", migrationBatchSize=" + migrationBatchSize +
", migrationMaxDocumentsPerMinute=" + migrationMaxDocumentsPerMinute +
']';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.ditto.connectivity.service.config;

import java.util.List;
import java.util.Optional;

import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

Expand All @@ -32,10 +33,28 @@ public interface FieldsEncryptionConfig {


/**
* Returns the symmetricalKey used for encryption.
* @return the symmetricalKey
* Returns the current symmetrical key used for encryption.
* This is THE key used for encrypting new data.
*
* @return the current symmetrical key
*/
Optional<String> getSymmetricalKey();

/**
* Returns the old symmetrical key used for decryption fallback during key rotation.
* When set, the system will try to decrypt with the current key first, and fallback to this key if decryption fails.
* <p>
* Typical usage during key rotation:
* <ol>
* <li>Move current key to old-symmetrical-key</li>
* <li>Set new key as symmetrical-key</li>
* <li>Trigger migration via DevOps command</li>
* <li>Remove old-symmetrical-key after migration completes</li>
* </ol>
*
* @return the old symmetrical key, empty if not configured
*/
String getSymmetricalKey();
Optional<String> getOldSymmetricalKey();


/**
Expand All @@ -46,7 +65,20 @@ public interface FieldsEncryptionConfig {
*/
List<String> getJsonPointers();

/**
* Returns the batch size for the encryption migration process.
*
* @return the batch size
*/
int getMigrationBatchSize();

/**
* Returns the maximum number of documents to migrate per minute.
* This throttles the migration stream to avoid overwhelming the database.
*
* @return the maximum documents per minute, 0 means no throttling
*/
int getMigrationMaxDocumentsPerMinute();

/**
* An enumeration of the known config path expressions and their associated default values for {@code FieldsEncryptionConfig}.
Expand All @@ -57,11 +89,20 @@ enum ConfigValue implements KnownConfigValue {
* Determines whether json value encryption is enabled.
*/
ENCRYPTION_ENABLED("encryption-enabled", false),

/**
* The symmetrical key used for encryption.
* The current symmetrical key used for encryption.
* This is THE key used for encrypting all new data.
*/
SYMMETRICAL_KEY("symmetrical-key", ""),

/**
* The old symmetrical key used for decryption fallback during key rotation.
* When set, the system will attempt to decrypt with symmetrical-key first,
* and fallback to this key if decryption fails.
*/
OLD_SYMMETRICAL_KEY("old-symmetrical-key", ""),

/**
* The pointer to the json values to be encrypted.
*/
Expand All @@ -75,7 +116,19 @@ enum ConfigValue implements KnownConfigValue {
"/credentials/parameters/sharedKey",
"/credentials/clientSecret",
"/credentials/password"
));
)),

/**
* The batch size for the encryption migration process.
*/
MIGRATION_BATCH_SIZE("migration.batch-size", 100),

/**
* The maximum number of documents to migrate per minute.
* This throttles the migration stream to avoid overwhelming the database.
* 0 means no throttling.
*/
MIGRATION_MAX_DOCUMENTS_PER_MINUTE("migration.max-documents-per-minute", 200);

private final String configPath;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.AbstractMongoSnapshotAdapter;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -77,25 +78,40 @@ protected Optional<JsonField> getRevisionJsonField(final Connection entity) {

@Override
protected Connection createJsonifiableFrom(final JsonObject jsonObject) {
final boolean hasEncryptedFields = encryptionConfig.getJsonPointers().stream()
.map(JsonPointer::of)
.flatMap(p -> jsonObject.getValue(p).filter(JsonValue::isString).map(JsonValue::asString).stream())
.anyMatch(v -> v.contains(JsonFieldsEncryptor.ENCRYPTED_PREFIX));

if (encryptionConfig.getSymmetricalKey().isEmpty()) {
if(jsonObject.formatAsString().contains(JsonFieldsEncryptor.ENCRYPTED_PREFIX)){
if (hasEncryptedFields) {
LOGGER.warn("Encrypted fields will not be decrypted. Missing symmetrical key. " +
"Either configure the one used for encryption or edit connections and update encrypted fields");
}
return ConnectionMigrationUtil.connectionFromJsonWithMigration(jsonObject);
}

if (!hasEncryptedFields) {
return ConnectionMigrationUtil.connectionFromJsonWithMigration(jsonObject);
}

final JsonObject decrypted = JsonFieldsEncryptor.decrypt(jsonObject, "", encryptionConfig.getJsonPointers(),
encryptionConfig.getSymmetricalKey());
encryptionConfig.getSymmetricalKey().get(), encryptionConfig.getOldSymmetricalKey());
return ConnectionMigrationUtil.connectionFromJsonWithMigration(decrypted);
}

@Override
protected JsonObject convertToJson(final Connection snapshotEntity) {
if (encryptionConfig.isEncryptionEnabled()) {
if (encryptionConfig.isEncryptionEnabled() && isCurrentKeyConfigured()) {
final JsonObject jsonObject = super.convertToJson(snapshotEntity);
return JsonFieldsEncryptor.encrypt(jsonObject, "", encryptionConfig.getJsonPointers(),
encryptionConfig.getSymmetricalKey());
encryptionConfig.getSymmetricalKey().orElseThrow());
}
return super.convertToJson(snapshotEntity);
}

private boolean isCurrentKeyConfigured() {
final Optional<String> symmetricalKey = encryptionConfig.getSymmetricalKey();
return symmetricalKey.isPresent() && !symmetricalKey.get().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import org.eclipse.ditto.connectivity.model.ConnectivityConstants;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.ops.eventsource.MongoEntitiesPersistenceOperations;
import org.eclipse.ditto.internal.utils.persistence.mongo.ops.eventsource.MongoEventSourceSettings;
import org.eclipse.ditto.internal.utils.persistence.operations.AbstractPersistenceOperationsActor;
Expand Down Expand Up @@ -51,13 +50,13 @@ private ConnectionPersistenceOperationsActor(final ActorRef pubSubMediator,
* Create Props of this actor.
*
* @param pubSubMediator Pekko pub-sub mediator.
* @param mongoDbConfig the MongoDB configuration settings.
* @param mongoClient the MongoDB client.
* @param config configuration with info about event journal, snapshot store and database.
* @param persistenceOperationsConfig the persistence operations configuration settings.
* @return a Props object.
*/
public static Props props(final ActorRef pubSubMediator,
final MongoDbConfig mongoDbConfig,
final MongoClientWrapper mongoClient,
final Config config,
final PersistenceOperationsConfig persistenceOperationsConfig) {

Expand All @@ -67,7 +66,6 @@ public static Props props(final ActorRef pubSubMediator,
false, ConnectionPersistenceActor.JOURNAL_PLUGIN_ID,
ConnectionPersistenceActor.SNAPSHOT_PLUGIN_ID);

final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
final MongoDatabase db = mongoClient.getDefaultDatabase();

final EntityPersistenceOperations entitiesOps =
Expand Down
Loading
Loading