Skip to content

Commit 62553d7

Browse files
committed
Fix Nullpointer Exception in ReferenceResolve
1 parent 5ff2214 commit 62553d7

File tree

11 files changed

+288
-384
lines changed

11 files changed

+288
-384
lines changed

src/main/java/de/medizininformatikinitiative/torch/config/AppConfig.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import de.medizininformatikinitiative.torch.util.Redaction;
3434
import de.medizininformatikinitiative.torch.util.ReferenceExtractor;
3535
import de.medizininformatikinitiative.torch.util.ReferenceHandler;
36+
import de.medizininformatikinitiative.torch.util.ResourceGroupValidator;
3637
import de.medizininformatikinitiative.torch.util.ResourceReader;
3738
import de.medizininformatikinitiative.torch.util.ResultFileManager;
3839
import de.numcodex.sq2cql.Translator;
@@ -126,8 +127,13 @@ public ProcessedGroupFactory attributeGroupProcessor(CompartmentManager manager)
126127

127128

128129
@Bean
129-
ReferenceHandler referenceHandler(DataStore dataStore, ProfileMustHaveChecker mustHaveChecker, CompartmentManager compartmentManager, ConsentValidator consentValidator) {
130-
return new ReferenceHandler(mustHaveChecker);
130+
ReferenceHandler referenceHandler(ResourceGroupValidator resourceGroupValidator) {
131+
return new ReferenceHandler(resourceGroupValidator);
132+
}
133+
134+
@Bean
135+
ResourceGroupValidator resourceGroupValidator(ProfileMustHaveChecker profileMustHaveChecker) {
136+
return new ResourceGroupValidator(profileMustHaveChecker);
131137
}
132138

133139
@Bean

src/main/java/de/medizininformatikinitiative/torch/exceptions/ResourceTypeMissmatchException.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.IOException;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.Optional;
3536

3637
@Service
3738
public class CrtdlProcessingService {
@@ -117,14 +118,15 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
117118
// Step 2: Process Patient Batches Using Preprocessed Core Bundle
118119
logger.debug("Process patient batches with a concurrency of {}", maxConcurrency);
119120
return preProcessedCoreBundle.flatMapMany(coreSnapshot ->
120-
batches
121-
.flatMap(batch -> crtdl.consentKey()
122-
.map(s -> consentHandler.fetchAndBuildConsentInfo(s, batch))
123-
.orElse(Mono.just(PatientBatchWithConsent.fromBatch(batch)))
124-
.onErrorResume(ConsentViolatedException.class, ex -> Mono.empty())
125-
, maxConcurrency) //skip batches without consenting patient
126-
.doOnNext(patientBatch -> patientBatch.addStaticInfo(coreSnapshot))
127-
.flatMap(batch -> processBatch(batch, jobID, groupsToProcess, coreBundle), maxConcurrency)
121+
batches.flatMap(batch ->
122+
processBatch(
123+
batch,
124+
jobID,
125+
groupsToProcess,
126+
coreBundle,
127+
crtdl.consentKey(),
128+
coreSnapshot
129+
), maxConcurrency)
128130
).then(
129131
// Step 3: Write the Final Core Resource Bundle to File
130132
Mono.defer(() -> {
@@ -135,15 +137,34 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
135137
);
136138
}
137139

138-
private Mono<Void> processBatch(PatientBatchWithConsent batch, String jobID, GroupsToProcess groupsToProcess, ResourceBundle coreBundle) {
139-
return directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), batch)
140-
.flatMap(patientBatch -> referenceResolver.processSinglePatientBatch(patientBatch, coreBundle, groupsToProcess.allGroups()))
141-
.map(patientBatch -> cascadingDelete.handlePatientBatch(patientBatch, groupsToProcess.allGroups()))
142-
.map(patientBatch -> batchCopierRedacter.transformBatch(patientBatch, groupsToProcess.allGroups()))
143-
.flatMap(patientBatch -> {
144-
batchToCoreWriter.updateCore(patientBatch, coreBundle);
145-
return writeBatch(jobID, patientBatch);
146-
}
140+
private Mono<Void> processBatch(
141+
PatientBatch batch,
142+
String jobID,
143+
GroupsToProcess groupsToProcess,
144+
ResourceBundle coreBundle,
145+
Optional<String> consentKey,
146+
CachelessResourceBundle coreSnapshot
147+
) {
148+
// Fetch consent (or assume consent if key is empty)
149+
Mono<PatientBatchWithConsent> withConsent = consentKey
150+
.map(key -> consentHandler.fetchAndBuildConsentInfo(key, batch))
151+
.orElse(Mono.just(PatientBatchWithConsent.fromBatch(batch)))
152+
.onErrorResume(ConsentViolatedException.class, ex -> {
153+
logger.debug("Skipping batch due to consent violation: {}", ex.getMessage());
154+
return Mono.empty();
155+
});
156+
157+
return withConsent
158+
.doOnNext(b -> b.addStaticInfo(coreSnapshot))
159+
.flatMap(consented ->
160+
directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), consented)
161+
.flatMap(loaded -> referenceResolver.processSinglePatientBatch(loaded, coreBundle, groupsToProcess.allGroups()))
162+
.map(processed -> cascadingDelete.handlePatientBatch(processed, groupsToProcess.allGroups()))
163+
.map(transformed -> batchCopierRedacter.transformBatch(transformed, groupsToProcess.allGroups()))
164+
.flatMap(finalBatch -> {
165+
batchToCoreWriter.updateCore(finalBatch, coreBundle);
166+
return writeBatch(jobID, finalBatch);
167+
})
147168
);
148169
}
149170

src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,7 @@ public Mono<ResourceBundle> resolveCoreBundle(ResourceBundle coreBundle, Map<Str
6767
.collect(Collectors.toSet()))
6868
.expand(currentGroupSet ->
6969
processResourceGroups(currentGroupSet, null, coreBundle, false, groupMap)
70-
.onErrorResume(e -> {
71-
return Mono.empty(); // Skip this resource group on error
72-
}))
70+
.onErrorResume(e -> Mono.empty()))
7371
.then(Mono.just(coreBundle));
7472
}
7573

