Skip to content

Commit 185502b

Browse files
committed
Expose Failed Patient Transfers via Status
1 parent d084b10 commit 185502b

File tree

5 files changed

+204
-26
lines changed

5 files changed

+204
-26
lines changed

clinical-domain-agent/src/main/java/care/smith/fts/cda/DefaultTransferProcessRunner.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,12 @@ private Flux<ConsentedPatientBundle> selectDataForPatient(ConsentedPatient patie
151151
return process
152152
.dataSelector()
153153
.select(patient)
154-
.doOnError(e -> logError("select data", patient.identifier(), e));
154+
.onErrorResume(
155+
e -> {
156+
logError("select data", patient.identifier(), e);
157+
status.updateAndGet(s -> s.addFailedPatient(patient.identifier(), e.getMessage()));
158+
return Flux.empty();
159+
});
155160
}
156161

157162
private void logError(String step, String patientIdentifier, Throwable e) {
@@ -171,25 +176,36 @@ private Flux<PatientContext<TransportBundle>> deidentify(
171176

172177
private Mono<PatientContext<TransportBundle>> deidentifyForPatient(
173178
ConsentedPatientBundle bundle) {
179+
var patientId = bundle.consentedPatient().identifier();
174180
return process
175181
.deidentificator()
176182
.deidentify(bundle)
177-
.doOnError(e -> logError("deidentify bundle", bundle.consentedPatient().identifier(), e))
178-
.map(t -> new PatientContext<>(t, bundle.consentedPatient()));
183+
.map(t -> new PatientContext<>(t, bundle.consentedPatient()))
184+
.onErrorResume(
185+
e -> {
186+
logError("deidentify bundle", patientId, e);
187+
status.updateAndGet(s -> s.addFailedPatient(patientId, e.getMessage()));
188+
return Mono.empty();
189+
});
179190
}
180191

181192
private Flux<Result> sendBundles(Flux<PatientContext<TransportBundle>> deidentification) {
182193
return deidentification
183194
.flatMap(this::sendBundleForPatient, config.maxSendConcurrency)
184-
.doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles))
185-
.onErrorContinue((e, r) -> status.updateAndGet(TransferProcessStatus::incSkippedBundles));
195+
.doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles));
186196
}
187197

188198
private Mono<Result> sendBundleForPatient(PatientContext<TransportBundle> b) {
199+
var patientId = b.consentedPatient().identifier();
189200
return process
190201
.bundleSender()
191202
.send(b.data())
192-
.doOnError(e -> logError("send bundle", b.consentedPatient().identifier(), e));
203+
.onErrorResume(
204+
e -> {
205+
logError("send bundle", patientId, e);
206+
status.updateAndGet(s -> s.addFailedPatient(patientId, e.getMessage()));
207+
return Mono.empty();
208+
});
193209
}
194210

