diff --git a/src/main/java/de/medizininformatikinitiative/torch/exceptions/DataStoreException.java b/src/main/java/de/medizininformatikinitiative/torch/exceptions/DataStoreException.java deleted file mode 100644 index 7f85f410..00000000 --- a/src/main/java/de/medizininformatikinitiative/torch/exceptions/DataStoreException.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.medizininformatikinitiative.torch.exceptions; - -public class DataStoreException extends Exception { - - public DataStoreException(String errorMessage) { - super(errorMessage); - } - - public DataStoreException(String message, Throwable cause) { - super(message, cause); - } - -} diff --git a/src/main/java/de/medizininformatikinitiative/torch/model/management/PatientResourceBundle.java b/src/main/java/de/medizininformatikinitiative/torch/model/management/PatientResourceBundle.java index fdd2fa55..3d8a7816 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/model/management/PatientResourceBundle.java +++ b/src/main/java/de/medizininformatikinitiative/torch/model/management/PatientResourceBundle.java @@ -6,6 +6,7 @@ import java.util.Collection; import java.util.Optional; +import java.util.Set; import static java.util.Objects.requireNonNull; @@ -85,4 +86,8 @@ public void put(String resourceReference) { public void addStaticInfo(CachelessResourceBundle staticInfo) { bundle.merge(staticInfo); } + + public Set getValidResourceGroups() { + return bundle.getValidResourceGroups(); + } } diff --git a/src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java b/src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java index d93e5424..38a7f783 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java +++ b/src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.UUID; @Service public class CrtdlProcessingService { @@ -139,6 +140,7 @@ private Mono processIntern(AnnotatedCrtdl crtdl, String jobID, Flux { + logger.debug("Handling Final Core Bundle"); PatientResourceBundle corePatientBundle = new PatientResourceBundle("CORE", coreBundle); PatientBatchWithConsent coreBundleBatch = new PatientBatchWithConsent(Map.of("CORE", corePatientBundle), false); return writeBatch(jobID, batchCopierRedacter.transformBatch(coreBundleBatch, groupsToProcess.allGroups())); @@ -147,10 +149,14 @@ private Mono processIntern(AnnotatedCrtdl crtdl, String jobID, Flux processBatch(PatientBatchWithConsent batch, String jobID, GroupsToProcess groupsToProcess, ResourceBundle coreBundle) { + UUID id = UUID.randomUUID(); return directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), batch) + .doOnNext(loadedBatch -> logger.debug("Directly loaded patient compartment for batch {} with {} patients", id, loadedBatch.patientIds().size())) .flatMap(patientBatch -> referenceResolver.processSinglePatientBatch(patientBatch, coreBundle, groupsToProcess.allGroups())) .map(patientBatch -> cascadingDelete.handlePatientBatch(patientBatch, groupsToProcess.allGroups())) + .doOnNext(loadedBatch -> logger.debug("Batch resolved references {} with {} patients", id, loadedBatch.patientIds().size())) .map(patientBatch -> batchCopierRedacter.transformBatch(patientBatch, groupsToProcess.allGroups())) + .doOnNext(loadedBatch -> logger.debug("Batch finished extraction {} ", id)) .flatMap(patientBatch -> { batchToCoreWriter.updateCore(patientBatch, coreBundle); return writeBatch(jobID, patientBatch); diff --git a/src/main/java/de/medizininformatikinitiative/torch/service/DataStore.java b/src/main/java/de/medizininformatikinitiative/torch/service/DataStore.java index e355b5f6..16d68ca1 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/service/DataStore.java +++ b/src/main/java/de/medizininformatikinitiative/torch/service/DataStore.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Stream; @@ -154,8 +155,10 @@ private String serializeBatchBundle(Map> idsByType) { */ public Flux search(Query query, Class resourceType) { var start = System.nanoTime(); - logger.trace("Execute query: {}", query); - + var queryId = UUID.randomUUID(); + logger.debug("Executing query {} for resource type {}", queryId, query.type()); + logger.trace("Full query: {}", query); + var counter = new AtomicInteger(); return client.post() .uri("/" + query.type() + "/_search") .contentType(APPLICATION_FORM_URLENCODED) @@ -170,14 +173,15 @@ public Flux search(Query query, Class resourceType) { .flatMap(bundle -> Flux.fromStream(bundle.getEntry().stream().map(Bundle.BundleEntryComponent::getResource))) .flatMap(resource -> { if (resourceType.isInstance(resource)) { + counter.incrementAndGet(); return Mono.just(resourceType.cast(resource)); } else { logger.warn("Found miss match resource type {} querying type {}", resource.getClass().getSimpleName(), query.type()); return Mono.empty(); } }) - .doOnComplete(() -> logger.trace("Finished query `{}` in {} seconds.", query, - "%.1f".formatted(TimeUtils.durationSecondsSince(start)))) + .doOnComplete(() -> logger.debug("Finished query `{}` in {} seconds with {} resources.", queryId, + "%.1f".formatted(TimeUtils.durationSecondsSince(start)), counter.get())) .doOnError(e -> logger.error("Error while executing resource query `{}`: {}", query, e.getMessage())); } diff --git a/src/main/java/de/medizininformatikinitiative/torch/service/DirectResourceLoader.java b/src/main/java/de/medizininformatikinitiative/torch/service/DirectResourceLoader.java index aeb2cc9b..f026aac6 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/service/DirectResourceLoader.java +++ b/src/main/java/de/medizininformatikinitiative/torch/service/DirectResourceLoader.java @@ -3,7 +3,6 @@ import de.medizininformatikinitiative.torch.consent.ConsentValidator; import de.medizininformatikinitiative.torch.exceptions.MustHaveViolatedException; import de.medizininformatikinitiative.torch.exceptions.PatientIdNotFoundException; -import de.medizininformatikinitiative.torch.management.StructureDefinitionHandler; import de.medizininformatikinitiative.torch.model.consent.PatientBatchWithConsent; import de.medizininformatikinitiative.torch.model.crtdl.annotated.AnnotatedAttributeGroup; import de.medizininformatikinitiative.torch.model.fhir.Query; @@ -42,17 +41,14 @@ public class DirectResourceLoader { private final ConsentValidator consentValidator; private final DseMappingTreeBase dseMappingTreeBase; - private final StructureDefinitionHandler structureDefinitionsHandler; private final ProfileMustHaveChecker profileMustHaveChecker; @Autowired - public DirectResourceLoader(DataStore dataStore, DseMappingTreeBase dseMappingTreeBase, StructureDefinitionHandler structureDefinitionHandler, ProfileMustHaveChecker profileMustHaveChecker, ConsentValidator validator) { + public DirectResourceLoader(DataStore dataStore, DseMappingTreeBase dseMappingTreeBase, ProfileMustHaveChecker profileMustHaveChecker, ConsentValidator validator) { this.dataStore = dataStore; this.consentValidator = validator; this.dseMappingTreeBase = dseMappingTreeBase; - this.structureDefinitionsHandler = structureDefinitionHandler; this.profileMustHaveChecker = profileMustHaveChecker; - } /** @@ -74,7 +70,17 @@ public Mono directLoadPatientCompartment( private Mono processBatchWithConsent(List attributeGroups, PatientBatchWithConsent patientBatchWithConsent) { Set safeSet = new ConcurrentSkipListSet<>(patientBatchWithConsent.patientBatch().ids()); - return processPatientAttributeGroups(attributeGroups, patientBatchWithConsent, safeSet).map(bundle -> patientBatchWithConsent.keep(safeSet)); + return processPatientAttributeGroups(attributeGroups, patientBatchWithConsent, safeSet) + .doOnNext(bundle -> { + logger.debug(" {} out of {} patients passed consent checks", + safeSet.size(), + patientBatchWithConsent.patientBatch().ids().size()); + + if (logger.isTraceEnabled()) { + logger.trace("Surviving patient IDs: {}", String.join(", ", safeSet)); + } + }) + .map(bundle -> patientBatchWithConsent.keep(safeSet)); } private Flux groupQueries(AnnotatedAttributeGroup group) { @@ -123,7 +129,6 @@ public Mono processCoreAttributeGroup(AnnotatedAttributeGroup group, Resou }).then(Mono.defer(() -> { if (atLeastOneResource.get()) { return Mono.empty(); - } else { logger.error("MustHave violated for group: {}", group.groupReference()); return Mono.error(new MustHaveViolatedException("MustHave requirement violated for group: " + group.id())); @@ -179,11 +184,10 @@ private Mono processPatientSingleAttributeGroup(Annotat .doOnNext(tuple -> { PatientResourceBundle bundle = mutableBundles.get(tuple.patientId); if (profileMustHaveChecker.fulfilled(tuple.resource, group)) { - safeGroup.add(tuple.patientId); - System.out.println("Ref" + group.groupReference() + "Type" + group.resourceType() + " ID " + group.id()); bundle.put(tuple.resource, group.id(), true); } else { + logger.trace("Resource {} has not fulfilled must have checks for group {}", tuple.resource.getId(), group.id()); bundle.put(tuple.resource, group.id(), false); } }) diff --git a/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceBundleLoader.java b/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceBundleLoader.java index 2b1cd546..4d7f0ee6 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceBundleLoader.java +++ b/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceBundleLoader.java @@ -2,7 +2,6 @@ import de.medizininformatikinitiative.torch.consent.ConsentValidator; import de.medizininformatikinitiative.torch.exceptions.ConsentViolatedException; -import de.medizininformatikinitiative.torch.exceptions.DataStoreException; import de.medizininformatikinitiative.torch.exceptions.PatientIdNotFoundException; import de.medizininformatikinitiative.torch.exceptions.ReferenceToPatientException; import de.medizininformatikinitiative.torch.management.CompartmentManager; @@ -69,21 +68,11 @@ public Mono fetchUnknownResources( } notLoaded.forEach(unloaded -> { if (compartmentManager.isInCompartment(unloaded) && patientBundle != null) { - patientBundle.bundle().put(unloaded); + patientBundle.put(unloaded); } else { coreBundle.put(unloaded); } }); - }).onErrorResume(DataStoreException.class, e -> { - logger.error("Failed to fetch resources, marking all as invalid: {}", e.getMessage()); - unknownReferences.forEach(ref -> { - if (compartmentManager.isInCompartment(ref) && patientBundle != null) { - patientBundle.bundle().put(ref); - } else { - coreBundle.put(ref); - } - }); - return Mono.empty(); }).then(); } diff --git a/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java b/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java index bcb37944..28a1f64e 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java +++ b/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java @@ -106,8 +106,9 @@ public Mono resolvePatient( ResourceBundle coreBundle, boolean applyConsent, Map groupMap) { - - return Mono.just(patientBundle.bundle().getValidResourceGroups()) + int groupValidity = patientBundle.getValidResourceGroups().size(); + logger.trace("Resolving Patient Resource Bundle {} with {} valid groups", patientBundle.patientId(), groupValidity); + return Mono.just(patientBundle.getValidResourceGroups()) .map(groups -> groups.stream() .filter(compartmentManager::isInCompartment) // your custom filter logic .collect(Collectors.toSet())) @@ -216,14 +217,10 @@ private Map.Entry> processResourceGroup( } Optional resource = isPatientResource - ? patientBundle.bundle().get(resourceGroup.resourceId()) + ? patientBundle.get(resourceGroup.resourceId()) : coreBundle.get(resourceGroup.resourceId()); - if (resource.isPresent()) { - return extractReferences(resourceGroup, resource.get(), groupMap, processingBundle); - } else { - return handleMissingResource(resourceGroup, processingBundle); - } + return resource.map(value -> extractReferences(resourceGroup, value, groupMap, processingBundle)).orElseGet(() -> handleMissingResource(resourceGroup, processingBundle)); } /** @@ -253,9 +250,7 @@ private Map.Entry> extractReferences( List extracted = referenceExtractor.extract(resource, groupMap, resourceGroup.groupId()); return Map.entry(resourceGroup, extracted); } catch (MustHaveViolatedException e) { - synchronized (processingBundle) { - processingBundle.addResourceGroupValidity(resourceGroup, false); - } + processingBundle.addResourceGroupValidity(resourceGroup, false); return Map.entry(resourceGroup, Collections.emptyList()); } } @@ -270,11 +265,8 @@ private Map.Entry> extractReferences( private Map.Entry> handleMissingResource( ResourceGroup resourceGroup, ResourceBundle processingBundle) { - - synchronized (processingBundle) { - logger.warn("Empty resource marked as valid for group {}", resourceGroup); - processingBundle.addResourceGroupValidity(resourceGroup, false); - } + logger.warn("Empty resource marked as valid for group {}", resourceGroup); + processingBundle.addResourceGroupValidity(resourceGroup, false); return Map.entry(resourceGroup, Collections.emptyList()); } diff --git a/src/test/java/de/medizininformatikinitiative/torch/service/ReferenceResolverIT.java b/src/test/java/de/medizininformatikinitiative/torch/service/ReferenceResolverIT.java index 794085e1..12a756df 100644 --- a/src/test/java/de/medizininformatikinitiative/torch/service/ReferenceResolverIT.java +++ b/src/test/java/de/medizininformatikinitiative/torch/service/ReferenceResolverIT.java @@ -257,7 +257,7 @@ class KnownReferences { @Test - void notFetchingResources() { + void processResourceGroups_notFetchingResources() { Map attributeGroupMap = new HashMap<>() {{ put("Patient1", patientGroup); put("Condition1", conditionGroup); @@ -273,7 +273,7 @@ void notFetchingResources() { patientBundle.put(new ResourceGroupWrapper(condition, Set.of("Condition1"))); patientBundle.bundle().addResourceGroupValidity(new ResourceGroup("Condition/2", "Condition1"), true); - var result = referenceResolver.processResourceGroups(patientBundle.bundle().getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap); + var result = referenceResolver.processResourceGroups(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap); StepVerifier.create(result) @@ -281,6 +281,48 @@ void notFetchingResources() { .verifyComplete(); } + @Test + void processResourceGroups_mustHaveViolation() { + Map attributeGroupMap = new HashMap<>() {{ + put("Patient1", patientGroup); + put("Condition1", conditionGroup); + }}; + PatientResourceBundle patientBundle = new PatientResourceBundle("VHF00006"); + ResourceBundle coreBundle = new ResourceBundle(); + Patient patient = parser.parseResource(Patient.class, PATIENT); + Condition condition = parser.parseResource(Condition.class, CONDITION).setSubject(null); // marked as must-have + + patientBundle.put(new ResourceGroupWrapper(patient, Set.of())); + patientBundle.put(new ResourceGroupWrapper(condition, Set.of("Condition1"))); + patientBundle.bundle().addResourceGroupValidity(new ResourceGroup("Condition/2", "Condition1"), true); + + + var result = referenceResolver.processResourceGroups(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap); + + + StepVerifier.create(result) + .verifyComplete(); + } + + @Test + void processResourceGroups_withMissingResource() { + Map attributeGroupMap = new HashMap<>() {{ + put("Patient1", patientGroup); + put("Condition1", conditionGroup); + }}; + PatientResourceBundle patientBundle = new PatientResourceBundle("VHF00006"); + ResourceBundle coreBundle = new ResourceBundle(); + + patientBundle.put("Condition/2"); // add an empty reference + patientBundle.bundle().addResourceGroupValidity(new ResourceGroup("Condition/2", "Condition1"), true); + + + var result = referenceResolver.processResourceGroups(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap); + + + StepVerifier.create(result) + .verifyComplete(); + } @Test void loadReferences_success() { @@ -298,7 +340,7 @@ void loadReferences_success() { patientBundle.put(new ResourceGroupWrapper(patient, Set.of())); patientBundle.put(new ResourceGroupWrapper(condition, Set.of("Condition1"))); - var result = referenceResolver.loadReferencesByResourceGroup(patientBundle.bundle().getValidResourceGroups(), patientBundle, coreBundle, attributeGroupMap); + var result = referenceResolver.loadReferencesByResourceGroup(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, attributeGroupMap); assertThat(result).containsExactly(Map.entry(new ResourceGroup("Condition/2", "Condition1"), List.of(new ReferenceWrapper(conditionSubject, List.of(PAT_REFERENCE), "Condition1", "Condition/2")))); }