Skip to content

Commit 68e71e7

Browse files
committed
Fix Nullpointer Exception in ReferenceResolve
1 parent 5ff2214 commit 68e71e7

File tree

11 files changed

+306
-394
lines changed

11 files changed

+306
-394
lines changed

src/main/java/de/medizininformatikinitiative/torch/config/AppConfig.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import de.medizininformatikinitiative.torch.util.Redaction;
3434
import de.medizininformatikinitiative.torch.util.ReferenceExtractor;
3535
import de.medizininformatikinitiative.torch.util.ReferenceHandler;
36+
import de.medizininformatikinitiative.torch.util.ResourceGroupValidator;
3637
import de.medizininformatikinitiative.torch.util.ResourceReader;
3738
import de.medizininformatikinitiative.torch.util.ResultFileManager;
3839
import de.numcodex.sq2cql.Translator;
@@ -126,8 +127,13 @@ public ProcessedGroupFactory attributeGroupProcessor(CompartmentManager manager)
126127

127128

128129
@Bean
129-
ReferenceHandler referenceHandler(DataStore dataStore, ProfileMustHaveChecker mustHaveChecker, CompartmentManager compartmentManager, ConsentValidator consentValidator) {
130-
return new ReferenceHandler(mustHaveChecker);
130+
ReferenceHandler referenceHandler(ResourceGroupValidator resourceGroupValidator) {
131+
return new ReferenceHandler(resourceGroupValidator);
132+
}
133+
134+
@Bean
135+
ResourceGroupValidator resourceGroupValidator(ProfileMustHaveChecker profileMustHaveChecker) {
136+
return new ResourceGroupValidator(profileMustHaveChecker);
131137
}
132138

133139
@Bean

src/main/java/de/medizininformatikinitiative/torch/exceptions/ResourceTypeMissmatchException.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.IOException;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.Optional;
3536

3637
@Service
3738
public class CrtdlProcessingService {
@@ -117,14 +118,15 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
117118
// Step 2: Process Patient Batches Using Preprocessed Core Bundle
118119
logger.debug("Process patient batches with a concurrency of {}", maxConcurrency);
119120
return preProcessedCoreBundle.flatMapMany(coreSnapshot ->
120-
batches
121-
.flatMap(batch -> crtdl.consentKey()
122-
.map(s -> consentHandler.fetchAndBuildConsentInfo(s, batch))
123-
.orElse(Mono.just(PatientBatchWithConsent.fromBatch(batch)))
124-
.onErrorResume(ConsentViolatedException.class, ex -> Mono.empty())
125-
, maxConcurrency) //skip batches without consenting patient
126-
.doOnNext(patientBatch -> patientBatch.addStaticInfo(coreSnapshot))
127-
.flatMap(batch -> processBatch(batch, jobID, groupsToProcess, coreBundle), maxConcurrency)
121+
batches.flatMap(batch ->
122+
processBatch(
123+
batch,
124+
jobID,
125+
groupsToProcess,
126+
coreBundle,
127+
crtdl.consentKey(),
128+
coreSnapshot
129+
), maxConcurrency)
128130
).then(
129131
// Step 3: Write the Final Core Resource Bundle to File
130132
Mono.defer(() -> {
@@ -135,15 +137,34 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
135137
);
136138
}
137139

138-
private Mono<Void> processBatch(PatientBatchWithConsent batch, String jobID, GroupsToProcess groupsToProcess, ResourceBundle coreBundle) {
139-
return directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), batch)
140-
.flatMap(patientBatch -> referenceResolver.processSinglePatientBatch(patientBatch, coreBundle, groupsToProcess.allGroups()))
141-
.map(patientBatch -> cascadingDelete.handlePatientBatch(patientBatch, groupsToProcess.allGroups()))
142-
.map(patientBatch -> batchCopierRedacter.transformBatch(patientBatch, groupsToProcess.allGroups()))
143-
.flatMap(patientBatch -> {
144-
batchToCoreWriter.updateCore(patientBatch, coreBundle);
145-
return writeBatch(jobID, patientBatch);
146-
}
140+
private Mono<Void> processBatch(
141+
PatientBatch batch,
142+
String jobID,
143+
GroupsToProcess groupsToProcess,
144+
ResourceBundle coreBundle,
145+
Optional<String> consentKey,
146+
CachelessResourceBundle coreSnapshot
147+
) {
148+
// Fetch consent (or assume consent if key is empty)
149+
Mono<PatientBatchWithConsent> withConsent = consentKey
150+
.map(key -> consentHandler.fetchAndBuildConsentInfo(key, batch))
151+
.orElse(Mono.just(PatientBatchWithConsent.fromBatch(batch)))
152+
.onErrorResume(ConsentViolatedException.class, ex -> {
153+
logger.debug("Skipping batch due to consent violation: {}", ex.getMessage());
154+
return Mono.empty();
155+
});
156+
157+
return withConsent
158+
.doOnNext(b -> b.addStaticInfo(coreSnapshot))
159+
.flatMap(consented ->
160+
directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), consented)
161+
.flatMap(loaded -> referenceResolver.processSinglePatientBatch(loaded, coreBundle, groupsToProcess.allGroups()))
162+
.map(processed -> cascadingDelete.handlePatientBatch(processed, groupsToProcess.allGroups()))
163+
.map(transformed -> batchCopierRedacter.transformBatch(transformed, groupsToProcess.allGroups()))
164+
.flatMap(finalBatch -> {
165+
batchToCoreWriter.updateCore(finalBatch, coreBundle);
166+
return writeBatch(jobID, finalBatch);
167+
})
147168
);
148169
}
149170

src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collections;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.Objects;
2425
import java.util.Optional;
2526
import java.util.Set;
2627
import java.util.stream.Collectors;
@@ -62,14 +63,11 @@ public ReferenceResolver(CompartmentManager compartmentManager,
6263
*/
6364
public Mono<ResourceBundle> resolveCoreBundle(ResourceBundle coreBundle, Map<String, AnnotatedAttributeGroup> groupMap) {
6465
return Mono.just(coreBundle.getValidResourceGroups())
65-
.map(groups -> groups.stream()
66-
.filter(resourceGroup -> !compartmentManager.isInCompartment(resourceGroup)) // your custom filter logic
66+
.map(groups -> groups.stream().filter(resourceGroup -> !compartmentManager.isInCompartment(resourceGroup))
6767
.collect(Collectors.toSet()))
6868
.expand(currentGroupSet ->
6969
processResourceGroups(currentGroupSet, null, coreBundle, false, groupMap)
70-
.onErrorResume(e -> {
71-
return Mono.empty(); // Skip this resource group on error
72-
}))
70+
.onErrorResume(e -> Mono.empty()))
7371
.then(Mono.just(coreBundle));
7472
}
7573

@@ -85,7 +83,11 @@ public Mono<ResourceBundle> resolveCoreBundle(ResourceBundle coreBundle, Map<Str
8583
Mono<PatientBatchWithConsent> processSinglePatientBatch(
8684
PatientBatchWithConsent batch, ResourceBundle coreBundle, Map<String, AnnotatedAttributeGroup> groupMap) {
8785
return Flux.fromIterable(batch.bundles().entrySet())
88-
.concatMap(entry -> resolvePatient(entry.getValue(), coreBundle, batch.applyConsent(), groupMap)
86+
.concatMap(entry -> resolvePatient(entry.getValue(), coreBundle, batch.applyConsent(), groupMap).doOnNext(bundle -> {
87+
if (bundle == null) {
88+
logger.warn("Resolved PatientResourceBundle for key {} is null", entry.getKey());
89+
}
90+
}).filter(Objects::nonNull)
8991
.map(updatedBundle -> Map.entry(entry.getKey(), updatedBundle)))
9092
.collectMap(Map.Entry::getKey, Map.Entry::getValue)
9193
.map(updatedBundles -> new PatientBatchWithConsent(updatedBundles, batch.applyConsent()));
@@ -116,9 +118,7 @@ public Mono<PatientResourceBundle> resolvePatient(
116118
.onErrorResume(e -> {
117119
logger.warn("Error processing resource group set {} in PatientBundle: {}", currentGroupSet, e.getMessage());
118120
return Mono.empty(); // Skip this group on error
119-
})
120-
)
121-
.then(Mono.just(patientBundle));
121+
})).then(Mono.just(patientBundle));
122122
}
123123

124124
/**
@@ -143,27 +143,18 @@ public Mono<Set<ResourceGroup>> processResourceGroups(
143143

144144
return bundleLoader.fetchUnknownResources(referencesGroupedByResourceGroup, patientBundle, coreBundle, applyConsent)
145145
.thenMany(
146-
Flux.fromIterable(referencesGroupedByResourceGroup.entrySet())
146+
Flux.fromIterable(referencesGroupedByResourceGroup.entrySet()).filter(Objects::nonNull)
147+
.filter(entry -> entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty())
147148
.concatMap(entry ->
148-
{
149-
try {
150-
return referenceHandler.handleReferences(
151-
entry.getValue(),
152-
patientBundle,
153-
coreBundle,
154-
groupMap
155-
);
156-
} catch (MustHaveViolatedException e) {
157-
return Flux.empty();
158-
}
159-
}
160-
)
161-
)
149+
Flux.fromIterable(referenceHandler.handleReferences(
150+
entry.getValue(),
151+
patientBundle,
152+
coreBundle,
153+
groupMap))))
162154
.collect(Collectors.toSet())
163155
.flatMap(set -> set.isEmpty() ? Mono.empty() : Mono.just(set));
164156
}
165157

166-
167158
/**
168159
* Extracts for every ResourceGroup the ReferenceWrappers and collects them ordered by
169160
*
@@ -179,9 +170,18 @@ public Map<ResourceGroup, List<ReferenceWrapper>> loadReferencesByResourceGroup(
179170
ResourceBundle coreBundle,
180171
Map<String, AnnotatedAttributeGroup> groupMap) {
181172

182-
return resourceGroups.parallelStream()
183-
.map(resourceGroup -> processResourceGroup(resourceGroup, patientBundle, coreBundle, groupMap))
184-
.filter(entry -> !entry.getValue().isEmpty())
173+
return resourceGroups.stream()
174+
.map(resourceGroup -> processResourceGroup(resourceGroup, patientBundle, coreBundle, groupMap)).map(entry -> {
175+
if (entry == null) {
176+
logger.warn("Null entry returned by processResourceGroup");
177+
} else if (entry.getKey() == null) {
178+
logger.warn("Entry with null key for resource group {}", entry);
179+
} else if (entry.getValue() == null || entry.getValue().isEmpty()) {
180+
logger.info("No references extracted for resource group {}", entry.getKey());
181+
}
182+
return entry;
183+
}).filter(Objects::nonNull)
184+
.filter(entry -> entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty())
185185
.collect(Collectors.toMap(
186186
Map.Entry::getKey,
187187
Map.Entry::getValue,
@@ -219,11 +219,7 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> processResourceGroup(
219219
? patientBundle.bundle().get(resourceGroup.resourceId())
220220
: coreBundle.get(resourceGroup.resourceId());
221221

222-
if (resource.isPresent()) {
223-
return extractReferences(resourceGroup, resource.get(), groupMap, processingBundle);
224-
} else {
225-
return handleMissingResource(resourceGroup, processingBundle);
226-
}
222+
return resource.map(value -> extractReferences(resourceGroup, value, groupMap, processingBundle)).orElseGet(() -> handleMissingResource(resourceGroup, processingBundle));
227223
}
228224

229225
/**
@@ -253,9 +249,7 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
253249
List<ReferenceWrapper> extracted = referenceExtractor.extract(resource, groupMap, resourceGroup.groupId());
254250
return Map.entry(resourceGroup, extracted);
255251
} catch (MustHaveViolatedException e) {
256-
synchronized (processingBundle) {
257-
processingBundle.addResourceGroupValidity(resourceGroup, false);
258-
}
252+
processingBundle.addResourceGroupValidity(resourceGroup, false);
259253
return Map.entry(resourceGroup, Collections.emptyList());
260254
}
261255
}
@@ -270,11 +264,8 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
270264
private Map.Entry<ResourceGroup, List<ReferenceWrapper>> handleMissingResource(
271265
ResourceGroup resourceGroup,
272266
ResourceBundle processingBundle) {
273-
274-
synchronized (processingBundle) {
275-
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
276-
processingBundle.addResourceGroupValidity(resourceGroup, false);
277-
}
267+
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
268+
processingBundle.addResourceGroupValidity(resourceGroup, false);
278269
return Map.entry(resourceGroup, Collections.emptyList());
279270
}
280271

0 commit comments

Comments
 (0)