Skip to content

Commit 5ff2214

Browse files
committed
Improve Handling of Resource Patient Id Mismatch With Batch Patient Id
Now patientId mismatches between resource and batch, result in a soft failure analog to reference resolve.
1 parent 3e9a85e commit 5ff2214

File tree

4 files changed

+269
-33
lines changed

4 files changed

+269
-33
lines changed

src/main/java/de/medizininformatikinitiative/torch/model/ResourceID.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/main/java/de/medizininformatikinitiative/torch/model/management/JobStatus.java

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/main/java/de/medizininformatikinitiative/torch/service/DirectResourceLoader.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.concurrent.ConcurrentSkipListSet;
2929
import java.util.concurrent.atomic.AtomicBoolean;
3030

31+
import static java.util.Objects.requireNonNull;
32+
3133
/**
3234
* Loader class, that handles the fetching of Resources from the datastore in batches and applying consent.
3335
*/
@@ -129,6 +131,22 @@ public Mono<Void> processCoreAttributeGroup(AnnotatedAttributeGroup group, Resou
129131
}));
130132
}
131133

134+
private static ResourceWithPatientId extractPatientId(DomainResource resource) {
135+
try {
136+
return new ResourceWithPatientId(resource, ResourceUtils.patientId(resource));
137+
} catch (PatientIdNotFoundException e) {
138+
logger.warn("Ignoring resource {} not referencing a patient", resource.getId());
139+
return null;
140+
}
141+
}
142+
143+
private record ResourceWithPatientId(DomainResource resource, String patientId) {
144+
private ResourceWithPatientId {
145+
requireNonNull(resource);
146+
requireNonNull(patientId);
147+
}
148+
}
149+
132150
/**
133151
* Fetches all resources for a batch and adds them to it, if
134152
*
@@ -154,24 +172,18 @@ private Mono<PatientBatchWithConsent> processPatientSingleAttributeGroup(Annotat
154172
return groupQueries(group)
155173
.concatMap(query -> executeQueryWithBatch(batch.patientBatch(), query))
156174
.concatMap(resource -> applyConsent(resource, batch))
157-
.filter(resource -> !resource.isEmpty())
158-
.doOnNext(resource -> {
159-
try {
160-
String patientId = ResourceUtils.patientId(resource);
161-
PatientResourceBundle bundle = mutableBundles.get(patientId);
162-
if (bundle == null) {
163-
throw new PatientIdNotFoundException("A PatientResource" + resource.getId() + " referencing Patient " + patientId + " outside batch has been loaded");
164-
}
165-
if (profileMustHaveChecker.fulfilled(resource, group)) {
166-
167-
safeGroup.add(patientId);
168-
bundle.put(resource, group.id(), true);
169-
} else {
170-
bundle.put(resource, group.id(), false);
171-
}
172-
173-
} catch (PatientIdNotFoundException e) {
174-
throw new RuntimeException(e);
175+
.mapNotNull(DirectResourceLoader::extractPatientId)
176+
.filter(tuple -> batch.bundles().containsKey(tuple.patientId))
177+
.doOnDiscard(ResourceWithPatientId.class, tuple ->
178+
logger.warn("Ignoring resource {} referencing patient {} not in batch", tuple.resource.getId(), tuple.patientId))
179+
.doOnNext(tuple -> {
180+
PatientResourceBundle bundle = mutableBundles.get(tuple.patientId);
181+
if (profileMustHaveChecker.fulfilled(tuple.resource, group)) {
182+
183+
safeGroup.add(tuple.patientId);
184+
bundle.put(tuple.resource, group.id(), true);
185+
} else {
186+
bundle.put(tuple.resource, group.id(), false);
175187
}
176188
})
177189
.doOnTerminate(() -> safeSet.retainAll(safeGroup))
Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
package de.medizininformatikinitiative.torch.service;
2+
3+
import de.medizininformatikinitiative.torch.management.StructureDefinitionHandler;
4+
import de.medizininformatikinitiative.torch.model.consent.PatientBatchWithConsent;
5+
import de.medizininformatikinitiative.torch.model.crtdl.annotated.AnnotatedAttribute;
6+
import de.medizininformatikinitiative.torch.model.crtdl.annotated.AnnotatedAttributeGroup;
7+
import de.medizininformatikinitiative.torch.model.management.PatientResourceBundle;
8+
import de.medizininformatikinitiative.torch.model.management.ResourceGroup;
9+
import de.medizininformatikinitiative.torch.util.ProfileMustHaveChecker;
10+
import org.hl7.fhir.r4.model.Observation;
11+
import org.hl7.fhir.r4.model.Reference;
12+
import org.junit.jupiter.api.Nested;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.ExtendWith;
15+
import org.mockito.InjectMocks;
16+
import org.mockito.Mock;
17+
import org.mockito.junit.jupiter.MockitoExtension;
18+
import reactor.core.publisher.Flux;
19+
import reactor.test.StepVerifier;
20+
21+
import java.util.HashSet;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.mockito.ArgumentMatchers.any;
28+
import static org.mockito.Mockito.when;
29+
30+
@ExtendWith(MockitoExtension.class)
31+
class DirectResourceLoaderTest {
32+
33+
34+
@Mock
35+
DataStore dataStore;
36+
@Mock
37+
StructureDefinitionHandler structureDefinitionHandler;
38+
@Mock
39+
ProfileMustHaveChecker profileMustHaveChecker;
40+
@InjectMocks
41+
DirectResourceLoader directResourceLoader;
42+
43+
44+
@Nested
45+
class ProcessPatientAttributeGroups {
46+
47+
@Test
48+
void testIgnoresEmptyFlux() {
49+
50+
var attribute = new AnnotatedAttribute("Observation.name", "Observation.name", "Observation.name", false);
51+
var attributeGroup = new AnnotatedAttributeGroup("test", "groupRef", List.of(attribute), List.of(), null);
52+
53+
var patientBundle = new PatientResourceBundle("1");
54+
var batchWithConsent = PatientBatchWithConsent.fromList(List.of(patientBundle));
55+
var safeSet = new HashSet<>(List.of("1"));
56+
57+
when(structureDefinitionHandler.getResourceType("groupRef")).thenReturn("Observation");
58+
when(dataStore.search(any(), any())).thenAnswer(invocation -> Flux.empty());
59+
60+
var result = directResourceLoader.processPatientAttributeGroups(
61+
List.of(attributeGroup),
62+
batchWithConsent,
63+
safeSet
64+
);
65+
66+
StepVerifier.create(result)
67+
.assertNext(res -> {
68+
assertThat(res.bundles()).containsKey("1");
69+
assertThat(res.get("1").bundle().cache()).doesNotContainKey("Observation/xyz");
70+
})
71+
.verifyComplete();
72+
}
73+
74+
@Test
75+
void testIgnoresEmptyResource() {
76+
77+
var attribute = new AnnotatedAttribute("Observation.name", "Observation.name", "Observation.name", false);
78+
var attributeGroup = new AnnotatedAttributeGroup("test", "groupRef", List.of(attribute), List.of(), null);
79+
80+
var patientBundle = new PatientResourceBundle("1");
81+
var batchWithConsent = PatientBatchWithConsent.fromList(List.of(patientBundle));
82+
var safeSet = new HashSet<>(List.of("1"));
83+
84+
85+
when(structureDefinitionHandler.getResourceType("groupRef")).thenReturn("Observation");
86+
when(dataStore.search(any(), any())).thenAnswer(invocation -> Flux.just(new Observation()));
87+
88+
var result = directResourceLoader.processPatientAttributeGroups(
89+
List.of(attributeGroup),
90+
batchWithConsent,
91+
safeSet
92+
);
93+
94+
StepVerifier.create(result)
95+
.assertNext(res -> {
96+
assertThat(res.bundles()).containsKey("1");
97+
assertThat(res.get("1").bundle().cache()).doesNotContainKey("Observation/xyz");
98+
})
99+
.verifyComplete();
100+
}
101+
102+
103+
@Test
104+
void testIgnoresObservationOfUnknownPatient() {
105+
106+
var attribute = new AnnotatedAttribute("Observation.name", "Observation.name", "Observation.name", false);
107+
var attributeGroup = new AnnotatedAttributeGroup("test", "groupRef", List.of(attribute), List.of(), null);
108+
109+
var patientBundle = new PatientResourceBundle("1");
110+
var batchWithConsent = PatientBatchWithConsent.fromList(List.of(patientBundle));
111+
var safeSet = new HashSet<>(List.of("1"));
112+
113+
Observation observation = new Observation();
114+
observation.setId("Observation/xyz");
115+
observation.setSubject(new Reference("Patient/2"));
116+
117+
when(structureDefinitionHandler.getResourceType("groupRef")).thenReturn("Observation");
118+
when(dataStore.search(any(), any())).thenAnswer(invocation -> Flux.just(observation));
119+
120+
var result = directResourceLoader.processPatientAttributeGroups(
121+
List.of(attributeGroup),
122+
batchWithConsent,
123+
safeSet
124+
);
125+
126+
StepVerifier.create(result)
127+
.assertNext(res -> {
128+
assertThat(res.bundles()).containsKey("1");
129+
assertThat(res.get("1").bundle().cache()).doesNotContainKey("Observation/xyz");
130+
})
131+
.verifyComplete();
132+
}
133+
134+
@Test
135+
void testIgnoresResourceWithoutPatientReference() {
136+
// Arrange
137+
var attribute = new AnnotatedAttribute("Observation.name", "Observation.name", "Observation.name", false);
138+
var attributeGroup = new AnnotatedAttributeGroup("test", "groupRef", List.of(attribute), List.of(), null);
139+
var patientBundle = new PatientResourceBundle("1");
140+
var batchWithConsent = PatientBatchWithConsent.fromList(List.of(patientBundle));
141+
var safeSet = new HashSet<>(List.of("1"));
142+
143+
Observation observation = new Observation();
144+
observation.setId("xyz");
145+
146+
when(structureDefinitionHandler.getResourceType("groupRef")).thenReturn("Observation");
147+
when(dataStore.search(any(), any())).thenAnswer(invocation -> Flux.just(observation));
148+
149+
var result = directResourceLoader.processPatientAttributeGroups(
150+
List.of(attributeGroup),
151+
batchWithConsent,
152+
safeSet
153+
);
154+
155+
StepVerifier.create(result)
156+
.assertNext(res -> {
157+
assertThat(res.bundles()).containsKey("1");
158+
assertThat(res.get("1").bundle().cache()).doesNotContainKey("Observation/xyz");
159+
})
160+
.verifyComplete();
161+
}
162+
163+
164+
@Test
165+
void testStoresObservationWithInvalidMustHave() {
166+
// Arrange
167+
var attribute = new AnnotatedAttribute("Observation.name", "Observation.name", "Observation.name", false);
168+
var attributeGroup = new AnnotatedAttributeGroup("test", "groupRef", List.of(attribute), List.of(), null);
169+
var patientBundle = new PatientResourceBundle("1");
170+
var batchWithConsent = PatientBatchWithConsent.fromList(List.of(patientBundle));
171+
var safeSet = new HashSet<>(List.of("1"));
172+
173+
Observation observation = new Observation();
174+
observation.setId("xyz");
175+
observation.setSubject(new Reference("Patient/1"));
176+
177+
when(structureDefinitionHandler.getResourceType("groupRef")).thenReturn("Observation");
178+
when(dataStore.search(any(), any())).thenAnswer(invocation -> Flux.just(observation));
179+
when(profileMustHaveChecker.fulfilled(observation, attributeGroup)).thenReturn(false);
180+
181+
var result = directResourceLoader.processPatientAttributeGroups(
182+
List.of(attributeGroup),
183+
batchWithConsent,
184+
safeSet
185+
);
186+
187+
StepVerifier.create(result)
188+
.assertNext(processedBatch -> {
189+
assertThat(processedBatch).isNotNull();
190+
assertThat(processedBatch.patientBatch().ids()).containsExactly("1");
191+
192+
// Example check for modified bundle cache
193+
assertThat(processedBatch.get("1").bundle().cache())
194+
.isEqualTo(Map.of("Observation/xyz", Optional.of(observation)));
195+
assertThat(processedBatch.get("1").bundle().getValidResourceGroups()).doesNotContain(
196+
new ResourceGroup("Observation/xyz", "test"));
197+
})
198+
.verifyComplete();
199+
}
200+
201+
202+
@Test
203+
void testStoresObservationWithKnownPatient() {
204+
// Arrange
205+
var attribute = new AnnotatedAttribute("Observation.name", "Observation.name", "Observation.name", false);
206+
var attributeGroup = new AnnotatedAttributeGroup("test", "groupRef", List.of(attribute), List.of(), null);
207+
var patientBundle = new PatientResourceBundle("1");
208+
var batchWithConsent = PatientBatchWithConsent.fromList(List.of(patientBundle));
209+
var safeSet = new HashSet<>(List.of("1"));
210+
211+
Observation observation = new Observation();
212+
observation.setId("xyz");
213+
observation.setSubject(new Reference("Patient/1"));
214+
215+
when(structureDefinitionHandler.getResourceType("groupRef")).thenReturn("Observation");
216+
when(dataStore.search(any(), any())).thenAnswer(invocation -> Flux.just(observation));
217+
when(profileMustHaveChecker.fulfilled(observation, attributeGroup)).thenReturn(true);
218+
219+
var result = directResourceLoader.processPatientAttributeGroups(
220+
List.of(attributeGroup),
221+
batchWithConsent,
222+
safeSet
223+
);
224+
225+
StepVerifier.create(result)
226+
.assertNext(processedBatch -> {
227+
assertThat(processedBatch).isNotNull();
228+
assertThat(processedBatch.patientBatch().ids()).containsExactly("1");
229+
230+
// Example check for modified bundle cache
231+
assertThat(processedBatch.get("1").bundle().cache())
232+
.isEqualTo(Map.of("Observation/xyz", Optional.of(observation)));
233+
assertThat(processedBatch.get("1").bundle().getValidResourceGroups()).containsExactly(
234+
new ResourceGroup("Observation/xyz", "test"));
235+
})
236+
.verifyComplete();
237+
}
238+
}
239+
}

0 commit comments

Comments
 (0)