Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.util.Collection;
import java.util.Optional;
import java.util.Set;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -85,4 +86,8 @@ public void put(String resourceReference) {
public void addStaticInfo(CachelessResourceBundle staticInfo) {
bundle.merge(staticInfo);
}

public Set<ResourceGroup> getValidResourceGroups() {
return bundle.getValidResourceGroups();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@Service
public class CrtdlProcessingService {
Expand Down Expand Up @@ -139,6 +140,7 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
).then(
// Step 3: Write the Final Core Resource Bundle to File
Mono.defer(() -> {
logger.debug("Handling Final Core Bundle");
PatientResourceBundle corePatientBundle = new PatientResourceBundle("CORE", coreBundle);
PatientBatchWithConsent coreBundleBatch = new PatientBatchWithConsent(Map.of("CORE", corePatientBundle), false);
return writeBatch(jobID, batchCopierRedacter.transformBatch(coreBundleBatch, groupsToProcess.allGroups()));
Expand All @@ -147,10 +149,14 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
}

private Mono<Void> processBatch(PatientBatchWithConsent batch, String jobID, GroupsToProcess groupsToProcess, ResourceBundle coreBundle) {
UUID id = UUID.randomUUID();
return directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), batch)
.doOnNext(loadedBatch -> logger.debug("Directly loaded patient compartment for batch {} with {} patients", id, loadedBatch.patientIds().size()))
.flatMap(patientBatch -> referenceResolver.processSinglePatientBatch(patientBatch, coreBundle, groupsToProcess.allGroups()))
.map(patientBatch -> cascadingDelete.handlePatientBatch(patientBatch, groupsToProcess.allGroups()))
.doOnNext(loadedBatch -> logger.debug("Batch resolved references {} with {} patients", id, loadedBatch.patientIds().size()))
.map(patientBatch -> batchCopierRedacter.transformBatch(patientBatch, groupsToProcess.allGroups()))
.doOnNext(loadedBatch -> logger.debug("Batch finished extraction {} ", id))
.flatMap(patientBatch -> {
batchToCoreWriter.updateCore(patientBatch, coreBundle);
return writeBatch(jobID, patientBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;

Expand Down Expand Up @@ -154,8 +155,10 @@ private String serializeBatchBundle(Map<String, Set<String>> idsByType) {
*/
public <T extends Resource> Flux<T> search(Query query, Class<T> resourceType) {
var start = System.nanoTime();
logger.trace("Execute query: {}", query);

var queryId = UUID.randomUUID();
logger.debug("Executing query {} for resource type {}", queryId, query.type());
logger.trace("Full query: {}", query);
var counter = new AtomicInteger();
return client.post()
.uri("/" + query.type() + "/_search")
.contentType(APPLICATION_FORM_URLENCODED)
Expand All @@ -170,14 +173,15 @@ public <T extends Resource> Flux<T> search(Query query, Class<T> resourceType) {
.flatMap(bundle -> Flux.fromStream(bundle.getEntry().stream().map(Bundle.BundleEntryComponent::getResource)))
.flatMap(resource -> {
if (resourceType.isInstance(resource)) {
counter.incrementAndGet();
return Mono.just(resourceType.cast(resource));
} else {
logger.warn("Found miss match resource type {} querying type {}", resource.getClass().getSimpleName(), query.type());
return Mono.empty();
}
})
.doOnComplete(() -> logger.trace("Finished query `{}` in {} seconds.", query,
"%.1f".formatted(TimeUtils.durationSecondsSince(start))))
.doOnComplete(() -> logger.debug("Finished query `{}` in {} seconds with {} resources.", queryId,
"%.1f".formatted(TimeUtils.durationSecondsSince(start)), counter.get()))
.doOnError(e -> logger.error("Error while executing resource query `{}`: {}", query, e.getMessage()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import de.medizininformatikinitiative.torch.consent.ConsentValidator;
import de.medizininformatikinitiative.torch.exceptions.MustHaveViolatedException;
import de.medizininformatikinitiative.torch.exceptions.PatientIdNotFoundException;
import de.medizininformatikinitiative.torch.management.StructureDefinitionHandler;
import de.medizininformatikinitiative.torch.model.consent.PatientBatchWithConsent;
import de.medizininformatikinitiative.torch.model.crtdl.annotated.AnnotatedAttributeGroup;
import de.medizininformatikinitiative.torch.model.fhir.Query;
Expand Down Expand Up @@ -42,17 +41,14 @@ public class DirectResourceLoader {

private final ConsentValidator consentValidator;
private final DseMappingTreeBase dseMappingTreeBase;
private final StructureDefinitionHandler structureDefinitionsHandler;
private final ProfileMustHaveChecker profileMustHaveChecker;

@Autowired
public DirectResourceLoader(DataStore dataStore, DseMappingTreeBase dseMappingTreeBase, StructureDefinitionHandler structureDefinitionHandler, ProfileMustHaveChecker profileMustHaveChecker, ConsentValidator validator) {
public DirectResourceLoader(DataStore dataStore, DseMappingTreeBase dseMappingTreeBase, ProfileMustHaveChecker profileMustHaveChecker, ConsentValidator validator) {
this.dataStore = dataStore;
this.consentValidator = validator;
this.dseMappingTreeBase = dseMappingTreeBase;
this.structureDefinitionsHandler = structureDefinitionHandler;
this.profileMustHaveChecker = profileMustHaveChecker;

}

/**
Expand All @@ -74,7 +70,17 @@ public Mono<PatientBatchWithConsent> directLoadPatientCompartment(

private Mono<PatientBatchWithConsent> processBatchWithConsent(List<AnnotatedAttributeGroup> attributeGroups, PatientBatchWithConsent patientBatchWithConsent) {
Set<String> safeSet = new ConcurrentSkipListSet<>(patientBatchWithConsent.patientBatch().ids());
return processPatientAttributeGroups(attributeGroups, patientBatchWithConsent, safeSet).map(bundle -> patientBatchWithConsent.keep(safeSet));
return processPatientAttributeGroups(attributeGroups, patientBatchWithConsent, safeSet)
.doOnNext(bundle -> {
logger.debug(" {} out of {} patients passed consent checks",
safeSet.size(),
patientBatchWithConsent.patientBatch().ids().size());

if (logger.isTraceEnabled()) {
logger.trace("Surviving patient IDs: {}", String.join(", ", safeSet));
}
})
.map(bundle -> patientBatchWithConsent.keep(safeSet));
}

private Flux<Query> groupQueries(AnnotatedAttributeGroup group) {
Expand Down Expand Up @@ -123,7 +129,6 @@ public Mono<Void> processCoreAttributeGroup(AnnotatedAttributeGroup group, Resou
}).then(Mono.defer(() -> {
if (atLeastOneResource.get()) {
return Mono.empty();

} else {
logger.error("MustHave violated for group: {}", group.groupReference());
return Mono.error(new MustHaveViolatedException("MustHave requirement violated for group: " + group.id()));
Expand Down Expand Up @@ -179,11 +184,10 @@ private Mono<PatientBatchWithConsent> processPatientSingleAttributeGroup(Annotat
.doOnNext(tuple -> {
PatientResourceBundle bundle = mutableBundles.get(tuple.patientId);
if (profileMustHaveChecker.fulfilled(tuple.resource, group)) {

safeGroup.add(tuple.patientId);
System.out.println("Ref" + group.groupReference() + "Type" + group.resourceType() + " ID " + group.id());
bundle.put(tuple.resource, group.id(), true);
} else {
logger.trace("Resource {} has not fulfilled must have checks for group {}", tuple.resource.getId(), group.id());
bundle.put(tuple.resource, group.id(), false);
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import de.medizininformatikinitiative.torch.consent.ConsentValidator;
import de.medizininformatikinitiative.torch.exceptions.ConsentViolatedException;
import de.medizininformatikinitiative.torch.exceptions.DataStoreException;
import de.medizininformatikinitiative.torch.exceptions.PatientIdNotFoundException;
import de.medizininformatikinitiative.torch.exceptions.ReferenceToPatientException;
import de.medizininformatikinitiative.torch.management.CompartmentManager;
Expand Down Expand Up @@ -69,21 +68,11 @@ public Mono<Void> fetchUnknownResources(
}
notLoaded.forEach(unloaded -> {
if (compartmentManager.isInCompartment(unloaded) && patientBundle != null) {
patientBundle.bundle().put(unloaded);
patientBundle.put(unloaded);
} else {
coreBundle.put(unloaded);
}
});
}).onErrorResume(DataStoreException.class, e -> {
logger.error("Failed to fetch resources, marking all as invalid: {}", e.getMessage());
unknownReferences.forEach(ref -> {
if (compartmentManager.isInCompartment(ref) && patientBundle != null) {
patientBundle.bundle().put(ref);
} else {
coreBundle.put(ref);
}
});
return Mono.empty();
}).then();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ public Mono<PatientResourceBundle> resolvePatient(
ResourceBundle coreBundle,
boolean applyConsent,
Map<String, AnnotatedAttributeGroup> groupMap) {

return Mono.just(patientBundle.bundle().getValidResourceGroups())
int groupValidity = patientBundle.getValidResourceGroups().size();
logger.trace("Resolving Patient Resource Bundle {} with {} valid groups", patientBundle.patientId(), groupValidity);
return Mono.just(patientBundle.getValidResourceGroups())
.map(groups -> groups.stream()
.filter(compartmentManager::isInCompartment) // your custom filter logic
.collect(Collectors.toSet()))
Expand Down Expand Up @@ -216,14 +217,10 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> processResourceGroup(
}

Optional<Resource> resource = isPatientResource
? patientBundle.bundle().get(resourceGroup.resourceId())
? patientBundle.get(resourceGroup.resourceId())
: coreBundle.get(resourceGroup.resourceId());

if (resource.isPresent()) {
return extractReferences(resourceGroup, resource.get(), groupMap, processingBundle);
} else {
return handleMissingResource(resourceGroup, processingBundle);
}
return resource.map(value -> extractReferences(resourceGroup, value, groupMap, processingBundle)).orElseGet(() -> handleMissingResource(resourceGroup, processingBundle));
}

/**
Expand Down Expand Up @@ -253,9 +250,7 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
List<ReferenceWrapper> extracted = referenceExtractor.extract(resource, groupMap, resourceGroup.groupId());
return Map.entry(resourceGroup, extracted);
} catch (MustHaveViolatedException e) {
synchronized (processingBundle) {
processingBundle.addResourceGroupValidity(resourceGroup, false);
}
processingBundle.addResourceGroupValidity(resourceGroup, false);
return Map.entry(resourceGroup, Collections.emptyList());
}
}
Expand All @@ -270,11 +265,8 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
private Map.Entry<ResourceGroup, List<ReferenceWrapper>> handleMissingResource(
ResourceGroup resourceGroup,
ResourceBundle processingBundle) {

synchronized (processingBundle) {
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
processingBundle.addResourceGroupValidity(resourceGroup, false);
}
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
processingBundle.addResourceGroupValidity(resourceGroup, false);
return Map.entry(resourceGroup, Collections.emptyList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class KnownReferences {


@Test
void notFetchingResources() {
void processResourceGroups_notFetchingResources() {
Map<String, AnnotatedAttributeGroup> attributeGroupMap = new HashMap<>() {{
put("Patient1", patientGroup);
put("Condition1", conditionGroup);
Expand All @@ -273,14 +273,56 @@ void notFetchingResources() {
patientBundle.put(new ResourceGroupWrapper(condition, Set.of("Condition1")));
patientBundle.bundle().addResourceGroupValidity(new ResourceGroup("Condition/2", "Condition1"), true);

var result = referenceResolver.processResourceGroups(patientBundle.bundle().getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap);
var result = referenceResolver.processResourceGroups(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap);


StepVerifier.create(result)
.assertNext(unprocessedResourceGroups -> assertThat(unprocessedResourceGroups).containsExactly(new ResourceGroup("Patient/VHF00006", "Patient1")))
.verifyComplete();
}

@Test
void processResourceGroups_mustHaveViolation() {
Map<String, AnnotatedAttributeGroup> attributeGroupMap = new HashMap<>() {{
put("Patient1", patientGroup);
put("Condition1", conditionGroup);
}};
PatientResourceBundle patientBundle = new PatientResourceBundle("VHF00006");
ResourceBundle coreBundle = new ResourceBundle();
Patient patient = parser.parseResource(Patient.class, PATIENT);
Condition condition = parser.parseResource(Condition.class, CONDITION).setSubject(null); // marked as must-have

patientBundle.put(new ResourceGroupWrapper(patient, Set.of()));
patientBundle.put(new ResourceGroupWrapper(condition, Set.of("Condition1")));
patientBundle.bundle().addResourceGroupValidity(new ResourceGroup("Condition/2", "Condition1"), true);


var result = referenceResolver.processResourceGroups(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap);


StepVerifier.create(result)
.verifyComplete();
}

@Test
void processResourceGroups_withMissingResource() {
Map<String, AnnotatedAttributeGroup> attributeGroupMap = new HashMap<>() {{
put("Patient1", patientGroup);
put("Condition1", conditionGroup);
}};
PatientResourceBundle patientBundle = new PatientResourceBundle("VHF00006");
ResourceBundle coreBundle = new ResourceBundle();

patientBundle.put("Condition/2"); // add an empty reference
patientBundle.bundle().addResourceGroupValidity(new ResourceGroup("Condition/2", "Condition1"), true);


var result = referenceResolver.processResourceGroups(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap);


StepVerifier.create(result)
.verifyComplete();
}

@Test
void loadReferences_success() {
Expand All @@ -298,7 +340,7 @@ void loadReferences_success() {
patientBundle.put(new ResourceGroupWrapper(patient, Set.of()));
patientBundle.put(new ResourceGroupWrapper(condition, Set.of("Condition1")));

var result = referenceResolver.loadReferencesByResourceGroup(patientBundle.bundle().getValidResourceGroups(), patientBundle, coreBundle, attributeGroupMap);
var result = referenceResolver.loadReferencesByResourceGroup(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, attributeGroupMap);

assertThat(result).containsExactly(Map.entry(new ResourceGroup("Condition/2", "Condition1"), List.of(new ReferenceWrapper(conditionSubject, List.of(PAT_REFERENCE), "Condition1", "Condition/2"))));
}
Expand Down