diff --git a/src/main/java/de/medizininformatikinitiative/torch/config/AppConfig.java b/src/main/java/de/medizininformatikinitiative/torch/config/AppConfig.java index d1403a45..e3cb08ee 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/config/AppConfig.java +++ b/src/main/java/de/medizininformatikinitiative/torch/config/AppConfig.java @@ -33,6 +33,7 @@ import de.medizininformatikinitiative.torch.util.Redaction; import de.medizininformatikinitiative.torch.util.ReferenceExtractor; import de.medizininformatikinitiative.torch.util.ReferenceHandler; +import de.medizininformatikinitiative.torch.util.ResourceGroupValidator; import de.medizininformatikinitiative.torch.util.ResourceReader; import de.medizininformatikinitiative.torch.util.ResultFileManager; import de.numcodex.sq2cql.Translator; @@ -126,8 +127,13 @@ public ProcessedGroupFactory attributeGroupProcessor(CompartmentManager manager) @Bean - ReferenceHandler referenceHandler(DataStore dataStore, ProfileMustHaveChecker mustHaveChecker, CompartmentManager compartmentManager, ConsentValidator consentValidator) { - return new ReferenceHandler(mustHaveChecker); + ReferenceHandler referenceHandler(ResourceGroupValidator resourceGroupValidator) { + return new ReferenceHandler(resourceGroupValidator); + } + + @Bean + ResourceGroupValidator resourceGroupValidator(ProfileMustHaveChecker profileMustHaveChecker) { + return new ResourceGroupValidator(profileMustHaveChecker); } @Bean diff --git a/src/main/java/de/medizininformatikinitiative/torch/exceptions/ResourceTypeMissmatchException.java b/src/main/java/de/medizininformatikinitiative/torch/exceptions/ResourceTypeMissmatchException.java deleted file mode 100644 index a79dc530..00000000 --- a/src/main/java/de/medizininformatikinitiative/torch/exceptions/ResourceTypeMissmatchException.java +++ /dev/null @@ -1,8 +0,0 @@ -package de.medizininformatikinitiative.torch.exceptions; - -public class ResourceTypeMissmatchException extends Exception { - - public ResourceTypeMissmatchException(String errorMessage) { - super(errorMessage); - } -} diff --git a/src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java b/src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java index af345e3c..c78d410a 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java +++ b/src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; @Service public class CrtdlProcessingService { @@ -117,14 +118,15 @@ private Mono processIntern(AnnotatedCrtdl crtdl, String jobID, Flux - batches - .flatMap(batch -> crtdl.consentKey() - .map(s -> consentHandler.fetchAndBuildConsentInfo(s, batch)) - .orElse(Mono.just(PatientBatchWithConsent.fromBatch(batch))) - .onErrorResume(ConsentViolatedException.class, ex -> Mono.empty()) - , maxConcurrency) //skip batches without consenting patient - .doOnNext(patientBatch -> patientBatch.addStaticInfo(coreSnapshot)) - .flatMap(batch -> processBatch(batch, jobID, groupsToProcess, coreBundle), maxConcurrency) + batches.flatMap(batch -> + processBatch( + batch, + jobID, + groupsToProcess, + coreBundle, + crtdl.consentKey(), + coreSnapshot + ), maxConcurrency) ).then( // Step 3: Write the Final Core Resource Bundle to File Mono.defer(() -> { @@ -135,15 +137,34 @@ private Mono processIntern(AnnotatedCrtdl crtdl, String jobID, Flux processBatch(PatientBatchWithConsent batch, String jobID, GroupsToProcess groupsToProcess, ResourceBundle coreBundle) { - return directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), batch) - .flatMap(patientBatch -> referenceResolver.processSinglePatientBatch(patientBatch, coreBundle, groupsToProcess.allGroups())) - .map(patientBatch -> cascadingDelete.handlePatientBatch(patientBatch, groupsToProcess.allGroups())) - .map(patientBatch -> batchCopierRedacter.transformBatch(patientBatch, groupsToProcess.allGroups())) - .flatMap(patientBatch -> { - batchToCoreWriter.updateCore(patientBatch, coreBundle); - return writeBatch(jobID, patientBatch); - } + private Mono processBatch( + PatientBatch batch, + String jobID, + GroupsToProcess groupsToProcess, + ResourceBundle coreBundle, + Optional consentKey, + CachelessResourceBundle coreSnapshot + ) { + // Fetch consent (or assume consent if key is empty) + Mono withConsent = consentKey + .map(key -> consentHandler.fetchAndBuildConsentInfo(key, batch)) + .orElse(Mono.just(PatientBatchWithConsent.fromBatch(batch))) + .onErrorResume(ConsentViolatedException.class, ex -> { + logger.debug("Skipping batch due to consent violation: {}", ex.getMessage()); + return Mono.empty(); + }); + + return withConsent + .doOnNext(b -> b.addStaticInfo(coreSnapshot)) + .flatMap(consented -> + directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), consented) + .flatMap(loaded -> referenceResolver.processSinglePatientBatch(loaded, coreBundle, groupsToProcess.allGroups())) + .map(processed -> cascadingDelete.handlePatientBatch(processed, groupsToProcess.allGroups())) + .map(transformed -> batchCopierRedacter.transformBatch(transformed, groupsToProcess.allGroups())) + .flatMap(finalBatch -> { + batchToCoreWriter.updateCore(finalBatch, coreBundle); + return writeBatch(jobID, finalBatch); + }) ); } diff --git a/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java b/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java index bcb37944..288b4d13 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java +++ b/src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -62,14 +63,11 @@ public ReferenceResolver(CompartmentManager compartmentManager, */ public Mono resolveCoreBundle(ResourceBundle coreBundle, Map groupMap) { return Mono.just(coreBundle.getValidResourceGroups()) - .map(groups -> groups.stream() - .filter(resourceGroup -> !compartmentManager.isInCompartment(resourceGroup)) // your custom filter logic + .map(groups -> groups.stream().filter(resourceGroup -> !compartmentManager.isInCompartment(resourceGroup)) .collect(Collectors.toSet())) .expand(currentGroupSet -> processResourceGroups(currentGroupSet, null, coreBundle, false, groupMap) - .onErrorResume(e -> { - return Mono.empty(); // Skip this resource group on error - })) + .onErrorResume(e -> Mono.empty())) .then(Mono.just(coreBundle)); } @@ -85,7 +83,11 @@ public Mono resolveCoreBundle(ResourceBundle coreBundle, Map processSinglePatientBatch( PatientBatchWithConsent batch, ResourceBundle coreBundle, Map groupMap) { return Flux.fromIterable(batch.bundles().entrySet()) - .concatMap(entry -> resolvePatient(entry.getValue(), coreBundle, batch.applyConsent(), groupMap) + .concatMap(entry -> resolvePatient(entry.getValue(), coreBundle, batch.applyConsent(), groupMap).doOnNext(bundle -> { + if (bundle == null) { + logger.warn("Resolved PatientResourceBundle for key {} is null", entry.getKey()); + } + }).filter(Objects::nonNull) .map(updatedBundle -> Map.entry(entry.getKey(), updatedBundle))) .collectMap(Map.Entry::getKey, Map.Entry::getValue) .map(updatedBundles -> new PatientBatchWithConsent(updatedBundles, batch.applyConsent())); @@ -116,9 +118,7 @@ public Mono resolvePatient( .onErrorResume(e -> { logger.warn("Error processing resource group set {} in PatientBundle: {}", currentGroupSet, e.getMessage()); return Mono.empty(); // Skip this group on error - }) - ) - .then(Mono.just(patientBundle)); + })).then(Mono.just(patientBundle)); } /** @@ -143,27 +143,18 @@ public Mono> processResourceGroups( return bundleLoader.fetchUnknownResources(referencesGroupedByResourceGroup, patientBundle, coreBundle, applyConsent) .thenMany( - Flux.fromIterable(referencesGroupedByResourceGroup.entrySet()) + Flux.fromIterable(referencesGroupedByResourceGroup.entrySet()).filter(Objects::nonNull) + .filter(entry -> entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty()) .concatMap(entry -> - { - try { - return referenceHandler.handleReferences( - entry.getValue(), - patientBundle, - coreBundle, - groupMap - ); - } catch (MustHaveViolatedException e) { - return Flux.empty(); - } - } - ) - ) + Flux.fromIterable(referenceHandler.handleReferences( + entry.getValue(), + patientBundle, + coreBundle, + groupMap)))) .collect(Collectors.toSet()) .flatMap(set -> set.isEmpty() ? Mono.empty() : Mono.just(set)); } - /** * Extracts for every ResourceGroup the ReferenceWrappers and collects them ordered by * @@ -179,9 +170,18 @@ public Map> loadReferencesByResourceGroup( ResourceBundle coreBundle, Map groupMap) { - return resourceGroups.parallelStream() - .map(resourceGroup -> processResourceGroup(resourceGroup, patientBundle, coreBundle, groupMap)) - .filter(entry -> !entry.getValue().isEmpty()) + return resourceGroups.stream() + .map(resourceGroup -> processResourceGroup(resourceGroup, patientBundle, coreBundle, groupMap)).map(entry -> { + if (entry == null) { + logger.warn("Null entry returned by processResourceGroup"); + } else if (entry.getKey() == null) { + logger.warn("Entry with null key for resource group {}", entry); + } else if (entry.getValue() == null || entry.getValue().isEmpty()) { + logger.info("No references extracted for resource group {}", entry.getKey()); + } + return entry; + }).filter(Objects::nonNull) + .filter(entry -> entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty()) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, @@ -219,11 +219,7 @@ private Map.Entry> processResourceGroup( ? patientBundle.bundle().get(resourceGroup.resourceId()) : coreBundle.get(resourceGroup.resourceId()); - if (resource.isPresent()) { - return extractReferences(resourceGroup, resource.get(), groupMap, processingBundle); - } else { - return handleMissingResource(resourceGroup, processingBundle); - } + return resource.map(value -> extractReferences(resourceGroup, value, groupMap, processingBundle)).orElseGet(() -> handleMissingResource(resourceGroup, processingBundle)); } /** @@ -253,9 +249,7 @@ private Map.Entry> extractReferences( List extracted = referenceExtractor.extract(resource, groupMap, resourceGroup.groupId()); return Map.entry(resourceGroup, extracted); } catch (MustHaveViolatedException e) { - synchronized (processingBundle) { - processingBundle.addResourceGroupValidity(resourceGroup, false); - } + processingBundle.addResourceGroupValidity(resourceGroup, false); return Map.entry(resourceGroup, Collections.emptyList()); } } @@ -270,11 +264,8 @@ private Map.Entry> extractReferences( private Map.Entry> handleMissingResource( ResourceGroup resourceGroup, ResourceBundle processingBundle) { - - synchronized (processingBundle) { - logger.warn("Empty resource marked as valid for group {}", resourceGroup); - processingBundle.addResourceGroupValidity(resourceGroup, false); - } + logger.warn("Empty resource marked as valid for group {}", resourceGroup); + processingBundle.addResourceGroupValidity(resourceGroup, false); return Map.entry(resourceGroup, Collections.emptyList()); } diff --git a/src/main/java/de/medizininformatikinitiative/torch/util/ReferenceHandler.java b/src/main/java/de/medizininformatikinitiative/torch/util/ReferenceHandler.java index 8648bf6c..ebad1488 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/util/ReferenceHandler.java +++ b/src/main/java/de/medizininformatikinitiative/torch/util/ReferenceHandler.java @@ -7,13 +7,12 @@ import de.medizininformatikinitiative.torch.model.management.ResourceAttribute; import de.medizininformatikinitiative.torch.model.management.ResourceBundle; import de.medizininformatikinitiative.torch.model.management.ResourceGroup; -import org.hl7.fhir.r4.model.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -22,24 +21,27 @@ public class ReferenceHandler { private static final Logger logger = LoggerFactory.getLogger(ReferenceHandler.class); - private final ProfileMustHaveChecker profileMustHaveChecker; + private final ResourceGroupValidator resourceGroupValidator; - public ReferenceHandler(ProfileMustHaveChecker profileMustHaveChecker) { - this.profileMustHaveChecker = profileMustHaveChecker; + public ReferenceHandler(ResourceGroupValidator resourceGroupValidator) { + this.resourceGroupValidator = resourceGroupValidator; } - private static Flux> checkReferenceViolatesMustHave(ReferenceWrapper referenceWrapper, List list, ResourceBundle processingBundle) { + private static List checkReferenceViolatesMustHave(ReferenceWrapper referenceWrapper, List list, ResourceBundle processingBundle) throws MustHaveViolatedException { ResourceAttribute referenceAttribute = referenceWrapper.toResourceAttributeGroup(); if (referenceWrapper.refAttribute().mustHave() && list.isEmpty()) { processingBundle.setResourceAttributeInValid(referenceAttribute); - return Flux.error(new MustHaveViolatedException( + throw new MustHaveViolatedException( "MustHave condition violated: No valid references were resolved for " + referenceWrapper.references() - )); + ); + } + if (referenceWrapper.references().isEmpty()) { + return List.of(); } processingBundle.setResourceAttributeValid(referenceAttribute); - return Flux.just(list); + return list; } /** @@ -49,47 +51,57 @@ private static Flux> checkReferenceViolatesMustHave(Referenc * @param groupMap cache containing all known attributeGroups * @return newly added ResourceGroups to be processed */ - public Flux handleReferences(List references, + public List handleReferences(List references, @Nullable PatientResourceBundle patientBundle, ResourceBundle coreBundle, - Map groupMap) throws MustHaveViolatedException { + Map groupMap) { ResourceBundle processingBundle = (patientBundle != null) ? patientBundle.bundle() : coreBundle; ResourceGroup parentGroup = new ResourceGroup(references.getFirst().resourceId(), references.getFirst().groupId()); - List unprocessedReferences = filterUnprocessedReferences(references, processingBundle); - Set knownGroups = processingBundle.getKnownResourceGroups(); - return Flux.fromIterable(unprocessedReferences) - .concatMap(ref -> handleReference(ref, patientBundle, coreBundle, groupMap).doOnNext( - resourceGroupList -> { - ResourceAttribute referenceAttribute = ref.toResourceAttributeGroup(); - resourceGroupList.forEach(resourceGroup -> processingBundle.addAttributeToChild(referenceAttribute, resourceGroup)); + try { + List unprocessedReferences = filterUnprocessedReferences(references, processingBundle); + Set knownGroups = processingBundle.getKnownResourceGroups(); + return unprocessedReferences.stream() + // map each reference to a list of ResourceGroups + .map(ref -> { + List resourceGroupList; + try { + resourceGroupList = handleReference(ref, patientBundle, coreBundle, groupMap); + } catch (MustHaveViolatedException e) { + processingBundle.addResourceGroupValidity(parentGroup, false); + return Collections.emptyList(); } - )) - .collectList() - .flatMapMany(results -> Flux.fromIterable(results.stream() - .flatMap(List::stream) - .toList())) - .filter(group -> !knownGroups.contains(group)) - .onErrorResume(MustHaveViolatedException.class, e -> { - processingBundle.addResourceGroupValidity(parentGroup, false); - logger.warn("MustHaveViolatedException occurred. Stopping resource processing: {}", e.getMessage()); - return Flux.empty(); - }); + ResourceAttribute referenceAttribute = ref.toResourceAttributeGroup(); + // side effect: add attribute to each resource group + resourceGroupList.forEach(rg -> processingBundle.addAttributeToChild(referenceAttribute, rg)); + return resourceGroupList; + }) + .flatMap(List::stream) + .filter(group -> !knownGroups.contains(group)) + .toList(); + } catch (MustHaveViolatedException e) { + processingBundle.addResourceGroupValidity(parentGroup, false); + logger.warn("MustHaveViolatedException occurred. Stopping resource processing: {}", e.getMessage()); + return List.of(); + } } /** * Handles a ReferenceWrapper by resolving its references and updating the processing bundle. * + *

+ * Checks if referenceAttribute + * * @param referenceWrapper The reference wrapper to handle. * @param patientBundle The patient bundle being updated, if present. * @param coreBundle to be updated and queried, that contains a centrally shared concurrent HashMap. * @param groupMap Map of attribute groups for validation. * @return A Flux emitting a list of ResourceGroups corresponding to the resolved references. */ - public Flux> handleReference(ReferenceWrapper referenceWrapper, - @Nullable PatientResourceBundle patientBundle, - ResourceBundle coreBundle, - Map groupMap) { + public List handleReference(ReferenceWrapper referenceWrapper, + @Nullable PatientResourceBundle patientBundle, + ResourceBundle coreBundle, + Map groupMap) throws MustHaveViolatedException { ResourceBundle processingBundle = patientBundle != null ? patientBundle.bundle() : coreBundle; @@ -104,7 +116,7 @@ public Flux> handleReference(ReferenceWrapper referenceWrapp }) .filter(Objects::nonNull) .filter(Optional::isPresent) - .flatMap(resource -> collectValidGroups(referenceWrapper, groupMap, resource.get(), processingBundle).stream()) + .flatMap(resource -> resourceGroupValidator.collectValidGroups(referenceWrapper, groupMap, resource.get(), processingBundle).stream()) .toList(); // Now run your must-have validation and wrap in Flux @@ -112,37 +124,6 @@ public Flux> handleReference(ReferenceWrapper referenceWrapp } - /** - * Collects all valid resourceGroups for the currently processed ResourceBundle. - *

For a given reference and resource checks if already a valid group in processingBundle. - * If resourceGroups not assigned yet, executes filter, musthave (Without References) and profile checks. - * - * @param groupMap known attribute groups - * @param resource Resource to be checked - * @param processingBundle bundle that is currently processed - * @return ResourceGroup if previously unknown and assignable to the group. - */ - private List collectValidGroups(ReferenceWrapper referenceWrapper, Map groupMap, Resource resource, ResourceBundle processingBundle) { - return referenceWrapper.refAttribute().linkedGroups().stream() - .map(groupId -> { - ResourceGroup resourceGroup = new ResourceGroup(ResourceUtils.getRelativeURL(resource), groupId); - Boolean isValid = processingBundle.isValidResourceGroup(resourceGroup); - if (isValid == null) { - AnnotatedAttributeGroup group = groupMap.get(groupId); - boolean fulfilled = profileMustHaveChecker.fulfilled(resource, group); - if (group.compiledFilter() != null) { - fulfilled = fulfilled && group.compiledFilter().test(resource); - } - logger.trace("Group {} for Reference: {}", groupId, fulfilled); - isValid = fulfilled; - processingBundle.addResourceGroupValidity(resourceGroup, isValid); - } - return isValid ? resourceGroup : null; - }) - .filter(Objects::nonNull) - .toList(); - } - /** * Iterates over the references to find unprocessed reference wrappers. * @@ -163,19 +144,15 @@ private List filterUnprocessedReferences(List For a given reference and resource checks if already a valid group in processingBundle. + * If resourceGroups not assigned yet, executes filter, musthave (Without References) and profile checks. + * + * @param groupMap known attribute groups + * @param resource Resource to be checked + * @param processingBundle bundle that is currently processed + * @return ResourceGroup if previously unknown and assignable to the group. + */ + public List collectValidGroups(ReferenceWrapper referenceWrapper, Map groupMap, Resource resource, ResourceBundle processingBundle) { + return referenceWrapper.refAttribute().linkedGroups().stream() + .map(groupId -> { + ResourceGroup resourceGroup = new ResourceGroup(ResourceUtils.getRelativeURL(resource), groupId); + Boolean isValid = processingBundle.isValidResourceGroup(resourceGroup); + if (isValid == null) { + AnnotatedAttributeGroup group = groupMap.get(groupId); + boolean fulfilled = profileMustHaveChecker.fulfilled(resource, group); + if (group.compiledFilter() != null) { + fulfilled = fulfilled && group.compiledFilter().test(resource); + } + logger.trace("Group {} for Reference: {}", groupId, fulfilled); + isValid = fulfilled; + processingBundle.addResourceGroupValidity(resourceGroup, isValid); + } + return isValid ? resourceGroup : null; + }) + .filter(Objects::nonNull) + .toList(); + } + + +} diff --git a/src/main/java/de/medizininformatikinitiative/torch/util/ResourceUtils.java b/src/main/java/de/medizininformatikinitiative/torch/util/ResourceUtils.java index e0ccf35f..79f668cf 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/util/ResourceUtils.java +++ b/src/main/java/de/medizininformatikinitiative/torch/util/ResourceUtils.java @@ -2,8 +2,15 @@ import de.medizininformatikinitiative.torch.TargetClassCreationException; import de.medizininformatikinitiative.torch.exceptions.PatientIdNotFoundException; -import de.medizininformatikinitiative.torch.model.management.ResourceGroupWrapper; -import org.hl7.fhir.r4.model.*; +import org.hl7.fhir.r4.model.Base; +import org.hl7.fhir.r4.model.Bundle; +import org.hl7.fhir.r4.model.Consent; +import org.hl7.fhir.r4.model.DomainResource; +import org.hl7.fhir.r4.model.ElementDefinition; +import org.hl7.fhir.r4.model.Extension; +import org.hl7.fhir.r4.model.Patient; +import org.hl7.fhir.r4.model.Resource; +import org.hl7.fhir.r4.model.StructureDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,29 +111,11 @@ public static List getElementsByPath(String path, StructureDe return List.copyOf(matchingElements); } - public static List getElementById(String id, StructureDefinition.StructureDefinitionSnapshotComponent snapshot) { - if (id == null) { - return Collections.emptyList(); - } - List matchingElements = new ArrayList<>(); - for (ElementDefinition ed : snapshot.getElement()) { - if (id.equals(ed.getId())) { - matchingElements.add(ed); - } - } - return List.copyOf(matchingElements); - } - public static String getRelativeURL(Resource resource) { return resource.fhirType() + "/" + resource.getIdPart(); } - public static String getRelativeURL(ResourceGroupWrapper resourceWrapper) { - return getRelativeURL(resourceWrapper.resource()); - - } - /** * Creates a new instance of a FHIR DomainResource subclass. @@ -144,28 +133,6 @@ public static T createTargetResource(Class resourc } } - /** - * @param base base to be casted to its fhirtype - * @param fhirType e.g. Medication - * @return cast of base - */ - @SuppressWarnings("unchecked") - public static T castBaseToItsFhirType(Base base) { - String typeName = base.fhirType(); // e.g., "Patient", "Observation" - String basePackage = DomainResource.class.getPackage().getName(); // dynamically resolves package - - try { - Class clazz = Class.forName(basePackage + "." + typeName); - if (clazz.isInstance(base)) { - return (T) clazz.cast(base); - } else { - throw new IllegalArgumentException("Base is not instance of " + typeName); - } - } catch (ClassNotFoundException e) { - throw new RuntimeException("Unknown FHIR type: " + typeName, e); - } - } - /** * Reflects the method with one param for a given object used for setter and add extension methods * diff --git a/src/main/java/de/medizininformatikinitiative/torch/util/Slicing.java b/src/main/java/de/medizininformatikinitiative/torch/util/Slicing.java index 4481f001..532ed34c 100644 --- a/src/main/java/de/medizininformatikinitiative/torch/util/Slicing.java +++ b/src/main/java/de/medizininformatikinitiative/torch/util/Slicing.java @@ -1,6 +1,11 @@ package de.medizininformatikinitiative.torch.util; -import org.hl7.fhir.r4.model.*; +import org.hl7.fhir.r4.model.Base; +import org.hl7.fhir.r4.model.CanonicalType; +import org.hl7.fhir.r4.model.Element; +import org.hl7.fhir.r4.model.ElementDefinition; +import org.hl7.fhir.r4.model.StructureDefinition; +import org.hl7.fhir.r4.model.UriType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,21 +42,6 @@ public static Set checkSlicing(Base base, Set element } - /** - * Checks if the given element is a sliced element and returns the sliced element otherwise null. - * - * @param base Hapi Base (Element) which should be checked - * @param elementID Element ID of the above element. - * @param snapshotComponents set of Struturedefinitions of the Ressource to which the element belongs - * @return Returns empty set if no slicing is found and an elementdefinition for the slice otherwise - */ - public static Set checkSlicing(Base base, String elementID, Set snapshotComponents) { - - return snapshotComponents.stream().map(snapshotComponent -> checkSlicing(base, elementID, snapshotComponent)).filter(Objects::nonNull).collect(Collectors.toSet()); - - } - - /** * Checks if the given element is a sliced element and returns the sliced element otherwise null. * diff --git a/src/test/java/de/medizininformatikinitiative/torch/config/TestConfig.java b/src/test/java/de/medizininformatikinitiative/torch/config/TestConfig.java index 5b5c216d..25970c30 100644 --- a/src/test/java/de/medizininformatikinitiative/torch/config/TestConfig.java +++ b/src/test/java/de/medizininformatikinitiative/torch/config/TestConfig.java @@ -34,6 +34,7 @@ import de.medizininformatikinitiative.torch.util.Redaction; import de.medizininformatikinitiative.torch.util.ReferenceExtractor; import de.medizininformatikinitiative.torch.util.ReferenceHandler; +import de.medizininformatikinitiative.torch.util.ResourceGroupValidator; import de.medizininformatikinitiative.torch.util.ResourceReader; import de.medizininformatikinitiative.torch.util.ResultFileManager; import de.numcodex.sq2cql.Translator; @@ -111,8 +112,13 @@ ProfileMustHaveChecker mustHaveChecker(FhirContext ctx) { } @Bean - ReferenceHandler referenceHandler(DataStore dataStore, ProfileMustHaveChecker mustHaveChecker, CompartmentManager compartmentManager, ConsentValidator validator) { - return new ReferenceHandler(mustHaveChecker); + ReferenceHandler referenceHandler(ResourceGroupValidator resourceGroupValidator) { + return new ReferenceHandler(resourceGroupValidator); + } + + @Bean + ResourceGroupValidator resourceGroupValidator(ProfileMustHaveChecker profileMustHaveChecker) { + return new ResourceGroupValidator(profileMustHaveChecker); } @Bean diff --git a/src/test/java/de/medizininformatikinitiative/torch/util/ReferenceHandlerIT.java b/src/test/java/de/medizininformatikinitiative/torch/util/ReferenceHandlerIT.java index 26182d46..7c043f48 100644 --- a/src/test/java/de/medizininformatikinitiative/torch/util/ReferenceHandlerIT.java +++ b/src/test/java/de/medizininformatikinitiative/torch/util/ReferenceHandlerIT.java @@ -3,8 +3,7 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.parser.IParser; -import de.medizininformatikinitiative.torch.consent.ConsentValidator; -import de.medizininformatikinitiative.torch.management.CompartmentManager; +import de.medizininformatikinitiative.torch.exceptions.MustHaveViolatedException; import de.medizininformatikinitiative.torch.model.crtdl.annotated.AnnotatedAttribute; import de.medizininformatikinitiative.torch.model.crtdl.annotated.AnnotatedAttributeGroup; import de.medizininformatikinitiative.torch.model.management.PatientResourceBundle; @@ -12,7 +11,6 @@ import de.medizininformatikinitiative.torch.model.management.ResourceBundle; import de.medizininformatikinitiative.torch.model.management.ResourceGroup; import de.medizininformatikinitiative.torch.model.management.ResourceGroupWrapper; -import de.medizininformatikinitiative.torch.service.DataStore; import org.hl7.fhir.r4.model.Medication; import org.hl7.fhir.r4.model.Organization; import org.hl7.fhir.r4.model.Patient; @@ -22,11 +20,8 @@ import org.junit.jupiter.api.TestInstance; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ActiveProfiles; -import reactor.core.publisher.Flux; -import reactor.test.StepVerifier; import java.util.HashMap; import java.util.List; @@ -35,6 +30,7 @@ import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; @ActiveProfiles("test") @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @@ -101,26 +97,14 @@ class ReferenceHandlerIT { public static final String PAT_REFERENCE = "Patient/VHF00006"; public static final String REFERENCE_MEDICATION = "Medication/testMedication"; - @MockBean - private DataStore dataStore; - - @Autowired - private ProfileMustHaveChecker profileMustHaveChecker; - @Autowired - private CompartmentManager compartmentManager; - - @MockBean - private ConsentValidator consentValidator; - + private ResourceGroupValidator resourceGroupValidator; private ReferenceHandler referenceHandler; private IParser parser; AnnotatedAttributeGroup patientGroup; - private Organization organization; - private AnnotatedAttribute medicationID; private AnnotatedAttribute referenceAttribute; Map attributeGroupMap = new HashMap<>(); @@ -134,7 +118,7 @@ void setUp() { AnnotatedAttribute conditionSubject = new AnnotatedAttribute("Condition.subject", "Condition.subject", "Condition.subject", true, List.of("Patient1")); AnnotatedAttributeGroup conditionGroup = new AnnotatedAttributeGroup("Condition1", "https://www.medizininformatik-initiative.de/fhir/core/modul-diagnose/StructureDefinition/Diagnose", List.of(conditionSubject), List.of(), null); - medicationID = new AnnotatedAttribute("Medication.id", "Medication.id", "Medication.id", true, List.of()); + AnnotatedAttribute medicationID = new AnnotatedAttribute("Medication.id", "Medication.id", "Medication.id", true, List.of()); AnnotatedAttribute medicationAdherence = new AnnotatedAttribute("Medication.adherence", "Medication.adherence", "Medication.adherence", true, List.of()); AnnotatedAttributeGroup medicationGroup = new AnnotatedAttributeGroup("Medication1", "https://www.medizininformatik-initiative.de/fhir/core/modul-medikation/StructureDefinition/Medication", List.of(medicationID), List.of(), null); AnnotatedAttributeGroup medicationGroup2 = new AnnotatedAttributeGroup("Medication2", "https://www.medizininformatik-initiative.de/fhir/core/modul-medikation/StructureDefinition/Medication", List.of(medicationID, medicationAdherence), List.of(), null); @@ -144,11 +128,11 @@ void setUp() { attributeGroupMap.put("Medication1", medicationGroup); attributeGroupMap.put("Medication2", medicationGroup2); - organization = new Organization(); + Organization organization = new Organization(); organization.setId("evilInc"); - this.referenceHandler = new ReferenceHandler(profileMustHaveChecker); + this.referenceHandler = new ReferenceHandler(resourceGroupValidator); this.parser = FhirContext.forR4().newJsonParser(); } @@ -156,16 +140,15 @@ void setUp() { class CoreBundleOnly { @Test - void resolveCoreBundle_success() { + void resolveCoreBundle_success() throws MustHaveViolatedException { referenceAttribute = new AnnotatedAttribute("Encounter.evidence", "Encounter.evidence", "Encounter.evidence", true, List.of("Medication1")); ResourceBundle coreBundle = new ResourceBundle(); Medication testResource = parser.parseResource(Medication.class, MEDICATION); coreBundle.put(new ResourceGroupWrapper(testResource, Set.of())); - Flux> result = referenceHandler.handleReference(new ReferenceWrapper(referenceAttribute, List.of(REFERENCE_MEDICATION), "EncounterGroup", "parent"), null, coreBundle, attributeGroupMap); + List result = referenceHandler.handleReference(new ReferenceWrapper(referenceAttribute, List.of(REFERENCE_MEDICATION), "EncounterGroup", "parent"), null, coreBundle, attributeGroupMap); + + assertThat(result.getFirst()).isEqualTo(new ResourceGroup(REFERENCE_MEDICATION, "Medication1")); - StepVerifier.create(result) - .assertNext(medication -> assertThat(medication.getFirst()).isEqualTo(new ResourceGroup(REFERENCE_MEDICATION, "Medication1"))) - .verifyComplete(); // Assuming the method returns a Map assertThat(coreBundle.resourceGroupValidity()) @@ -178,11 +161,7 @@ void resolveCoreBundleFail() { ResourceBundle coreBundle = new ResourceBundle(); Medication testResource = parser.parseResource(Medication.class, MEDICATION); coreBundle.put(new ResourceGroupWrapper(testResource, Set.of())); - Flux> result = referenceHandler.handleReference(new ReferenceWrapper(referenceAttribute, List.of(REFERENCE_MEDICATION), "EncounterGroup", "parent"), null, coreBundle, attributeGroupMap); - - StepVerifier.create(result) - .expectError() - .verify(); + assertThatThrownBy(() -> referenceHandler.handleReference(new ReferenceWrapper(referenceAttribute, List.of(REFERENCE_MEDICATION), "EncounterGroup", "parent"), null, coreBundle, attributeGroupMap)).isInstanceOf(MustHaveViolatedException.class); } } @@ -192,6 +171,7 @@ class PatientBundle { @Test void resolveCoreBundleResource_success() { referenceAttribute = new AnnotatedAttribute("Encounter.evidence", "Encounter.evidence", "Encounter.evidence", true, List.of("Medication1")); + var reference = new ReferenceWrapper(referenceAttribute, List.of(REFERENCE_MEDICATION), "EncounterGroup", "parent"); ResourceBundle coreBundle = new ResourceBundle(); Medication testResource = parser.parseResource(Medication.class, MEDICATION); coreBundle.put(new ResourceGroupWrapper(testResource, Set.of())); @@ -200,11 +180,11 @@ void resolveCoreBundleResource_success() { Patient testPatient = parser.parseResource(Patient.class, PATIENT); patientBundle.put(new ResourceGroupWrapper(testPatient, Set.of())); - Flux> result = referenceHandler.handleReference(new ReferenceWrapper(referenceAttribute, List.of(REFERENCE_MEDICATION), "EncounterGroup", "parent"), null, coreBundle, attributeGroupMap); + List result = referenceHandler.handleReferences(List.of(reference), null, coreBundle, attributeGroupMap); + + + assertThat(result).containsExactly(new ResourceGroup(REFERENCE_MEDICATION, "Medication1")); - StepVerifier.create(result) - .assertNext(medication -> assertThat(medication.getFirst()).isEqualTo(new ResourceGroup(REFERENCE_MEDICATION, "Medication1"))) - .verifyComplete(); // Assuming the method returns a Map assertThat(coreBundle.resourceGroupValidity()) @@ -217,11 +197,7 @@ void resolveCoreBundleFail() { ResourceBundle coreBundle = new ResourceBundle(); Medication testResource = parser.parseResource(Medication.class, MEDICATION); coreBundle.put(new ResourceGroupWrapper(testResource, Set.of())); - Flux> result = referenceHandler.handleReference(new ReferenceWrapper(referenceAttribute, List.of(PAT_REFERENCE), "EncounterGroup", "parent"), null, coreBundle, attributeGroupMap); - - StepVerifier.create(result) - .expectError() - .verify(); + assertThatThrownBy(() -> referenceHandler.handleReference(new ReferenceWrapper(referenceAttribute, List.of(PAT_REFERENCE), "EncounterGroup", "parent"), null, coreBundle, attributeGroupMap)).isInstanceOf(MustHaveViolatedException.class); } } diff --git a/src/test/java/de/medizininformatikinitiative/torch/util/ReferenceHandlerTest.java b/src/test/java/de/medizininformatikinitiative/torch/util/ReferenceHandlerTest.java index 3ef385c5..55ce062c 100644 --- a/src/test/java/de/medizininformatikinitiative/torch/util/ReferenceHandlerTest.java +++ b/src/test/java/de/medizininformatikinitiative/torch/util/ReferenceHandlerTest.java @@ -1,210 +1,140 @@ package de.medizininformatikinitiative.torch.util; -import de.medizininformatikinitiative.torch.consent.ConsentValidator; -import de.medizininformatikinitiative.torch.management.CompartmentManager; -import de.medizininformatikinitiative.torch.service.DataStore; +import de.medizininformatikinitiative.torch.exceptions.MustHaveViolatedException; +import de.medizininformatikinitiative.torch.model.crtdl.annotated.AnnotatedAttribute; +import de.medizininformatikinitiative.torch.model.management.PatientResourceBundle; +import de.medizininformatikinitiative.torch.model.management.ReferenceWrapper; +import de.medizininformatikinitiative.torch.model.management.ResourceAttribute; +import de.medizininformatikinitiative.torch.model.management.ResourceBundle; +import de.medizininformatikinitiative.torch.model.management.ResourceGroup; +import org.hl7.fhir.r4.model.Medication; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -@ExtendWith(MockitoExtension.class) -class ReferenceHandlerTest { +import java.util.HashMap; +import java.util.List; - @Mock - private DataStore dataStore; - - @Mock - private ProfileMustHaveChecker profileMustHaveChecker; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; - @Mock - private CompartmentManager compartmentManager; +@ExtendWith(MockitoExtension.class) +class ReferenceHandlerTest { @Mock - private ConsentValidator consentValidator; + private ResourceGroupValidator resourceGroupValidator; @InjectMocks private ReferenceHandler referenceHandler; @BeforeEach void setUp() { - referenceHandler = new ReferenceHandler(profileMustHaveChecker); + referenceHandler = new ReferenceHandler(resourceGroupValidator); } -/* + @Nested class CoreResource { @Test - void shouldResolveReferenceSuccessfully() { - - Resource coreResource = mock(Resource.class); - - - when(dataStore.fetchResourceByReference("Medication/123")).thenReturn(Mono.just(coreResource)); - - - when(compartmentManager.isInCompartment(coreResource)).thenReturn(false); - - - Mono result = referenceHandler.getResourceMono(null, true, "Medication/123"); - - - StepVerifier.create(result).assertNext(resource -> assertThat(resource).isNotNull()).verifyComplete(); - } - - @Test - void noCoreResourceFound() { - - Resource coreResource = mock(Resource.class); - - - when(dataStore.fetchResourceByReference("Medication/123")).thenReturn(Mono.just(coreResource)); - - - when(compartmentManager.isInCompartment(coreResource)).thenReturn(true); - - - Mono result = referenceHandler.getResourceMono(null, true, "Medication/123"); - - - StepVerifier.create(result).expectErrorMatches(throwable -> throwable instanceof ReferenceToPatientException && throwable.getMessage().contains("Patient Resource referenced in Core Bundle")).verify(); - } - - @Test - void shouldReturnEmptyWhenEmptyRespource() { - when(dataStore.fetchResourceByReference("Medication/123")).thenReturn(Mono.empty()); + void handleReferenceGroupNotLoadableResource() { + ResourceBundle coreBundle = new ResourceBundle(); + AnnotatedAttribute attribute1 = new AnnotatedAttribute("MedicationAdministration.medication", "MedicationAdministration.medication", "Condition.onset[x]", false); + var references = List.of(new ReferenceWrapper(attribute1, List.of("Medication/UnknownResource"), "group1", "test")); + coreBundle.put("Medication/UnknownResource"); - Mono result = referenceHandler.getResourceMono(null, true, "Medication/123"); + var result = referenceHandler.handleReferences(references, null, coreBundle, new HashMap<>()); - StepVerifier.create(result).expectComplete().verify(); + assertThat(result).isEmpty(); } - @Test - void shouldReturnErrorOnConnectionError() { - // Simulate a connection error (like when the host is unreachable) - WebClientRequestException connectionException = new WebClientRequestException( - new UnknownHostException("Host not found"), // Cause of the error - HttpMethod.GET, // HTTP method used - URI.create("http://localhost/Medication/123"), // URI of the request (can be any valid URI) - HttpHeaders.EMPTY // Headers (optional, can be empty) - ); - - // Mock the behavior of dataStore.fetchResourceByReference to return the exception - when(dataStore.fetchResourceByReference("Medication/123")).thenReturn(Mono.error(connectionException)); - - Mono result = referenceHandler.getResourceMono(null, true, "Medication/123"); - - // Now expecting an error (not complete) - StepVerifier.create(result) - .expectError(WebClientRequestException.class) // Expect the WebClientRequestException error type - .verify(); - } - } - - @Nested - class PatientResource { @Test - void shouldResolveReferenceSuccessfullyWithConsent() { - - PatientResourceBundle patientBundle = new PatientResourceBundle("123"); - - Patient patientResource = new Patient(); - patientResource.setId("123"); - + void handleReferenceGroupLoadableResource() { + ResourceGroup resourceGroupMedication123 = new ResourceGroup("Medication/123", "group1"); + ResourceBundle coreBundle = new ResourceBundle(); + AnnotatedAttribute attribute1 = new AnnotatedAttribute("MedicationAdministration.medication", "MedicationAdministration.medication", "MedicationAdministration.medication", false, List.of("group1")); + var reference = new ReferenceWrapper(attribute1, List.of("Medication/123"), "MedicationAdministrationGroup", "test"); + Medication medication = new Medication(); + medication.setId("123"); + coreBundle.put(medication); + when(resourceGroupValidator.collectValidGroups( + eq(reference), any(), eq(medication), eq(coreBundle))).thenReturn(List.of(resourceGroupMedication123)); - when(dataStore.fetchResourceByReference("Patient/123")).thenReturn(Mono.just(patientResource)); + var result = referenceHandler.handleReferences(List.of(reference), null, coreBundle, new HashMap<>()); - when(compartmentManager.isInCompartment(patientResource)).thenReturn(true); - when(consentValidator.checkConsent(patientResource, patientBundle)).thenReturn(true); + assertThat(result).containsExactly(resourceGroupMedication123); - - Mono result = referenceHandler.getResourceMono(patientBundle, true, "Patient/123"); - - - StepVerifier.create(result).assertNext(wrapper -> assertThat(wrapper).isNotNull()).verifyComplete(); } @Test - void shouldResolveReferenceSuccessfullyWithoutConsent() { - - PatientResourceBundle patientBundle = new PatientResourceBundle("123"); - - Patient patientResource = new Patient(); - patientResource.setId("123"); - - - when(dataStore.fetchResourceByReference("Patient/123")).thenReturn(Mono.just(patientResource)); - + void handleReferenceNewGroupInvalid() { + ResourceBundle coreBundle = new ResourceBundle(); + AnnotatedAttribute attribute1 = new AnnotatedAttribute("MedicationAdministration.medication", "MedicationAdministration.medication", "MedicationAdministration.medication", true, List.of("group1")); + var reference = new ReferenceWrapper(attribute1, List.of("Medication/123"), "MedicationAdministrationGroup", "MedicationAdministration/123"); + Medication medication = new Medication(); + medication.setId("123"); + coreBundle.put(medication); + when(resourceGroupValidator.collectValidGroups( + eq(reference), any(), eq(medication), eq(coreBundle))).thenReturn(List.of()); - when(compartmentManager.isInCompartment(patientResource)).thenReturn(true); + var result = referenceHandler.handleReferences(List.of(reference), null, coreBundle, new HashMap<>()); - Mono result = referenceHandler.getResourceMono(patientBundle, false, "Patient/123"); + assertThat(result).isEmpty(); + System.out.println(coreBundle.resourceGroupValidity()); + assertThat(coreBundle.isValidResourceGroup(new ResourceGroup("MedicationAdministration/123", "MedicationAdministrationGroup"))).isFalse(); - - StepVerifier.create(result).assertNext(wrapper -> assertThat(wrapper).isNotNull()).verifyComplete(); } @Test - void ConsentViolatedExceptionShouldBeThrown() { - PatientResourceBundle patientBundle = new PatientResourceBundle("123"); - Patient patientResource = new Patient(); - patientResource.setId("123"); - - when(dataStore.fetchResourceByReference("Patient/123")).thenReturn(Mono.just(patientResource)); - when(compartmentManager.isInCompartment(patientResource)).thenReturn(true); - when(consentValidator.checkConsent(patientResource, patientBundle)).thenReturn(false); + void handleReferenceGroupKnownInvalid() { + ResourceGroup resourceGroupMedication123 = new ResourceGroup("Medication/123", "group1"); + ResourceBundle coreBundle = new ResourceBundle(); + AnnotatedAttribute attribute1 = new AnnotatedAttribute("MedicationAdministration.medication", "MedicationAdministration.medication", "MedicationAdministration.medication", true, List.of("group1")); + var reference = new ReferenceWrapper(attribute1, List.of("Medication/123"), "MedicationAdministrationGroup", "test"); + Medication medication = new Medication(); + medication.setId("123"); + coreBundle.put(medication); + coreBundle.addResourceGroupValidity(resourceGroupMedication123, false); + when(resourceGroupValidator.collectValidGroups( + eq(reference), any(), eq(medication), eq(coreBundle))).thenReturn(List.of()); - Mono result = referenceHandler.getResourceMono(patientBundle, true, "Patient/123"); - - StepVerifier.create(result).expectErrorMatches(throwable -> throwable instanceof ConsentViolatedException && throwable.getMessage().contains("Consent Violated in Patient Resource")).verify(); + assertThatThrownBy(() -> referenceHandler.handleReference(reference, null, coreBundle, new HashMap<>())).isInstanceOf(MustHaveViolatedException.class); } - @Test - void failsPointingAtOtherPatient() { - - PatientResourceBundle patientBundle = new PatientResourceBundle("123"); - - Patient patientResource = new Patient(); - patientResource.setId("False"); - - - when(dataStore.fetchResourceByReference("Patient/123")).thenReturn(Mono.just(patientResource)); + } - when(compartmentManager.isInCompartment(patientResource)).thenReturn(true); - + @Nested + class PatientResource { - Mono result = referenceHandler.getResourceMono(patientBundle, false, "Patient/123"); + @Test + void handleResourceAttributeKnownInvalid() { + ResourceBundle coreBundle = new ResourceBundle(); + AnnotatedAttribute attribute1 = new AnnotatedAttribute("MedicationAdministration.medication", "MedicationAdministration.medication", "MedicationAdministration.medication", true, List.of("group1")); + ResourceAttribute resourceAttribute = new ResourceAttribute("MedicationAdministration/123", attribute1); + var reference = new ReferenceWrapper(attribute1, List.of("Medication/123"), "MedicationAdministrationGroup", "MedicationAdministration/123"); + PatientResourceBundle patientResourceBundle = new PatientResourceBundle("123"); + patientResourceBundle.bundle().setResourceAttributeInValid(resourceAttribute); - StepVerifier.create(result).expectErrorMatches(throwable -> throwable instanceof ReferenceToPatientException && throwable.getMessage().contains("Patient loaded reference belonging to another patient")).verify(); - } + assertThat(referenceHandler.handleReferences(List.of(reference), patientResourceBundle, coreBundle, new HashMap<>())).isEmpty(); - @Test - void shouldLogAndReturnEmptyMonoOnError() { - // Simulate a connection error (like when the host is unreachable) - when(dataStore.fetchResourceByReference("Broken/999")).thenReturn(Mono.error(new RuntimeException("Connection failed"))); - - Mono result = referenceHandler.getResourceMono(null, true, "Broken/999"); - - // Expecting a RuntimeException with the message "Connection failed" - StepVerifier.create(result) - .expectErrorMatches(throwable -> - throwable instanceof RuntimeException && - throwable.getMessage().contains("Connection failed") - ) - .verify(); } } -*/ + }