195211
private void onComplete() {

clinical-domain-agent/src/main/java/care/smith/fts/cda/TransferProcessStatus.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import care.smith.fts.cda.TransferProcessRunner.Phase;
66
import java.time.LocalDateTime;
7+
import java.util.ArrayList;
8+
import java.util.List;
79
import lombok.With;
810

911
public record TransferProcessStatus(
@@ -15,10 +17,14 @@ public record TransferProcessStatus(
1517
@With(PRIVATE) long totalBundles,
1618
@With(PRIVATE) long deidentifiedBundles,
1719
@With(PRIVATE) long sentBundles,
18-
@With(PRIVATE) long skippedBundles) {
20+
@With(PRIVATE) long skippedBundles,
21+
@With(PRIVATE) List<PatientError> failedPatients) {
22+
23+
public record PatientError(String patientId, String errorMessage) {}
24+
1925
public static TransferProcessStatus create(String processId) {
2026
return new TransferProcessStatus(
21-
processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0);
27+
processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0, List.of());
2228
}
2329

2430
public TransferProcessStatus incTotalPatients() {
@@ -41,6 +47,12 @@ public TransferProcessStatus incSkippedBundles() {
4147
return withSkippedBundles(skippedBundles + 1);
4248
}
4349

50+
public TransferProcessStatus addFailedPatient(String patientId, String errorMessage) {
51+
var updated = new ArrayList<>(failedPatients);
52+
updated.add(new PatientError(patientId, errorMessage));
53+
return withFailedPatients(List.copyOf(updated)).withSkippedBundles(skippedBundles + 1);
54+
}
55+
4456
/**
4557
* Set the process phase. If the phase switches to COMPLETED or COMPLETED_WITH_ERROR `finishedAt`
4658
* is set. Once the process is in a completed state going back is not possible anymore.

clinical-domain-agent/src/main/java/care/smith/fts/cda/rest/TransferProcessController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private URI generateJobUri(UriComponentsBuilder uriBuilder, String id) {
140140
name = "Transfer process status",
141141
value =
142142
"""
143-
{"processId":"ChlblQ","phase":"RUNNING","createdAt":[2024,12,16,9,29,37,186662587],"finishedAt":null,"totalPatients":100,"totalBundles":53,"deidentifiedBundles":32,"sentBundles":0,"skippedBundles":0}
143+
{"processId":"ChlblQ","phase":"RUNNING","createdAt":[2024,12,16,9,29,37,186662587],"finishedAt":null,"totalPatients":100,"totalBundles":53,"deidentifiedBundles":32,"sentBundles":0,"skippedBundles":0,"failedPatients":[]}
144144
""")
145145
})),
146146
@ApiResponse(responseCode = "404", description = "The project could not be found")
@@ -170,8 +170,8 @@ Mono<ResponseEntity<TransferProcessStatus>> status(
170170
value =
171171
"""
172172
[
173-
{"processId":"8R9JGu","phase":"COMPLETED","createdAt":[2024,12,16,9,28,50,772443200],"finishedAt":[2024,12,16,9,29,31,776068091],"totalPatients":100,"totalBundles":119,"deidentifiedBundles":118,"sentBundles":118,"skippedBundles":0},
174-
{"processId":"ChlblQ","phase":"RUNNING","createdAt":[2024,12,16,9,29,37,186662587],"finishedAt":null,"totalPatients":100,"totalBundles":53,"deidentifiedBundles":32,"sentBundles":0,"skippedBundles":0}
173+
{"processId":"8R9JGu","phase":"COMPLETED","createdAt":[2024,12,16,9,28,50,772443200],"finishedAt":[2024,12,16,9,29,31,776068091],"totalPatients":100,"totalBundles":119,"deidentifiedBundles":118,"sentBundles":118,"skippedBundles":0,"failedPatients":[]},
174+
{"processId":"ChlblQ","phase":"RUNNING","createdAt":[2024,12,16,9,29,37,186662587],"finishedAt":null,"totalPatients":100,"totalBundles":53,"deidentifiedBundles":32,"sentBundles":0,"skippedBundles":0,"failedPatients":[]}
175175
]
176176
""")
177177
})),

clinical-domain-agent/src/test/java/care/smith/fts/cda/DefaultTransferProcessRunnerTest.java

Lines changed: 119 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import java.time.Duration;
2222
import java.util.List;
23-
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicInteger;
2424
import org.hl7.fhir.r4.model.Bundle;
2525
import org.junit.jupiter.api.BeforeEach;
2626
import org.junit.jupiter.api.Test;
@@ -37,6 +37,9 @@ class DefaultTransferProcessRunnerTest {
3737
private static final String PATIENT_IDENTIFIER_2 = "patient-142391";
3838
private static final ConsentedPatient PATIENT_2 =
3939
new ConsentedPatient(PATIENT_IDENTIFIER_2, "system");
40+
private static final String PATIENT_IDENTIFIER_3 = "patient-293847";
41+
private static final ConsentedPatient PATIENT_3 =
42+
new ConsentedPatient(PATIENT_IDENTIFIER_3, "system");
4043

4144
private DefaultTransferProcessRunner runner;
4245

@@ -180,7 +183,7 @@ void errorInDataSelectorSkipsBundleAndContinues() {
180183
new TransferProcessDefinition(
181184
"test",
182185
rawConfig,
183-
pids -> fromIterable(List.of(PATIENT, PATIENT_2)),
186+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
184187
errorOnSecond(new Bundle()),
185188
b -> just(new TransportBundle(new Bundle(), "transferId")),
186189
b -> just(new Result()));
@@ -192,15 +195,15 @@ void errorInDataSelectorSkipsBundleAndContinues() {
192195
}
193196

194197
private static DataSelector errorOnSecond(Bundle bundle) {
195-
var first = new AtomicBoolean(true);
198+
var callCount = new AtomicInteger(0);
196199
return p ->
197-
first.getAndSet(false)
200+
callCount.getAndIncrement() == 1
198201
? Flux.error(new RuntimeException("Cannot select data"))
199202
: Flux.just(bundle).map(b -> new ConsentedPatientBundle(b, p));
200203
}
201204

202205
private void completedWithErrors(TransferProcessStatus r) {
203-
assertThat(r.sentBundles()).isEqualTo(1);
206+
assertThat(r.sentBundles()).isEqualTo(2);
204207
assertThat(r.skippedBundles()).isEqualTo(1);
205208
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
206209
}
@@ -211,7 +214,7 @@ void errorInDeidentificatorSkipsBundleAndContinues() {
211214
new TransferProcessDefinition(
212215
"test",
213216
rawConfig,
214-
pids -> fromIterable(List.of(PATIENT, PATIENT_2)),
217+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
215218
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
216219
errorOnSecond(new TransportBundle(new Bundle(), "transferId")),
217220
b -> just(new Result()));
@@ -223,11 +226,11 @@ void errorInDeidentificatorSkipsBundleAndContinues() {
223226
}
224227

225228
private static Deidentificator errorOnSecond(TransportBundle bundle) {
226-
var first = new AtomicBoolean(true);
229+
var callCount = new AtomicInteger(0);
227230
return b ->
228-
first.getAndSet(false)
229-
? just(bundle)
230-
: Mono.error(new RuntimeException("Cannot deidentify bundle"));
231+
callCount.getAndIncrement() == 1
232+
? Mono.error(new RuntimeException("Cannot deidentify bundle"))
233+
: just(bundle);
231234
}
232235

233236
@Test
@@ -236,7 +239,7 @@ void errorInBundleSenderSkipsBundleAndContinues() {
236239
new TransferProcessDefinition(
237240
"test",
238241
rawConfig,
239-
pids -> fromIterable(List.of(PATIENT, DefaultTransferProcessRunnerTest.PATIENT_2)),
242+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
240243
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
241244
b -> just(new TransportBundle(new Bundle(), "transferId")),
242245
errorOnSecond(new Result()));
@@ -248,11 +251,112 @@ void errorInBundleSenderSkipsBundleAndContinues() {
248251
}
249252

250253
private static BundleSender errorOnSecond(Result result) {
251-
var first = new AtomicBoolean(true);
254+
var callCount = new AtomicInteger(0);
252255
return b ->
253-
first.getAndSet(false)
254-
? just(result)
255-
: Mono.error(new RuntimeException("Cannot send bundle"));
256+
callCount.getAndIncrement() == 1
257+
? Mono.error(new RuntimeException("Cannot send bundle"))
258+
: just(result);
259+
}
260+
261+
@Test
262+
void errorInDataSelectorRecordsFailedPatient() {
263+
var process =
264+
new TransferProcessDefinition(
265+
"test",
266+
rawConfig,
267+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
268+
errorOnSecond(new Bundle()),
269+
b -> just(new TransportBundle(new Bundle(), "transferId")),
270+
b -> just(new Result()));
271+
272+
var processId = runner.start(process, List.of());
273+
waitForCompletion(processId);
274+
275+
create(runner.status(processId))
276+
.assertNext(
277+
r -> {
278+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
279+
assertThat(r.sentBundles()).isEqualTo(2);
280+
assertThat(r.failedPatients()).hasSize(1);
281+
assertThat(r.failedPatients().getFirst().patientId())
282+
.isEqualTo(PATIENT_IDENTIFIER_2);
283+
assertThat(r.failedPatients().getFirst().errorMessage())
284+
.isEqualTo("Cannot select data");
285+
})
286+
.verifyComplete();
287+
}
288+
289+
@Test
290+
void errorInDeidentificatorRecordsFailedPatient() {
291+
var process =
292+
new TransferProcessDefinition(
293+
"test",
294+
rawConfig,
295+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
296+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
297+
errorOnSecond(new TransportBundle(new Bundle(), "transferId")),
298+
b -> just(new Result()));
299+
300+
var processId = runner.start(process, List.of());
301+
waitForCompletion(processId);
302+
303+
create(runner.status(processId))
304+
.assertNext(
305+
r -> {
306+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
307+
assertThat(r.sentBundles()).isEqualTo(2);
308+
assertThat(r.failedPatients()).hasSize(1);
309+
assertThat(r.failedPatients().getFirst().errorMessage())
310+
.isEqualTo("Cannot deidentify bundle");
311+
})
312+
.verifyComplete();
313+
}
314+
315+
@Test
316+
void errorInBundleSenderRecordsFailedPatient() {
317+
var process =
318+
new TransferProcessDefinition(
319+
"test",
320+
rawConfig,
321+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
322+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
323+
b -> just(new TransportBundle(new Bundle(), "transferId")),
324+
errorOnSecond(new Result()));
325+
326+
var processId = runner.start(process, List.of());
327+
waitForCompletion(processId);
328+
329+
create(runner.status(processId))
330+
.assertNext(
331+
r -> {
332+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
333+
assertThat(r.sentBundles()).isEqualTo(2);
334+
assertThat(r.failedPatients()).hasSize(1);
335+
assertThat(r.failedPatients().getFirst().errorMessage())
336+
.isEqualTo("Cannot send bundle");
337+
})
338+
.verifyComplete();
339+
}
340+
341+
@Test
342+
void successfulTransferHasNoFailedPatients() {
343+
var process =
344+
new TransferProcessDefinition(
345+
"test",
346+
rawConfig,
347+
pids -> fromIterable(List.of(PATIENT)),
348+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), PATIENT))),
349+
b -> just(new TransportBundle(new Bundle(), "transferId")),
350+
b -> just(new Result()));
351+
352+
var processId = runner.start(process, List.of());
353+
create(runner.status(processId))
354+
.assertNext(
355+
r -> {
356+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED);
357+
assertThat(r.failedPatients()).isEmpty();
358+
})
359+
.verifyComplete();
256360
}
257361

258362
@Test

clinical-domain-agent/src/test/java/care/smith/fts/cda/TransferProcessStatusTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package care.smith.fts.cda;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
45

56
import care.smith.fts.cda.TransferProcessRunner.Phase;
7+
import care.smith.fts.cda.TransferProcessStatus.PatientError;
68
import java.time.LocalDateTime;
79
import org.junit.jupiter.api.Test;
810

@@ -22,6 +24,7 @@ void testCreate() {
2224
assertThat(status.deidentifiedBundles()).isEqualTo(0);
2325
assertThat(status.sentBundles()).isEqualTo(0);
2426
assertThat(status.skippedBundles()).isEqualTo(0);
27+
assertThat(status.failedPatients()).isEmpty();
2528
}
2629

2730
@Test
@@ -93,4 +96,47 @@ void testIsCompleted() {
9396
status = status.setPhase(Phase.FATAL);
9497
assertThat(status.isCompleted(status.phase())).isTrue();
9598
}
99+
100+
@Test
101+
void addFailedPatientIncrementsSkippedAndAddsEntry() {
102+
var status =
103+
TransferProcessStatus.create("process123")
104+
.addFailedPatient("patient-1", "Connection refused");
105+
106+
assertThat(status.skippedBundles()).isEqualTo(1);
107+
assertThat(status.failedPatients()).hasSize(1);
108+
assertThat(status.failedPatients().getFirst().patientId()).isEqualTo("patient-1");
109+
assertThat(status.failedPatients().getFirst().errorMessage()).isEqualTo("Connection refused");
110+
}
111+
112+
@Test
113+
void addMultipleFailedPatientsAccumulatesAll() {
114+
var status =
115+
TransferProcessStatus.create("process123")
116+
.addFailedPatient("patient-1", "Error A")
117+
.addFailedPatient("patient-2", "Error B");
118+
119+
assertThat(status.skippedBundles()).isEqualTo(2);
120+
assertThat(status.failedPatients()).hasSize(2);
121+
assertThat(status.failedPatients().get(0).patientId()).isEqualTo("patient-1");
122+
assertThat(status.failedPatients().get(1).patientId()).isEqualTo("patient-2");
123+
}
124+
125+
@Test
126+
void addFailedPatientPreservesImmutability() {
127+
var original = TransferProcessStatus.create("process123");
128+
var withError = original.addFailedPatient("patient-1", "Error");
129+
130+
assertThat(original.failedPatients()).isEmpty();
131+
assertThat(original.skippedBundles()).isEqualTo(0);
132+
assertThat(withError.failedPatients()).hasSize(1);
133+
}
134+
135+
@Test
136+
void failedPatientsListIsUnmodifiable() {
137+
var status = TransferProcessStatus.create("process123").addFailedPatient("patient-1", "Error");
138+
139+
assertThatThrownBy(() -> status.failedPatients().add(new PatientError("x", "y")))
140+
.isInstanceOf(UnsupportedOperationException.class);
141+
}
96142
}

0 commit comments

Comments
 (0)