|
20 | 20 | import no.ssb.dlp.pseudo.core.util.Json; |
21 | 21 | import no.ssb.dlp.pseudo.service.pseudo.metadata.FieldMetric; |
22 | 22 | import no.ssb.dlp.pseudo.service.pseudo.metadata.PseudoMetadataProcessor; |
| 23 | +import no.ssb.dlp.pseudo.service.tracing.WithSpan; |
23 | 24 |
|
24 | 25 | import java.util.List; |
25 | 26 | import java.util.Map; |
|
32 | 33 | @Data |
33 | 34 | @Slf4j |
34 | 35 | public class PseudoField { |
35 | | - private static final Tracer tracer = GlobalOpenTelemetry.getTracer("pseudo-service"); |
36 | 36 | @Getter(AccessLevel.PROTECTED) |
37 | 37 | private static final int BUFFER_SIZE = 10000; |
38 | 38 | @Getter(AccessLevel.PROTECTED) |
@@ -87,52 +87,46 @@ public PseudoField(String name, String pattern, String pseudoFunc, EncryptedKeys |
87 | 87 | * @param values The values to be processed. |
88 | 88 | * @return A Flowable stream that processes the field values by applying the configured pseudo rules, and returns them as a lists of strings. |
89 | 89 | */ |
90 | | - @AddingSpanAttributes |
91 | | - public Flowable<String> process(@SpanAttribute("pseudoConfigSplitter") PseudoConfigSplitter pseudoConfigSplitter, |
92 | | - @SpanAttribute("recordProcessorFactory") RecordMapProcessorFactory recordProcessorFactory, |
93 | | - @SpanAttribute("values") List<String> values, |
94 | | - @SpanAttribute("pseudoOperation") PseudoOperation pseudoOperation, |
| 90 | + @WithSpan |
| 91 | + public Flowable<String> process(PseudoConfigSplitter pseudoConfigSplitter, |
| 92 | + RecordMapProcessorFactory recordProcessorFactory, |
| 93 | + List<String> values, |
| 94 | + PseudoOperation pseudoOperation, |
95 | 95 | String correlationId) { |
96 | | - final var span = tracer.spanBuilder("PseudoField.process").startSpan(); |
97 | | - try (Scope scope = span.makeCurrent()) { |
98 | | - |
99 | | - Stopwatch stopwatch = Stopwatch.createStarted(); |
100 | | - List<PseudoConfig> pseudoConfigs = pseudoConfigSplitter.splitIfNecessary(this.getPseudoConfig()); |
101 | | - |
102 | | - RecordMapProcessor<PseudoMetadataProcessor> recordMapProcessor; |
103 | | - switch (pseudoOperation) { |
104 | | - case PSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory. |
105 | | - newPseudonymizeRecordProcessor(pseudoConfigs, correlationId); |
106 | | - case DEPSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory. |
107 | | - newDepseudonymizeRecordProcessor(pseudoConfigs, correlationId); |
108 | | - default -> throw new RuntimeException( |
109 | | - String.format("Pseudo operation \"%s\" not supported for this method", pseudoOperation)); |
110 | | - } |
111 | | - Completable preprocessor = getPreprocessor(values, recordMapProcessor); |
112 | | - // Metadata will be processes in parallel with the data, but must be collected separately |
113 | | - final PseudoMetadataProcessor metadataProcessor = recordMapProcessor.getMetadataProcessor(); |
114 | | - final Flowable<String> metadata = Flowable.fromPublisher(metadataProcessor.getMetadata()); |
115 | | - final Flowable<String> logs = Flowable.fromPublisher(metadataProcessor.getLogs()); |
116 | | - final Flowable<String> metrics = Flowable.fromPublisher(metadataProcessor.getMetrics()); |
117 | | - |
118 | | - Flowable<String> result = preprocessor.andThen(Flowable.fromIterable(values.stream() |
119 | | - .map(v -> mapOptional(v, recordMapProcessor, metadataProcessor)).toList() |
120 | | - )) |
121 | | - .map(v -> v.map(Json::from).orElse("null")) |
122 | | - .doOnError(throwable -> { |
123 | | - log.error("Response failed", throwable); |
124 | | - recordMapProcessor.getMetadataProcessor().onErrorAll(throwable); |
125 | | - }) |
126 | | - .doOnComplete(() -> { |
127 | | - log.info("{} took {}", pseudoOperation, stopwatch.stop().elapsed()); |
128 | | - // Signal the metadataProcessor to stop collecting metadata |
129 | | - recordMapProcessor.getMetadataProcessor().onCompleteAll(); |
130 | | - }); |
131 | | - |
132 | | - return PseudoResponseSerializer.serialize(result, metadata, logs, metrics); |
133 | | - } finally { |
134 | | - span.end(); |
| 96 | + Stopwatch stopwatch = Stopwatch.createStarted(); |
| 97 | + List<PseudoConfig> pseudoConfigs = pseudoConfigSplitter.splitIfNecessary(this.getPseudoConfig()); |
| 98 | + |
| 99 | + RecordMapProcessor<PseudoMetadataProcessor> recordMapProcessor; |
| 100 | + switch (pseudoOperation) { |
| 101 | + case PSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory. |
| 102 | + newPseudonymizeRecordProcessor(pseudoConfigs, correlationId); |
| 103 | + case DEPSEUDONYMIZE -> recordMapProcessor = recordProcessorFactory. |
| 104 | + newDepseudonymizeRecordProcessor(pseudoConfigs, correlationId); |
| 105 | + default -> throw new RuntimeException( |
| 106 | + String.format("Pseudo operation \"%s\" not supported for this method", pseudoOperation)); |
135 | 107 | } |
| 108 | + Completable preprocessor = getPreprocessor(values, recordMapProcessor); |
| 109 | + // Metadata will be processes in parallel with the data, but must be collected separately |
| 110 | + final PseudoMetadataProcessor metadataProcessor = recordMapProcessor.getMetadataProcessor(); |
| 111 | + final Flowable<String> metadata = Flowable.fromPublisher(metadataProcessor.getMetadata()); |
| 112 | + final Flowable<String> logs = Flowable.fromPublisher(metadataProcessor.getLogs()); |
| 113 | + final Flowable<String> metrics = Flowable.fromPublisher(metadataProcessor.getMetrics()); |
| 114 | + |
| 115 | + Flowable<String> result = preprocessor.andThen(Flowable.fromIterable(values.stream() |
| 116 | + .map(v -> mapOptional(v, recordMapProcessor, metadataProcessor)).toList() |
| 117 | + )) |
| 118 | + .map(v -> v.map(Json::from).orElse("null")) |
| 119 | + .doOnError(throwable -> { |
| 120 | + log.error("Response failed", throwable); |
| 121 | + recordMapProcessor.getMetadataProcessor().onErrorAll(throwable); |
| 122 | + }) |
| 123 | + .doOnComplete(() -> { |
| 124 | + log.info("{} took {}", pseudoOperation, stopwatch.stop().elapsed()); |
| 125 | + // Signal the metadataProcessor to stop collecting metadata |
| 126 | + recordMapProcessor.getMetadataProcessor().onCompleteAll(); |
| 127 | + }); |
| 128 | + |
| 129 | + return PseudoResponseSerializer.serialize(result, metadata, logs, metrics); |
136 | 130 | } |
137 | 131 |
|
138 | 132 | /** |
|
0 commit comments