Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;
import io.micronaut.tracing.annotation.NewSpan;
import io.opentelemetry.instrumentation.annotations.AddingSpanAttributes;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
Expand Down Expand Up @@ -62,7 +61,6 @@ public class PseudoController {
* @return HTTP response containing a {@code HttpResponse<Flowable>} object.
*/

@NewSpan
@WithSpan("pseudonyimze column")
@AddingSpanAttributes
@Operation(summary = "Pseudonymize field", description = "Pseudonymize a field.")
Expand Down
82 changes: 43 additions & 39 deletions src/main/java/no/ssb/dlp/pseudo/service/pseudo/PseudoField.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package no.ssb.dlp.pseudo.service.pseudo;

import com.google.common.base.Stopwatch;
import io.micronaut.tracing.annotation.ContinueSpan;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.annotations.AddingSpanAttributes;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import lombok.AccessLevel;
Expand All @@ -21,7 +21,6 @@
import no.ssb.dlp.pseudo.service.pseudo.metadata.FieldMetric;
import no.ssb.dlp.pseudo.service.pseudo.metadata.PseudoMetadataProcessor;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -33,6 +32,7 @@
@Data
@Slf4j
public class PseudoField {
private static final Tracer tracer = GlobalOpenTelemetry.getTracer("pseudo-service");
@Getter(AccessLevel.PROTECTED)
private static final int BUFFER_SIZE = 10000;
@Getter(AccessLevel.PROTECTED)
Expand All @@ -50,7 +50,6 @@ public class PseudoField {
* @param pseudoFunc The pseudo function definition.
* @param keyset The encrypted keyset to be used for pseudonymization.
*/
@WithSpan
public PseudoField(String name, String pattern, String pseudoFunc, EncryptedKeysetWrapper keyset) {

pseudoConfig = new PseudoConfig();
Expand Down Expand Up @@ -88,47 +87,52 @@ public PseudoField(String name, String pattern, String pseudoFunc, EncryptedKeys
* @param values The values to be processed.
* @return A Flowable stream that processes the field values by applying the configured pseudo rules, and returns them as a list of strings.
*/
@ContinueSpan
@AddingSpanAttributes
public Flowable<String> process(@SpanAttribute("pseudoConfigSplitter") PseudoConfigSplitter pseudoConfigSplitter,
@SpanAttribute("recordProcessorFactory") RecordMapProcessorFactory recordProcessorFactory,
@SpanAttribute("values") List<String> values,
@SpanAttribute("pseudoOperation") PseudoOperation pseudoOperation,
String correlationId) {
Stopwatch stopwatch = Stopwatch.createStarted();
List<PseudoConfig> pseudoConfigs = pseudoConfigSplitter.splitIfNecessary(this.getPseudoConfig());

RecordMapProcessor<PseudoMetadataProcessor> recordMapProcessor;
switch (pseudoOperation) {
case PSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory.
newPseudonymizeRecordProcessor(pseudoConfigs, correlationId);
case DEPSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory.
newDepseudonymizeRecordProcessor(pseudoConfigs, correlationId);
default -> throw new RuntimeException(
String.format("Pseudo operation \"%s\" not supported for this method", pseudoOperation));
final var span = tracer.spanBuilder("PseudoField.process").startSpan();
try (Scope scope = span.makeCurrent()) {

Stopwatch stopwatch = Stopwatch.createStarted();
List<PseudoConfig> pseudoConfigs = pseudoConfigSplitter.splitIfNecessary(this.getPseudoConfig());

RecordMapProcessor<PseudoMetadataProcessor> recordMapProcessor;
switch (pseudoOperation) {
case PSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory.
newPseudonymizeRecordProcessor(pseudoConfigs, correlationId);
case DEPSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory.
newDepseudonymizeRecordProcessor(pseudoConfigs, correlationId);
default -> throw new RuntimeException(
String.format("Pseudo operation \"%s\" not supported for this method", pseudoOperation));
}
Completable preprocessor = getPreprocessor(values, recordMapProcessor);
// Metadata will be processed in parallel with the data, but must be collected separately
final PseudoMetadataProcessor metadataProcessor = recordMapProcessor.getMetadataProcessor();
final Flowable<String> metadata = Flowable.fromPublisher(metadataProcessor.getMetadata());
final Flowable<String> logs = Flowable.fromPublisher(metadataProcessor.getLogs());
final Flowable<String> metrics = Flowable.fromPublisher(metadataProcessor.getMetrics());

Flowable<String> result = preprocessor.andThen(Flowable.fromIterable(values.stream()
.map(v -> mapOptional(v, recordMapProcessor, metadataProcessor)).toList()
))
.map(v -> v.map(Json::from).orElse("null"))
.doOnError(throwable -> {
log.error("Response failed", throwable);
recordMapProcessor.getMetadataProcessor().onErrorAll(throwable);
})
.doOnComplete(() -> {
log.info("{} took {}", pseudoOperation, stopwatch.stop().elapsed());
// Signal the metadataProcessor to stop collecting metadata
recordMapProcessor.getMetadataProcessor().onCompleteAll();
});

return PseudoResponseSerializer.serialize(result, metadata, logs, metrics);
} finally {
span.end();
}
Completable preprocessor = getPreprocessor(values, recordMapProcessor);
// Metadata will be processed in parallel with the data, but must be collected separately
final PseudoMetadataProcessor metadataProcessor = recordMapProcessor.getMetadataProcessor();
final Flowable<String> metadata = Flowable.fromPublisher(metadataProcessor.getMetadata());
final Flowable<String> logs = Flowable.fromPublisher(metadataProcessor.getLogs());
final Flowable<String> metrics = Flowable.fromPublisher(metadataProcessor.getMetrics());

Flowable<String> result = preprocessor.andThen(Flowable.fromIterable(values.stream()
.map(v -> mapOptional(v, recordMapProcessor, metadataProcessor)).toList()
))
.map(v -> v.map(Json::from).orElse("null"))
.doOnError(throwable -> {
log.error("Response failed", throwable);
recordMapProcessor.getMetadataProcessor().onErrorAll(throwable);
})
.doOnComplete(() -> {
log.info("{} took {}", pseudoOperation, stopwatch.stop().elapsed());
// Signal the metadataProcessor to stop collecting metadata
recordMapProcessor.getMetadataProcessor().onCompleteAll();
});

return PseudoResponseSerializer.serialize(result, metadata, logs, metrics);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.crypto.tink.Aead;
import io.micronaut.tracing.annotation.ContinueSpan;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.annotations.AddingSpanAttributes;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import jakarta.inject.Singleton;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -33,7 +33,6 @@
import no.ssb.dlp.pseudo.service.pseudo.metadata.FieldMetric;
import no.ssb.dlp.pseudo.service.pseudo.metadata.PseudoMetadataProcessor;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -48,26 +47,29 @@
@Singleton
@Slf4j
public class RecordMapProcessorFactory {
private static final Tracer tracer = GlobalOpenTelemetry.getTracer("pseudo-service");
private final PseudoSecrets pseudoSecrets;
private final LoadingCache<String, Aead> aeadCache;

@ContinueSpan
@AddingSpanAttributes
public RecordMapProcessor<PseudoMetadataProcessor> newPseudonymizeRecordProcessor(List<PseudoConfig> pseudoConfigs, String correlationId) {
Span.current().addEvent("newPseudonymizeRecordProcessor", Instant.now());
ValueInterceptorChain chain = new ValueInterceptorChain();
PseudoMetadataProcessor metadataProcessor = new PseudoMetadataProcessor(correlationId);

for (PseudoConfig config : pseudoConfigs) {
for (PseudoKeyset keyset : config.getKeysets()) {
log.info(keyset.getKekUri().toString());
final var span = tracer.spanBuilder("RecordMapProcessorFactory.newPseudonymizeRecordProcessor").startSpan();
try (Scope scope = span.makeCurrent()) {
ValueInterceptorChain chain = new ValueInterceptorChain();
PseudoMetadataProcessor metadataProcessor = new PseudoMetadataProcessor(correlationId);

for (PseudoConfig config : pseudoConfigs) {
for (PseudoKeyset keyset : config.getKeysets()) {
log.info(keyset.getKekUri().toString());
}
final PseudoFuncs fieldPseudonymizer = newPseudoFuncs(config.getRules(),
pseudoKeysetsOf(config.getKeysets()));
chain.preprocessor((f, v) -> init(fieldPseudonymizer, TransformDirection.APPLY, f, v));
chain.register((f, v) -> process(PSEUDONYMIZE, fieldPseudonymizer, f, v, metadataProcessor));
}
final PseudoFuncs fieldPseudonymizer = newPseudoFuncs(config.getRules(),
pseudoKeysetsOf(config.getKeysets()));
chain.preprocessor((f, v) -> init(fieldPseudonymizer, TransformDirection.APPLY, f, v));
chain.register((f, v) -> process(PSEUDONYMIZE, fieldPseudonymizer, f, v, metadataProcessor));
return new RecordMapProcessor<>(chain, metadataProcessor);
} finally {
span.end();
}
return new RecordMapProcessor<>(chain, metadataProcessor);
}

@WithSpan
Expand Down Expand Up @@ -119,78 +121,83 @@ private String process(PseudoOperation operation,
FieldDescriptor field,
String varValue,
PseudoMetadataProcessor metadataProcessor) {
PseudoFuncRuleMatch match = func.findPseudoFunc(field).orElse(null);
final var span = tracer.spanBuilder("RecordMapProcessorFactory.process").startSpan();
try (Scope scope = span.makeCurrent()) {
PseudoFuncRuleMatch match = func.findPseudoFunc(field).orElse(null);

if (match == null) {
return varValue;
}
if (varValue == null) {
// Avoid counting null values to map-sid twice (since map-sid consists of 2 functions)
if (!(match.getFunc() instanceof MapFunc)) {
metadataProcessor.addMetric(FieldMetric.NULL_VALUE);
if (match == null) {
return varValue;
}
return varValue;
}
try {
PseudoFuncDeclaration funcDeclaration = PseudoFuncDeclaration.fromString(match.getRule().getFunc());

// FPE requires a minimum of two bytes/chars to perform encryption, and a minimum of four bytes in the case of Unicode.
if (varValue.length() < 4 && (
match.getFunc() instanceof FpeFunc ||
match.getFunc() instanceof TinkFpeFunc ||
funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID) ||
funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID_FF31)
)) {
metadataProcessor.addMetric(FieldMetric.FPE_LIMITATION);
return getMapFailureStrategy(funcDeclaration.getArgs()) == MapFailureStrategy.RETURN_ORIGINAL ? varValue : null;
if (varValue == null) {
// Avoid counting null values to map-sid twice (since map-sid consists of 2 functions)
if (!(match.getFunc() instanceof MapFunc)) {
metadataProcessor.addMetric(FieldMetric.NULL_VALUE);
}
return varValue;
}

final boolean isSidMapping = funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID)
|| funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID_FF31)
|| funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID_DAEAD);

if (operation == PSEUDONYMIZE) {
PseudoFuncOutput output = match.getFunc().apply(PseudoFuncInput.of(varValue));
output.getWarnings().forEach(metadataProcessor::addLog);
final String sidSnapshotDate = output.getMetadata().getOrDefault(MapFuncConfig.Param.SNAPSHOT_DATE, null);
final String mapFailureMetadata = output.getMetadata().getOrDefault(MAP_FAILURE_METADATA, null);
final String mappedValue = output.getValue();
if (isSidMapping && mapFailureMetadata != null) {
// There has been an unsuccessful SID-mapping
metadataProcessor.addMetric(FieldMetric.MISSING_SID);
} else if (isSidMapping) {
metadataProcessor.addMetric(FieldMetric.MAPPED_SID);
try {
PseudoFuncDeclaration funcDeclaration = PseudoFuncDeclaration.fromString(match.getRule().getFunc());

// FPE requires a minimum of two bytes/chars to perform encryption, and a minimum of four bytes in the case of Unicode.
if (varValue.length() < 4 && (
match.getFunc() instanceof FpeFunc ||
match.getFunc() instanceof TinkFpeFunc ||
funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID) ||
funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID_FF31)
)) {
metadataProcessor.addMetric(FieldMetric.FPE_LIMITATION);
return getMapFailureStrategy(funcDeclaration.getArgs()) == MapFailureStrategy.RETURN_ORIGINAL ? varValue : null;
}
metadataProcessor.addMetadata(FieldMetadata.builder()
.shortName(field.getName())
.dataElementPath(normalizePath(field.getPath())) // Skip leading slash and use dot as separator
.encryptionKeyReference(funcDeclaration.getArgs().getOrDefault(KEY_REFERENCE, null))
.encryptionAlgorithm(match.getFunc().getAlgorithm())
.stableIdentifierVersion(sidSnapshotDate)
.stableIdentifierType(isSidMapping)
.encryptionAlgorithmParameters(funcDeclaration.getArgs())
.build());
return mappedValue;

} else if (operation == DEPSEUDONYMIZE) {
PseudoFuncOutput output = match.getFunc().restore(PseudoFuncInput.of(varValue));
output.getWarnings().forEach(metadataProcessor::addLog);
final String mappedValue = output.getValue();
final String mapFailureMetadata = output.getMetadata().getOrDefault(MAP_FAILURE_METADATA, null);
if (isSidMapping && mapFailureMetadata != null) {
// There has been an unsuccessful SID-mapping
metadataProcessor.addMetric(FieldMetric.MISSING_SID);
} else if (isSidMapping) {
metadataProcessor.addMetric(FieldMetric.MAPPED_SID);

final boolean isSidMapping = funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID)
|| funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID_FF31)
|| funcDeclaration.getFuncName().equals(PseudoFuncNames.MAP_SID_DAEAD);

if (operation == PSEUDONYMIZE) {
PseudoFuncOutput output = match.getFunc().apply(PseudoFuncInput.of(varValue));
output.getWarnings().forEach(metadataProcessor::addLog);
final String sidSnapshotDate = output.getMetadata().getOrDefault(MapFuncConfig.Param.SNAPSHOT_DATE, null);
final String mapFailureMetadata = output.getMetadata().getOrDefault(MAP_FAILURE_METADATA, null);
final String mappedValue = output.getValue();
if (isSidMapping && mapFailureMetadata != null) {
// There has been an unsuccessful SID-mapping
metadataProcessor.addMetric(FieldMetric.MISSING_SID);
} else if (isSidMapping) {
metadataProcessor.addMetric(FieldMetric.MAPPED_SID);
}
metadataProcessor.addMetadata(FieldMetadata.builder()
.shortName(field.getName())
.dataElementPath(normalizePath(field.getPath())) // Skip leading slash and use dot as separator
.encryptionKeyReference(funcDeclaration.getArgs().getOrDefault(KEY_REFERENCE, null))
.encryptionAlgorithm(match.getFunc().getAlgorithm())
.stableIdentifierVersion(sidSnapshotDate)
.stableIdentifierType(isSidMapping)
.encryptionAlgorithmParameters(funcDeclaration.getArgs())
.build());
return mappedValue;

} else if (operation == DEPSEUDONYMIZE) {
PseudoFuncOutput output = match.getFunc().restore(PseudoFuncInput.of(varValue));
output.getWarnings().forEach(metadataProcessor::addLog);
final String mappedValue = output.getValue();
final String mapFailureMetadata = output.getMetadata().getOrDefault(MAP_FAILURE_METADATA, null);
if (isSidMapping && mapFailureMetadata != null) {
// There has been an unsuccessful SID-mapping
metadataProcessor.addMetric(FieldMetric.MISSING_SID);
} else if (isSidMapping) {
metadataProcessor.addMetric(FieldMetric.MAPPED_SID);
}
return mappedValue;
} else {
PseudoFuncOutput output = match.getFunc().restore(PseudoFuncInput.of(varValue));
return output.getValue();
}
return mappedValue;
} else {
PseudoFuncOutput output = match.getFunc().restore(PseudoFuncInput.of(varValue));
return output.getValue();
} catch (Exception e) {
throw new PseudoException(String.format("pseudonymize error - field='%s', originalValue='%s'",
field.getPath(), varValue), e);
}
} catch (Exception e) {
throw new PseudoException(String.format("pseudonymize error - field='%s', originalValue='%s'",
field.getPath(), varValue), e);
} finally {
span.end();
}
}

Expand Down
Loading
Loading