Skip to content

Commit 55de5b2

Browse files
committed
Expose Failed Patient Transfers via Status
1 parent 5ed277e commit 55de5b2

File tree

7 files changed

+343
-22
lines changed

7 files changed

+343
-22
lines changed

clinical-domain-agent/src/main/java/care/smith/fts/cda/DefaultTransferProcessRunner.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import care.smith.fts.api.ConsentedPatientBundle;
99
import care.smith.fts.api.TransportBundle;
1010
import care.smith.fts.api.cda.BundleSender.Result;
11+
import care.smith.fts.cda.TransferProcessStatus.Step;
1112
import com.fasterxml.jackson.databind.ObjectMapper;
1213
import java.time.LocalDateTime;
1314
import java.util.HashMap;
@@ -151,10 +152,17 @@ private Flux<ConsentedPatientBundle> selectDataForPatient(ConsentedPatient patie
151152
return process
152153
.dataSelector()
153154
.select(patient)
154-
.doOnError(e -> logError("select data", patient.identifier(), e));
155+
.onErrorResume(e -> handlePatientError(patient.identifier(), Step.SELECT_DATA, e));
155156
}
156157

157-
private void logError(String step, String patientIdentifier, Throwable e) {
158+
private <T> Mono<T> handlePatientError(String patientId, Step step, Throwable e) {
159+
logError(step, patientId, e);
160+
status.updateAndGet(
161+
s -> s.incSkippedBundles().addFailedPatient(patientId, step, e.getMessage()));
162+
return Mono.empty();
163+
}
164+
165+
private void logError(Step step, String patientIdentifier, Throwable e) {
158166
var msg = "[Process {}] Failed to {} for patient {}. {}";
159167
log.error(
160168
msg, processId(), step, patientIdentifier, log.isDebugEnabled() ? e : e.getMessage());
@@ -171,25 +179,26 @@ private Flux<PatientContext<TransportBundle>> deidentify(
171179

172180
private Mono<PatientContext<TransportBundle>> deidentifyForPatient(
173181
ConsentedPatientBundle bundle) {
182+
var patientId = bundle.consentedPatient().identifier();
174183
return process
175184
.deidentificator()
176185
.deidentify(bundle)
177-
.doOnError(e -> logError("deidentify bundle", bundle.consentedPatient().identifier(), e))
178-
.map(t -> new PatientContext<>(t, bundle.consentedPatient()));
186+
.map(t -> new PatientContext<>(t, bundle.consentedPatient()))
187+
.onErrorResume(e -> handlePatientError(patientId, Step.DEIDENTIFY, e));
179188
}
180189

181190
private Flux<Result> sendBundles(Flux<PatientContext<TransportBundle>> deidentification) {
182191
return deidentification
183192
.flatMap(this::sendBundleForPatient, config.maxSendConcurrency)
184-
.doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles))
185-
.onErrorContinue((e, r) -> status.updateAndGet(TransferProcessStatus::incSkippedBundles));
193+
.doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles));
186194
}
187195

188196
private Mono<Result> sendBundleForPatient(PatientContext<TransportBundle> b) {
197+
var patientId = b.consentedPatient().identifier();
189198
return process
190199
.bundleSender()
191200
.send(b.data())
192-
.doOnError(e -> logError("send bundle", b.consentedPatient().identifier(), e));
201+
.onErrorResume(e -> handlePatientError(patientId, Step.SEND_BUNDLE, e));
193202
}
194203

