Skip to content

Commit fb226e1

Browse files
committed
Improve Logging In The Torch Pipeline
1 parent dbe670a commit fb226e1

File tree

7 files changed

+44
-33
lines changed

7 files changed

+44
-33
lines changed

src/main/java/de/medizininformatikinitiative/torch/model/management/PatientResourceBundle.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66

77
import java.util.Collection;
88
import java.util.Optional;
9+
import java.util.Set;
910

1011
import static java.util.Objects.requireNonNull;
1112

@@ -85,4 +86,8 @@ public void put(String resourceReference) {
8586
public void addStaticInfo(CachelessResourceBundle staticInfo) {
8687
bundle.merge(staticInfo);
8788
}
89+
90+
public Set<ResourceGroup> getValidResourceGroups() {
91+
return bundle.getValidResourceGroups();
92+
}
8893
}

src/main/java/de/medizininformatikinitiative/torch/service/CrtdlProcessingService.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
import java.io.IOException;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.UUID;
3536

3637
@Service
3738
public class CrtdlProcessingService {
@@ -139,6 +140,7 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
139140
).then(
140141
// Step 3: Write the Final Core Resource Bundle to File
141142
Mono.defer(() -> {
143+
logger.debug("Handling Final Core Bundle");
142144
PatientResourceBundle corePatientBundle = new PatientResourceBundle("CORE", coreBundle);
143145
PatientBatchWithConsent coreBundleBatch = new PatientBatchWithConsent(Map.of("CORE", corePatientBundle), false);
144146
return writeBatch(jobID, batchCopierRedacter.transformBatch(coreBundleBatch, groupsToProcess.allGroups()));
@@ -147,10 +149,14 @@ private Mono<Void> processIntern(AnnotatedCrtdl crtdl, String jobID, Flux<Patien
147149
}
148150

149151
private Mono<Void> processBatch(PatientBatchWithConsent batch, String jobID, GroupsToProcess groupsToProcess, ResourceBundle coreBundle) {
152+
UUID id = UUID.randomUUID();
150153
return directResourceLoader.directLoadPatientCompartment(groupsToProcess.directPatientCompartmentGroups(), batch)
154+
.doOnNext(loadedBatch -> logger.debug("Directly loaded patient compartment for batch {} with {} patients", id, loadedBatch.patientIds().size()))
151155
.flatMap(patientBatch -> referenceResolver.processSinglePatientBatch(patientBatch, coreBundle, groupsToProcess.allGroups()))
152156
.map(patientBatch -> cascadingDelete.handlePatientBatch(patientBatch, groupsToProcess.allGroups()))
157+
.doOnNext(loadedBatch -> logger.debug("Batch resolved references {} with {} patients", id, loadedBatch.patientIds().size()))
153158
.map(patientBatch -> batchCopierRedacter.transformBatch(patientBatch, groupsToProcess.allGroups()))
159+
.doOnNext(loadedBatch -> logger.debug("Batch finished extraction {} ", id))
154160
.flatMap(patientBatch -> {
155161
batchToCoreWriter.updateCore(patientBatch, coreBundle);
156162
return writeBatch(jobID, patientBatch);

src/main/java/de/medizininformatikinitiative/torch/service/DataStore.java

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
import java.util.Optional;
3333
import java.util.Set;
3434
import java.util.UUID;
35+
import java.util.concurrent.atomic.AtomicInteger;
3536
import java.util.function.Consumer;
3637
import java.util.stream.Stream;
3738

@@ -154,8 +155,10 @@ private String serializeBatchBundle(Map<String, Set<String>> idsByType) {
154155
*/
155156
public <T extends Resource> Flux<T> search(Query query, Class<T> resourceType) {
156157
var start = System.nanoTime();
157-
logger.trace("Execute query: {}", query);
158-
158+
var queryId = UUID.randomUUID();
159+
logger.debug("Executing query {} for resource type {}", queryId, query.type());
160+
logger.trace("Full query: {}", query);
161+
var counter = new AtomicInteger();
159162
return client.post()
160163
.uri("/" + query.type() + "/_search")
161164
.contentType(APPLICATION_FORM_URLENCODED)
@@ -170,14 +173,15 @@ public <T extends Resource> Flux<T> search(Query query, Class<T> resourceType) {
170173
.flatMap(bundle -> Flux.fromStream(bundle.getEntry().stream().map(Bundle.BundleEntryComponent::getResource)))
171174
.flatMap(resource -> {
172175
if (resourceType.isInstance(resource)) {
176+
counter.incrementAndGet();
173177
return Mono.just(resourceType.cast(resource));
174178
} else {
175179
logger.warn("Found miss match resource type {} querying type {}", resource.getClass().getSimpleName(), query.type());
176180
return Mono.empty();
177181
}
178182
})
179-
.doOnComplete(() -> logger.trace("Finished query `{}` in {} seconds.", query,
180-
"%.1f".formatted(TimeUtils.durationSecondsSince(start))))
183+
.doOnComplete(() -> logger.debug("Finished query `{}` in {} seconds with {} resources.", queryId,
184+
"%.1f".formatted(TimeUtils.durationSecondsSince(start)), counter.get()))
181185
.doOnError(e -> logger.error("Error while executing resource query `{}`: {}", query, e.getMessage()));
182186
}
183187

src/main/java/de/medizininformatikinitiative/torch/service/DirectResourceLoader.java

Lines changed: 13 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@
33
import de.medizininformatikinitiative.torch.consent.ConsentValidator;
44
import de.medizininformatikinitiative.torch.exceptions.MustHaveViolatedException;
55
import de.medizininformatikinitiative.torch.exceptions.PatientIdNotFoundException;
6-
import de.medizininformatikinitiative.torch.management.StructureDefinitionHandler;
76
import de.medizininformatikinitiative.torch.model.consent.PatientBatchWithConsent;
87
import de.medizininformatikinitiative.torch.model.crtdl.annotated.AnnotatedAttributeGroup;
98
import de.medizininformatikinitiative.torch.model.fhir.Query;
@@ -42,17 +41,14 @@ public class DirectResourceLoader {
4241

4342
private final ConsentValidator consentValidator;
4443
private final DseMappingTreeBase dseMappingTreeBase;
45-
private final StructureDefinitionHandler structureDefinitionsHandler;
4644
private final ProfileMustHaveChecker profileMustHaveChecker;
4745

4846
@Autowired
49-
public DirectResourceLoader(DataStore dataStore, DseMappingTreeBase dseMappingTreeBase, StructureDefinitionHandler structureDefinitionHandler, ProfileMustHaveChecker profileMustHaveChecker, ConsentValidator validator) {
47+
public DirectResourceLoader(DataStore dataStore, DseMappingTreeBase dseMappingTreeBase, ProfileMustHaveChecker profileMustHaveChecker, ConsentValidator validator) {
5048
this.dataStore = dataStore;
5149
this.consentValidator = validator;
5250
this.dseMappingTreeBase = dseMappingTreeBase;
53-
this.structureDefinitionsHandler = structureDefinitionHandler;
5451
this.profileMustHaveChecker = profileMustHaveChecker;
55-
5652
}
5753

5854
/**
@@ -74,7 +70,17 @@ public Mono<PatientBatchWithConsent> directLoadPatientCompartment(
7470

7571
private Mono<PatientBatchWithConsent> processBatchWithConsent(List<AnnotatedAttributeGroup> attributeGroups, PatientBatchWithConsent patientBatchWithConsent) {
7672
Set<String> safeSet = new ConcurrentSkipListSet<>(patientBatchWithConsent.patientBatch().ids());
77-
return processPatientAttributeGroups(attributeGroups, patientBatchWithConsent, safeSet).map(bundle -> patientBatchWithConsent.keep(safeSet));
73+
return processPatientAttributeGroups(attributeGroups, patientBatchWithConsent, safeSet)
74+
.doOnNext(bundle -> {
75+
logger.debug(" {} out of {} patients passed consent checks",
76+
safeSet.size(),
77+
patientBatchWithConsent.patientBatch().ids().size());
78+
79+
if (logger.isTraceEnabled()) {
80+
logger.trace("Surviving patient IDs: {}", String.join(", ", safeSet));
81+
}
82+
})
83+
.map(bundle -> patientBatchWithConsent.keep(safeSet));
7884
}
7985

8086
private Flux<Query> groupQueries(AnnotatedAttributeGroup group) {
@@ -123,7 +129,6 @@ public Mono<Void> processCoreAttributeGroup(AnnotatedAttributeGroup group, Resou
123129
}).then(Mono.defer(() -> {
124130
if (atLeastOneResource.get()) {
125131
return Mono.empty();
126-
127132
} else {
128133
logger.error("MustHave violated for group: {}", group.groupReference());
129134
return Mono.error(new MustHaveViolatedException("MustHave requirement violated for group: " + group.id()));
@@ -179,11 +184,10 @@ private Mono<PatientBatchWithConsent> processPatientSingleAttributeGroup(Annotat
179184
.doOnNext(tuple -> {
180185
PatientResourceBundle bundle = mutableBundles.get(tuple.patientId);
181186
if (profileMustHaveChecker.fulfilled(tuple.resource, group)) {
182-
183187
safeGroup.add(tuple.patientId);
184-
System.out.println("Ref" + group.groupReference() + "Type" + group.resourceType() + " ID " + group.id());
185188
bundle.put(tuple.resource, group.id(), true);
186189
} else {
190+
logger.trace("Resource {} has not fulfilled must have checks for group {}", tuple.resource.getId(), group.id());
187191
bundle.put(tuple.resource, group.id(), false);
188192
}
189193
})

src/main/java/de/medizininformatikinitiative/torch/service/ReferenceBundleLoader.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ public Mono<Void> fetchUnknownResources(
6969
}
7070
notLoaded.forEach(unloaded -> {
7171
if (compartmentManager.isInCompartment(unloaded) && patientBundle != null) {
72-
patientBundle.bundle().put(unloaded);
72+
patientBundle.put(unloaded);
7373
} else {
7474
coreBundle.put(unloaded);
7575
}
@@ -78,7 +78,7 @@ public Mono<Void> fetchUnknownResources(
7878
logger.error("Failed to fetch resources, marking all as invalid: {}", e.getMessage());
7979
unknownReferences.forEach(ref -> {
8080
if (compartmentManager.isInCompartment(ref) && patientBundle != null) {
81-
patientBundle.bundle().put(ref);
81+
patientBundle.put(ref);
8282
} else {
8383
coreBundle.put(ref);
8484
}

src/main/java/de/medizininformatikinitiative/torch/service/ReferenceResolver.java

Lines changed: 8 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -106,8 +106,9 @@ public Mono<PatientResourceBundle> resolvePatient(
106106
ResourceBundle coreBundle,
107107
boolean applyConsent,
108108
Map<String, AnnotatedAttributeGroup> groupMap) {
109-
110-
return Mono.just(patientBundle.bundle().getValidResourceGroups())
109+
int groupValidity = patientBundle.getValidResourceGroups().size();
110+
logger.trace("Resolving Patient Resource Bundle {} with {} valid groups", patientBundle.patientId(), groupValidity);
111+
return Mono.just(patientBundle.getValidResourceGroups())
111112
.map(groups -> groups.stream()
112113
.filter(compartmentManager::isInCompartment) // your custom filter logic
113114
.collect(Collectors.toSet()))
@@ -216,14 +217,10 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> processResourceGroup(
216217
}
217218

218219
Optional<Resource> resource = isPatientResource
219-
? patientBundle.bundle().get(resourceGroup.resourceId())
220+
? patientBundle.get(resourceGroup.resourceId())
220221
: coreBundle.get(resourceGroup.resourceId());
221222

222-
if (resource.isPresent()) {
223-
return extractReferences(resourceGroup, resource.get(), groupMap, processingBundle);
224-
} else {
225-
return handleMissingResource(resourceGroup, processingBundle);
226-
}
223+
return resource.map(value -> extractReferences(resourceGroup, value, groupMap, processingBundle)).orElseGet(() -> handleMissingResource(resourceGroup, processingBundle));
227224
}
228225

229226
/**
@@ -253,9 +250,7 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
253250
List<ReferenceWrapper> extracted = referenceExtractor.extract(resource, groupMap, resourceGroup.groupId());
254251
return Map.entry(resourceGroup, extracted);
255252
} catch (MustHaveViolatedException e) {
256-
synchronized (processingBundle) {
257-
processingBundle.addResourceGroupValidity(resourceGroup, false);
258-
}
253+
processingBundle.addResourceGroupValidity(resourceGroup, false);
259254
return Map.entry(resourceGroup, Collections.emptyList());
260255
}
261256
}
@@ -270,11 +265,8 @@ private Map.Entry<ResourceGroup, List<ReferenceWrapper>> extractReferences(
270265
private Map.Entry<ResourceGroup, List<ReferenceWrapper>> handleMissingResource(
271266
ResourceGroup resourceGroup,
272267
ResourceBundle processingBundle) {
273-
274-
synchronized (processingBundle) {
275-
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
276-
processingBundle.addResourceGroupValidity(resourceGroup, false);
277-
}
268+
logger.warn("Empty resource marked as valid for group {}", resourceGroup);
269+
processingBundle.addResourceGroupValidity(resourceGroup, false);
278270
return Map.entry(resourceGroup, Collections.emptyList());
279271
}
280272

src/test/java/de/medizininformatikinitiative/torch/service/ReferenceResolverIT.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -273,7 +273,7 @@ void notFetchingResources() {
273273
patientBundle.put(new ResourceGroupWrapper(condition, Set.of("Condition1")));
274274
patientBundle.bundle().addResourceGroupValidity(new ResourceGroup("Condition/2", "Condition1"), true);
275275

276-
var result = referenceResolver.processResourceGroups(patientBundle.bundle().getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap);
276+
var result = referenceResolver.processResourceGroups(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, false, attributeGroupMap);
277277

278278

279279
StepVerifier.create(result)
@@ -298,7 +298,7 @@ void loadReferences_success() {
298298
patientBundle.put(new ResourceGroupWrapper(patient, Set.of()));
299299
patientBundle.put(new ResourceGroupWrapper(condition, Set.of("Condition1")));
300300

301-
var result = referenceResolver.loadReferencesByResourceGroup(patientBundle.bundle().getValidResourceGroups(), patientBundle, coreBundle, attributeGroupMap);
301+
var result = referenceResolver.loadReferencesByResourceGroup(patientBundle.getValidResourceGroups(), patientBundle, coreBundle, attributeGroupMap);
302302

303303
assertThat(result).containsExactly(Map.entry(new ResourceGroup("Condition/2", "Condition1"), List.of(new ReferenceWrapper(conditionSubject, List.of(PAT_REFERENCE), "Condition1", "Condition/2"))));
304304
}

0 commit comments

Comments
 (0)