Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import de.medizininformatikinitiative.torch.util.Redaction;
import de.medizininformatikinitiative.torch.util.ReferenceExtractor;
import de.medizininformatikinitiative.torch.util.ReferenceHandler;
import de.medizininformatikinitiative.torch.util.ResourceGroupValidator;
import de.medizininformatikinitiative.torch.util.ResourceReader;
import de.medizininformatikinitiative.torch.util.ResultFileManager;
import de.numcodex.sq2cql.Translator;
Expand Down Expand Up @@ -126,8 +127,13 @@ public ProcessedGroupFactory attributeGroupProcessor(CompartmentManager manager)


@Bean
ReferenceHandler referenceHandler(DataStore dataStore, ProfileMustHaveChecker mustHaveChecker, CompartmentManager compartmentManager, ConsentValidator consentValidator) {
return new ReferenceHandler(mustHaveChecker);
ReferenceHandler referenceHandler(ResourceGroupValidator resourceGroupValidator) {
return new ReferenceHandler(resourceGroupValidator);
}

@Bean
ResourceGroupValidator resourceGroupValidator(ProfileMustHaveChecker profileMustHaveChecker) {
return new ResourceGroupValidator(profileMustHaveChecker);
}

@Bean
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@Service
public class CrtdlProcessingService {
Expand Down Expand Up @@ -117,14 +118,15 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
// Step 2: Process Patient Batches Using Preprocessed Core Bundle
logger.debug("Process patient batches with a concurrency of {}", maxConcurrency);
return preProcessedCoreBundle.flatMapMany(coreSnapshot ->
batches
.flatMap(batch -> crtdl.consentKey()
.map(s -> consentHandler.fetchAndBuildConsentInfo(s, batch))
.orElse(Mono.just(PatientBatchWithConsent.fromBatch(batch)))
.onErrorResume(ConsentViolatedException.class, ex -> Mono.empty())
, maxConcurrency) //skip batches without consenting patient
.doOnNext(patientBatch -> patientBatch.addStaticInfo(coreSnapshot))
.flatMap(batch -> processBatch(batch, jobID, groupsToProcess, coreBundle), maxConcurrency)
batches.flatMap(batch ->
processBatch(
batch,
jobID,
groupsToProcess,
coreBundle,
crtdl.consentKey(),
coreSnapshot
), maxConcurrency)
).then(
// Step 3: Write the Final Core Resource Bundle to File
Mono.defer(() -> {
Expand All @@ -135,15 +137,34 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
);
}

private Mono<Void> processBatch(PatientBatchWithConsent batch, String jobID, GroupsToProcess groupsToProcess, ResourceBundle coreBundle) {
return directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), batch)
.flatMap(patientBatch -> referenceResolver.processSinglePatientBatch(patientBatch, coreBundle, groupsToProcess.allGroups()))
.map(patientBatch -> cascadingDelete.handlePatientBatch(patientBatch, groupsToProcess.allGroups()))
.map(patientBatch -> batchCopierRedacter.transformBatch(patientBatch, groupsToProcess.allGroups()))
.flatMap(patientBatch -> {
batchToCoreWriter.updateCore(patientBatch, coreBundle);
return writeBatch(jobID, patientBatch);
}
private Mono<Void> processBatch(
PatientBatch batch,
String jobID,
GroupsToProcess groupsToProcess,
ResourceBundle coreBundle,
Optional<String> consentKey,
CachelessResourceBundle coreSnapshot
) {
// Fetch consent (or assume consent if key is empty)
Mono<PatientBatchWithConsent> withConsent = consentKey
.map(key -> consentHandler.fetchAndBuildConsentInfo(key, batch))
.orElse(Mono.just(PatientBatchWithConsent.fromBatch(batch)))
.onErrorResume(ConsentViolatedException.class, ex -> {
logger.debug("Skipping batch due to consent violation: {}", ex.getMessage());
return Mono.empty();
});

return withConsent
.doOnNext(b -> b.addStaticInfo(coreSnapshot))
.flatMap(consented ->
directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), consented)
.flatMap(loaded -> referenceResolver.processSinglePatientBatch(loaded, coreBundle, groupsToProcess.allGroups()))
.map(processed -> cascadingDelete.handlePatientBatch(processed, groupsToProcess.allGroups()))
.map(transformed -> batchCopierRedacter.transformBatch(transformed, groupsToProcess.allGroups()))
.flatMap(finalBatch -> {
batchToCoreWriter.updateCore(finalBatch, coreBundle);
return writeBatch(jobID, finalBatch);
})
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -62,14 +63,11 @@ public ReferenceResolver(CompartmentManager compartmentManager,
*/
public Mono<ResourceBundle> resolveCoreBundle(ResourceBundle coreBundle, Map<String, AnnotatedAttributeGroup> groupMap) {
return Mono.just(coreBundle.getValidResourceGroups())
.map(groups -> groups.stream()
.filter(resourceGroup -> !compartmentManager.isInCompartment(resourceGroup)) // your custom filter logic
.map(groups -> groups.stream().filter(resourceGroup -> !compartmentManager.isInCompartment(resourceGroup))
.collect(Collectors.toSet()))
.expand(currentGroupSet ->
processResourceGroups(currentGroupSet, null, coreBundle, false, groupMap)
.onErrorResume(e -> {
return Mono.empty(); // Skip this resource group on error
}))
.onErrorResume(e -> Mono.empty()))
.then(Mono.just(coreBundle));
}

Expand All @@ -85,7 +83,11 @@ public Mono<ResourceBundle> resolveCoreBundle(ResourceBundle coreBundle, Map<Str
Mono<PatientBatchWithConsent> processSinglePatientBatch(
PatientBatchWithConsent batch, ResourceBundle coreBundle, Map<String, AnnotatedAttributeGroup> groupMap) {
return Flux.fromIterable(batch.bundles().entrySet())
.concatMap(entry -> resolvePatient(entry.getValue(), coreBundle, batch.applyConsent(), groupMap)
.concatMap(entry -> resolvePatient(entry.getValue(), coreBundle, batch.applyConsent(), groupMap).doOnNext(bundle -> {
if (bundle == null) {
logger.warn("Resolved PatientResourceBundle for key {} is null", entry.getKey());
}
}).filter(Objects::nonNull)
.map(updatedBundle -> Map.entry(entry.getKey(), updatedBundle)))
.collectMap(Map.Entry::getKey, Map.Entry::getValue)
.map(updatedBundles -> new PatientBatchWithConsent(updatedBundles, batch.applyConsent()));
Expand Down Expand Up @@ -116,9 +118,7 @@ public Mono<PatientResourceBundle> resolvePatient(
.onErrorResume(e -> {
logger.warn("Error processing resource group set {} in PatientBundle: {}", currentGroupSet, e.getMessage());
return Mono.empty(); // Skip this group on error
})
)
.then(Mono.just(patientBundle));
})).then(Mono.just(patientBundle));
}

/**
Expand All @@ -143,27 +143,18 @@ public Mono<Set<ResourceGroup>> processResourceGroups(

return bundleLoader.fetchUnknownResources(referencesGroupedByResourceGroup, patientBundle, coreBundle, applyConsent)
.thenMany(
Flux.fromIterable(referencesGroupedByResourceGroup.entrySet())
Flux.fromIterable(referencesGroupedByResourceGroup.entrySet()).filter(Objects::nonNull)
.filter(entry -> entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty())
.concatMap(entry ->
{
try {
return referenceHandler.handleReferences(
entry.getValue(),
patientBundle,
coreBundle,
groupMap
);
} catch (MustHaveViolatedException e) {
return Flux.empty();
}
}
)
)
Flux.fromIterable(referenceHandler.handleReferences(
entry.getValue(),
patientBundle,
coreBundle,
groupMap))))
.collect(Collectors.toSet())
.flatMap(set -> set.isEmpty() ? Mono.empty() : Mono.just(set));
}


/**
* Extracts for every ResourceGroup the ReferenceWrappers and collects them ordered by
*
Expand All @@ -179,9 +170,18 @@ public Map<ResourceGroup, List<ReferenceWrapper>> loadReferencesByResourceGroup(
ResourceBundle coreBundle,
Map<String, AnnotatedAttributeGroup> groupMap) {

return resourceGroups.parallelStream()
.map(resourceGroup -> processResourceGroup(resourceGroup, patientBundle, coreBundle, groupMap))
.filter(entry -> !entry.getValue().isEmpty())
return resourceGroups.stream()
.map(resourceGroup -> processResourceGroup(resourceGroup, patientBundle, coreBundle, groupMap)).map(entry -> {
if (entry == null) {
logger.warn("Null entry returned by processResourceGroup");
} else if (entry.getKey() == null) {
logger.warn("Entry with null key for resource group {}", entry);
} else if (entry.getValue() == null || entry.getValue().isEmpty()) {
logger.info("No references extracted for resource group {}", entry.getKey());
}
return entry;
}).filter(Objects::nonNull)
.filter(entry -> entry.getKey() != null && entry.getValue() != null && !entry.getValue().isEmpty())
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
Expand Down Expand Up @@ -219,11 +219,7 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> processResourceGroup(
? patientBundle.bundle().get(resourceGroup.resourceId())
: coreBundle.get(resourceGroup.resourceId());

if (resource.isPresent()) {
return extractReferences(resourceGroup, resource.get(), groupMap, processingBundle);
} else {
return handleMissingResource(resourceGroup, processingBundle);
}
return resource.map(value -> extractReferences(resourceGroup, value, groupMap, processingBundle)).orElseGet(() -> handleMissingResource(resourceGroup, processingBundle));
}

/**
Expand Down Expand Up @@ -253,9 +249,7 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
List<ReferenceWrapper> extracted = referenceExtractor.extract(resource, groupMap, resourceGroup.groupId());
return Map.entry(resourceGroup, extracted);
} catch (MustHaveViolatedException e) {
synchronized (processingBundle) {
processingBundle.addResourceGroupValidity(resourceGroup, false);
}
processingBundle.addResourceGroupValidity(resourceGroup, false);
return Map.entry(resourceGroup, Collections.emptyList());
}
}
Expand All @@ -270,11 +264,8 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
private Map.Entry<ResourceGroup, List<ReferenceWrapper>> handleMissingResource(
ResourceGroup resourceGroup,
ResourceBundle processingBundle) {

synchronized (processingBundle) {
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
processingBundle.addResourceGroupValidity(resourceGroup, false);
}
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
processingBundle.addResourceGroupValidity(resourceGroup, false);
return Map.entry(resourceGroup, Collections.emptyList());
}

Expand Down
Loading