@@ -144,26 +142,20 @@ public Mono<Set<ResourceGroup>> processResourceGroups(
144142
return bundleLoader.fetchUnknownResources(referencesGroupedByResourceGroup, patientBundle, coreBundle, applyConsent)
145143
.thenMany(
146144
Flux.fromIterable(referencesGroupedByResourceGroup.entrySet())
145+
.filter(entry -> entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty())
147146
.concatMap(entry ->
148-
{
149-
try {
150-
return referenceHandler.handleReferences(
151-
entry.getValue(),
152-
patientBundle,
153-
coreBundle,
154-
groupMap
155-
);
156-
} catch (MustHaveViolatedException e) {
157-
return Flux.empty();
158-
}
159-
}
147+
Flux.fromIterable(referenceHandler.handleReferences(
148+
entry.getValue(),
149+
patientBundle,
150+
coreBundle,
151+
groupMap
152+
))
160153
)
161154
)
162155
.collect(Collectors.toSet())
163156
.flatMap(set -> set.isEmpty() ? Mono.empty() : Mono.just(set));
164157
}
165158

166-
167159
/**
168160
* Extracts for every ResourceGroup the ReferenceWrappers and collects them ordered by
169161
*
@@ -179,9 +171,9 @@ public Map<ResourceGroup, List<ReferenceWrapper>> loadReferencesByResourceGroup(
179171
ResourceBundle coreBundle,
180172
Map<String, AnnotatedAttributeGroup> groupMap) {
181173

182-
return resourceGroups.parallelStream()
174+
return resourceGroups.stream()
183175
.map(resourceGroup -> processResourceGroup(resourceGroup, patientBundle, coreBundle, groupMap))
184-
.filter(entry -> !entry.getValue().isEmpty())
176+
.filter(entry -> entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty())
185177
.collect(Collectors.toMap(
186178
Map.Entry::getKey,
187179
Map.Entry::getValue,
@@ -219,11 +211,7 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> processResourceGroup(
219211
? patientBundle.bundle().get(resourceGroup.resourceId())
220212
: coreBundle.get(resourceGroup.resourceId());
221213

222-
if (resource.isPresent()) {
223-
return extractReferences(resourceGroup, resource.get(), groupMap, processingBundle);
224-
} else {
225-
return handleMissingResource(resourceGroup, processingBundle);
226-
}
214+
return resource.map(value -> extractReferences(resourceGroup, value, groupMap, processingBundle)).orElseGet(() -> handleMissingResource(resourceGroup, processingBundle));
227215
}
228216

229217
/**
@@ -253,9 +241,7 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
253241
List<ReferenceWrapper> extracted = referenceExtractor.extract(resource, groupMap, resourceGroup.groupId());
254242
return Map.entry(resourceGroup, extracted);
255243
} catch (MustHaveViolatedException e) {
256-
synchronized (processingBundle) {
257-
processingBundle.addResourceGroupValidity(resourceGroup, false);
258-
}
244+
processingBundle.addResourceGroupValidity(resourceGroup, false);
259245
return Map.entry(resourceGroup, Collections.emptyList());
260246
}
261247
}
@@ -270,11 +256,8 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
270256
private Map.Entry<ResourceGroup, List<ReferenceWrapper>> handleMissingResource(
271257
ResourceGroup resourceGroup,
272258
ResourceBundle processingBundle) {
273-
274-
synchronized (processingBundle) {
275-
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
276-
processingBundle.addResourceGroupValidity(resourceGroup, false);
277-
}
259+
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
260+
processingBundle.addResourceGroupValidity(resourceGroup, false);
278261
return Map.entry(resourceGroup, Collections.emptyList());
279262
}
280263

0 commit comments

Comments
 (0)