195204
private void onComplete() {

clinical-domain-agent/src/main/java/care/smith/fts/cda/TransferProcessStatus.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import static lombok.AccessLevel.PRIVATE;
44

55
import care.smith.fts.cda.TransferProcessRunner.Phase;
6+
import com.fasterxml.jackson.annotation.JsonIgnore;
67
import java.time.LocalDateTime;
8+
import java.util.ArrayList;
9+
import java.util.List;
710
import lombok.With;
811

912
public record TransferProcessStatus(
@@ -15,10 +18,31 @@ public record TransferProcessStatus(
1518
@With(PRIVATE) long totalBundles,
1619
@With(PRIVATE) long deidentifiedBundles,
1720
@With(PRIVATE) long sentBundles,
18-
@With(PRIVATE) long skippedBundles) {
21+
@With(PRIVATE) long skippedBundles,
22+
@JsonIgnore @With(PRIVATE) List<PatientError> failedPatients) {
23+
24+
public enum Step {
25+
SELECT_DATA("select data"),
26+
DEIDENTIFY("deidentify bundle"),
27+
SEND_BUNDLE("send bundle");
28+
29+
private final String displayName;
30+
31+
Step(String displayName) {
32+
this.displayName = displayName;
33+
}
34+
35+
@Override
36+
public String toString() {
37+
return displayName;
38+
}
39+
}
40+
41+
public record PatientError(String patientId, Step step, String errorMessage) {}
42+
1943
public static TransferProcessStatus create(String processId) {
2044
return new TransferProcessStatus(
21-
processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0);
45+
processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0, List.of());
2246
}
2347

2448
public TransferProcessStatus incTotalPatients() {
@@ -41,6 +65,12 @@ public TransferProcessStatus incSkippedBundles() {
4165
return withSkippedBundles(skippedBundles + 1);
4266
}
4367

68+
public TransferProcessStatus addFailedPatient(String patientId, Step step, String errorMessage) {
69+
var updated = new ArrayList<>(failedPatients);
70+
updated.add(new PatientError(patientId, step, errorMessage));
71+
return withFailedPatients(List.copyOf(updated));
72+
}
73+
4474
/**
4575
* Set the process phase. If the phase switches to COMPLETED or COMPLETED_WITH_ERROR `finishedAt`
4676
* is set. Once the process is in a completed state going back is not possible anymore.

clinical-domain-agent/src/main/java/care/smith/fts/cda/rest/TransferProcessController.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import care.smith.fts.cda.TransferProcessDefinition;
1010
import care.smith.fts.cda.TransferProcessRunner;
1111
import care.smith.fts.cda.TransferProcessStatus;
12+
import care.smith.fts.cda.TransferProcessStatus.PatientError;
1213
import care.smith.fts.util.error.ErrorResponseUtil;
1314
import io.swagger.v3.oas.annotations.Operation;
1415
import io.swagger.v3.oas.annotations.Parameter;
@@ -120,7 +121,7 @@ private URI generateJobUri(UriComponentsBuilder uriBuilder, String id) {
120121

121122
@GetMapping("/process/status/{processId:[\\w-]+}")
122123
@Operation(
123-
summary = "Transfer process's status",
124+
summary = "Transfer process status",
124125
description = "**Since 5.0**\n\n",
125126
parameters = {
126127
@Parameter(
@@ -153,6 +154,42 @@ Mono<ResponseEntity<TransferProcessStatus>> status(
153154
.onErrorResume(ErrorResponseUtil::notFound);
154155
}
155156

157+
@GetMapping("/process/status/{processId:[\\w-]+}/failed_patients")
158+
@Operation(
159+
summary = "Failed patients of a transfer process",
160+
description = "**Since 6.0**\n\nReturns patient IDs and error messages for failed transfers.",
161+
parameters = {
162+
@Parameter(
163+
name = "processId",
164+
schema = @Schema(implementation = String.class),
165+
description = "Transfer process ID")
166+
},
167+
responses = {
168+
@ApiResponse(
169+
responseCode = "200",
170+
content =
171+
@Content(
172+
mediaType = "application/json",
173+
schema = @Schema(implementation = PatientError.class),
174+
examples = {
175+
@ExampleObject(
176+
name = "Failed patients",
177+
value =
178+
"""
179+
[{"patientId":"patient-001","step":"SELECT_DATA","errorMessage":"Connection refused"},\
180+
{"patientId":"patient-042","step":"DEIDENTIFY","errorMessage":"Cannot deidentify bundle"}]
181+
""")
182+
})),
183+
@ApiResponse(responseCode = "404", description = "The process could not be found")
184+
})
185+
Mono<ResponseEntity<List<PatientError>>> failedPatients(
186+
@PathVariable(value = "processId") String processId) {
187+
return processRunner
188+
.status(processId)
189+
.map(s -> ResponseEntity.ok().body(s.failedPatients()))
190+
.onErrorResume(ErrorResponseUtil::notFound);
191+
}
192+
156193
@GetMapping("/process/statuses")
157194
@Operation(
158195
summary = "List of all transfer process statuses",

clinical-domain-agent/src/test/java/care/smith/fts/cda/DefaultTransferProcessRunnerTest.java

Lines changed: 134 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import care.smith.fts.api.cda.DataSelector;
1414
import care.smith.fts.api.cda.Deidentificator;
1515
import care.smith.fts.cda.TransferProcessRunner.Phase;
16+
import care.smith.fts.cda.TransferProcessStatus.Step;
1617
import ch.qos.logback.classic.Level;
1718
import ch.qos.logback.classic.Logger;
1819
import ch.qos.logback.classic.spi.ILoggingEvent;
@@ -21,6 +22,7 @@
2122
import java.time.Duration;
2223
import java.util.List;
2324
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
2426
import org.hl7.fhir.r4.model.Bundle;
2527
import org.junit.jupiter.api.BeforeEach;
2628
import org.junit.jupiter.api.Test;
@@ -37,6 +39,9 @@ class DefaultTransferProcessRunnerTest {
3739
private static final String PATIENT_IDENTIFIER_2 = "patient-142391";
3840
private static final ConsentedPatient PATIENT_2 =
3941
new ConsentedPatient(PATIENT_IDENTIFIER_2, "system");
42+
private static final String PATIENT_IDENTIFIER_3 = "patient-293847";
43+
private static final ConsentedPatient PATIENT_3 =
44+
new ConsentedPatient(PATIENT_IDENTIFIER_3, "system");
4045

4146
private DefaultTransferProcessRunner runner;
4247

@@ -166,12 +171,7 @@ void ttl() throws InterruptedException {
166171

167172
sleep(110L); // wait TTL seconds for process 1 and 2 to be removed
168173

169-
create(runner.statuses())
170-
.assertNext(
171-
r -> {
172-
assertThat(r.size()).isEqualTo(0);
173-
})
174-
.verifyComplete();
174+
create(runner.statuses()).assertNext(r -> assertThat(r.size()).isEqualTo(0)).verifyComplete();
175175
}
176176

177177
@Test
@@ -236,7 +236,7 @@ void errorInBundleSenderSkipsBundleAndContinues() {
236236
new TransferProcessDefinition(
237237
"test",
238238
rawConfig,
239-
pids -> fromIterable(List.of(PATIENT, DefaultTransferProcessRunnerTest.PATIENT_2)),
239+
pids -> fromIterable(List.of(PATIENT, PATIENT_2)),
240240
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
241241
b -> just(new TransportBundle(new Bundle(), "transferId")),
242242
errorOnSecond(new Result()));
@@ -255,6 +255,133 @@ private static BundleSender errorOnSecond(Result result) {
255255
: Mono.error(new RuntimeException("Cannot send bundle"));
256256
}
257257

258+
private static DataSelector failOnSecondCall(Bundle bundle) {
259+
var counter = new AtomicInteger(0);
260+
return p ->
261+
counter.incrementAndGet() == 2
262+
? Flux.error(new RuntimeException("Cannot select data"))
263+
: Flux.just(bundle).map(b -> new ConsentedPatientBundle(b, p));
264+
}
265+
266+
private static Deidentificator failOnSecondCall(TransportBundle bundle) {
267+
var counter = new AtomicInteger(0);
268+
return p ->
269+
counter.incrementAndGet() == 2
270+
? Mono.error(new RuntimeException("Cannot deidentify bundle"))
271+
: just(bundle);
272+
}
273+
274+
private static BundleSender failOnSecondCall(Result result) {
275+
var counter = new AtomicInteger(0);
276+
return p ->
277+
counter.incrementAndGet() == 2
278+
? Mono.error(new RuntimeException("Cannot send bundle"))
279+
: just(result);
280+
}
281+
282+
@Test
283+
void errorInDataSelectorRecordsFailedPatient() {
284+
var process =
285+
new TransferProcessDefinition(
286+
"test",
287+
rawConfig,
288+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
289+
failOnSecondCall(new Bundle()),
290+
b -> just(new TransportBundle(new Bundle(), "transferId")),
291+
b -> just(new Result()));
292+
293+
var processId = runner.start(process, List.of());
294+
waitForCompletion(processId);
295+
296+
create(runner.status(processId))
297+
.assertNext(
298+
r -> {
299+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
300+
assertThat(r.sentBundles()).isEqualTo(2);
301+
assertThat(r.failedPatients()).hasSize(1);
302+
assertThat(r.failedPatients().getFirst().patientId()).isEqualTo(PATIENT_IDENTIFIER_2);
303+
assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.SELECT_DATA);
304+
assertThat(r.failedPatients().getFirst().errorMessage())
305+
.isEqualTo("Cannot select data");
306+
})
307+
.verifyComplete();
308+
}
309+
310+
@Test
311+
void errorInDeidentificatorRecordsFailedPatient() {
312+
var process =
313+
new TransferProcessDefinition(
314+
"test",
315+
rawConfig,
316+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
317+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
318+
failOnSecondCall(new TransportBundle(new Bundle(), "transferId")),
319+
b -> just(new Result()));
320+
321+
var processId = runner.start(process, List.of());
322+
waitForCompletion(processId);
323+
324+
create(runner.status(processId))
325+
.assertNext(
326+
r -> {
327+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
328+
assertThat(r.sentBundles()).isEqualTo(2);
329+
assertThat(r.failedPatients()).hasSize(1);
330+
assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.DEIDENTIFY);
331+
assertThat(r.failedPatients().getFirst().errorMessage())
332+
.isEqualTo("Cannot deidentify bundle");
333+
})
334+
.verifyComplete();
335+
}
336+
337+
@Test
338+
void errorInBundleSenderRecordsFailedPatient() {
339+
var process =
340+
new TransferProcessDefinition(
341+
"test",
342+
rawConfig,
343+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
344+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
345+
b -> just(new TransportBundle(new Bundle(), "transferId")),
346+
failOnSecondCall(new Result()));
347+
348+
var processId = runner.start(process, List.of());
349+
waitForCompletion(processId);
350+
351+
create(runner.status(processId))
352+
.assertNext(
353+
r -> {
354+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
355+
assertThat(r.sentBundles()).isEqualTo(2);
356+
assertThat(r.failedPatients()).hasSize(1);
357+
assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.SEND_BUNDLE);
358+
assertThat(r.failedPatients().getFirst().errorMessage())
359+
.isEqualTo("Cannot send bundle");
360+
})
361+
.verifyComplete();
362+
}
363+
364+
@Test
365+
void successfulTransferHasNoFailedPatients() {
366+
var process =
367+
new TransferProcessDefinition(
368+
"test",
369+
rawConfig,
370+
pids -> fromIterable(List.of(PATIENT)),
371+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), PATIENT))),
372+
b -> just(new TransportBundle(new Bundle(), "transferId")),
373+
b -> just(new Result()));
374+
375+
var processId = runner.start(process, List.of());
376+
create(runner.status(processId))
377+
.assertNext(
378+
r -> {
379+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED);
380+
assertThat(r.failedPatients()).isEmpty();
381+
})
382+
.verifyComplete();
383+
}
384+
258385
@Test
259386
void logErrorIncludesExceptionWhenDebugEnabled() {
260387
var event = runWithLogLevel(Level.DEBUG);

0 commit comments

Comments
 (0)