From 55de5b202c8e8874fe063b99551c248fff47e2bc Mon Sep 17 00:00:00 2001 From: Daniel Hahne Date: Wed, 25 Feb 2026 12:51:28 +0100 Subject: [PATCH] Expose Failed Patient Transfers via Status --- .../fts/cda/DefaultTransferProcessRunner.java | 23 ++- .../smith/fts/cda/TransferProcessStatus.java | 34 ++++- .../cda/rest/TransferProcessController.java | 39 ++++- .../cda/DefaultTransferProcessRunnerTest.java | 141 +++++++++++++++++- .../fts/cda/TransferProcessStatusTest.java | 58 ++++++- .../rest/TransferProcessControllerTest.java | 34 +++++ docs/usage/execution.md | 36 ++++- 7 files changed, 343 insertions(+), 22 deletions(-) diff --git a/clinical-domain-agent/src/main/java/care/smith/fts/cda/DefaultTransferProcessRunner.java b/clinical-domain-agent/src/main/java/care/smith/fts/cda/DefaultTransferProcessRunner.java index 6ff0f8ac8..5690fe787 100644 --- a/clinical-domain-agent/src/main/java/care/smith/fts/cda/DefaultTransferProcessRunner.java +++ b/clinical-domain-agent/src/main/java/care/smith/fts/cda/DefaultTransferProcessRunner.java @@ -8,6 +8,7 @@ import care.smith.fts.api.ConsentedPatientBundle; import care.smith.fts.api.TransportBundle; import care.smith.fts.api.cda.BundleSender.Result; +import care.smith.fts.cda.TransferProcessStatus.Step; import com.fasterxml.jackson.databind.ObjectMapper; import java.time.LocalDateTime; import java.util.HashMap; @@ -151,10 +152,17 @@ private Flux selectDataForPatient(ConsentedPatient patie return process .dataSelector() .select(patient) - .doOnError(e -> logError("select data", patient.identifier(), e)); + .onErrorResume(e -> handlePatientError(patient.identifier(), Step.SELECT_DATA, e)); } - private void logError(String step, String patientIdentifier, Throwable e) { + private Mono handlePatientError(String patientId, Step step, Throwable e) { + logError(step, patientId, e); + status.updateAndGet( + s -> s.incSkippedBundles().addFailedPatient(patientId, step, e.getMessage())); + return Mono.empty(); + } + + private void logError(Step step, String patientIdentifier, Throwable e) { var msg = "[Process {}] Failed to {} for patient {}. {}"; log.error( msg, processId(), step, patientIdentifier, log.isDebugEnabled() ? e : e.getMessage()); @@ -171,25 +179,26 @@ private Flux> deidentify( private Mono> deidentifyForPatient( ConsentedPatientBundle bundle) { + var patientId = bundle.consentedPatient().identifier(); return process .deidentificator() .deidentify(bundle) - .doOnError(e -> logError("deidentify bundle", bundle.consentedPatient().identifier(), e)) - .map(t -> new PatientContext<>(t, bundle.consentedPatient())); + .map(t -> new PatientContext<>(t, bundle.consentedPatient())) + .onErrorResume(e -> handlePatientError(patientId, Step.DEIDENTIFY, e)); } private Flux sendBundles(Flux> deidentification) { return deidentification .flatMap(this::sendBundleForPatient, config.maxSendConcurrency) - .doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles)) - .onErrorContinue((e, r) -> status.updateAndGet(TransferProcessStatus::incSkippedBundles)); + .doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles)); } private Mono sendBundleForPatient(PatientContext b) { + var patientId = b.consentedPatient().identifier(); return process .bundleSender() .send(b.data()) - .doOnError(e -> logError("send bundle", b.consentedPatient().identifier(), e)); + .onErrorResume(e -> handlePatientError(patientId, Step.SEND_BUNDLE, e)); } private void onComplete() { diff --git a/clinical-domain-agent/src/main/java/care/smith/fts/cda/TransferProcessStatus.java b/clinical-domain-agent/src/main/java/care/smith/fts/cda/TransferProcessStatus.java index 5364303ab..618d5c3d9 100644 --- a/clinical-domain-agent/src/main/java/care/smith/fts/cda/TransferProcessStatus.java +++ b/clinical-domain-agent/src/main/java/care/smith/fts/cda/TransferProcessStatus.java @@ -3,7 +3,10 @@ import static lombok.AccessLevel.PRIVATE; import care.smith.fts.cda.TransferProcessRunner.Phase; +import com.fasterxml.jackson.annotation.JsonIgnore; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; import lombok.With; public record TransferProcessStatus( @@ -15,10 +18,31 @@ public record TransferProcessStatus( @With(PRIVATE) long totalBundles, @With(PRIVATE) long deidentifiedBundles, @With(PRIVATE) long sentBundles, - @With(PRIVATE) long skippedBundles) { + @With(PRIVATE) long skippedBundles, + @JsonIgnore @With(PRIVATE) List failedPatients) { + + public enum Step { + SELECT_DATA("select data"), + DEIDENTIFY("deidentify bundle"), + SEND_BUNDLE("send bundle"); + + private final String displayName; + + Step(String displayName) { + this.displayName = displayName; + } + + @Override + public String toString() { + return displayName; + } + } + + public record PatientError(String patientId, Step step, String errorMessage) {} + public static TransferProcessStatus create(String processId) { return new TransferProcessStatus( - processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0); + processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0, List.of()); } public TransferProcessStatus incTotalPatients() { @@ -41,6 +65,12 @@ public TransferProcessStatus incSkippedBundles() { return withSkippedBundles(skippedBundles + 1); } + public TransferProcessStatus addFailedPatient(String patientId, Step step, String errorMessage) { + var updated = new ArrayList<>(failedPatients); + updated.add(new PatientError(patientId, step, errorMessage)); + return withFailedPatients(List.copyOf(updated)); + } + /** * Set the process phase. If the phase switches to COMPLETED or COMPLETED_WITH_ERROR `finishedAt` * is set. Once the process is in a completed state going back is not possible anymore. diff --git a/clinical-domain-agent/src/main/java/care/smith/fts/cda/rest/TransferProcessController.java b/clinical-domain-agent/src/main/java/care/smith/fts/cda/rest/TransferProcessController.java index 34351d0c5..6616a07d9 100644 --- a/clinical-domain-agent/src/main/java/care/smith/fts/cda/rest/TransferProcessController.java +++ b/clinical-domain-agent/src/main/java/care/smith/fts/cda/rest/TransferProcessController.java @@ -9,6 +9,7 @@ import care.smith.fts.cda.TransferProcessDefinition; import care.smith.fts.cda.TransferProcessRunner; import care.smith.fts.cda.TransferProcessStatus; +import care.smith.fts.cda.TransferProcessStatus.PatientError; import care.smith.fts.util.error.ErrorResponseUtil; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -120,7 +121,7 @@ private URI generateJobUri(UriComponentsBuilder uriBuilder, String id) { @GetMapping("/process/status/{processId:[\\w-]+}") @Operation( - summary = "Transfer process's status", + summary = "Transfer process status", description = "**Since 5.0**\n\n", parameters = { @Parameter( @@ -153,6 +154,42 @@ Mono> status( .onErrorResume(ErrorResponseUtil::notFound); } + @GetMapping("/process/status/{processId:[\\w-]+}/failed_patients") + @Operation( + summary = "Failed patients of a transfer process", + description = "**Since 6.0**\n\nReturns patient IDs and error messages for failed transfers.", + parameters = { + @Parameter( + name = "processId", + schema = @Schema(implementation = String.class), + description = "Transfer process ID") + }, + responses = { + @ApiResponse( + responseCode = "200", + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = PatientError.class), + examples = { + @ExampleObject( + name = "Failed patients", + value = +""" +[{"patientId":"patient-001","step":"SELECT_DATA","errorMessage":"Connection refused"},\ +{"patientId":"patient-042","step":"DEIDENTIFY","errorMessage":"Cannot deidentify bundle"}] +""") + })), + @ApiResponse(responseCode = "404", description = "The process could not be found") + }) + Mono>> failedPatients( + @PathVariable(value = "processId") String processId) { + return processRunner + .status(processId) + .map(s -> ResponseEntity.ok().body(s.failedPatients())) + .onErrorResume(ErrorResponseUtil::notFound); + } + @GetMapping("/process/statuses") @Operation( summary = "List of all transfer process statuses", diff --git a/clinical-domain-agent/src/test/java/care/smith/fts/cda/DefaultTransferProcessRunnerTest.java b/clinical-domain-agent/src/test/java/care/smith/fts/cda/DefaultTransferProcessRunnerTest.java index 787759152..b129afb9f 100644 --- a/clinical-domain-agent/src/test/java/care/smith/fts/cda/DefaultTransferProcessRunnerTest.java +++ b/clinical-domain-agent/src/test/java/care/smith/fts/cda/DefaultTransferProcessRunnerTest.java @@ -13,6 +13,7 @@ import care.smith.fts.api.cda.DataSelector; import care.smith.fts.api.cda.Deidentificator; import care.smith.fts.cda.TransferProcessRunner.Phase; +import care.smith.fts.cda.TransferProcessStatus.Step; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -21,6 +22,7 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.hl7.fhir.r4.model.Bundle; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -37,6 +39,9 @@ class DefaultTransferProcessRunnerTest { private static final String PATIENT_IDENTIFIER_2 = "patient-142391"; private static final ConsentedPatient PATIENT_2 = new ConsentedPatient(PATIENT_IDENTIFIER_2, "system"); + private static final String PATIENT_IDENTIFIER_3 = "patient-293847"; + private static final ConsentedPatient PATIENT_3 = + new ConsentedPatient(PATIENT_IDENTIFIER_3, "system"); private DefaultTransferProcessRunner runner; @@ -166,12 +171,7 @@ void ttl() throws InterruptedException { sleep(110L); // wait TTL seconds for process 1 and 2 to be removed - create(runner.statuses()) - .assertNext( - r -> { - assertThat(r.size()).isEqualTo(0); - }) - .verifyComplete(); + create(runner.statuses()).assertNext(r -> assertThat(r.size()).isEqualTo(0)).verifyComplete(); } @Test @@ -236,7 +236,7 @@ void errorInBundleSenderSkipsBundleAndContinues() { new TransferProcessDefinition( "test", rawConfig, - pids -> fromIterable(List.of(PATIENT, DefaultTransferProcessRunnerTest.PATIENT_2)), + pids -> fromIterable(List.of(PATIENT, PATIENT_2)), p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))), b -> just(new TransportBundle(new Bundle(), "transferId")), errorOnSecond(new Result())); @@ -255,6 +255,133 @@ private static BundleSender errorOnSecond(Result result) { : Mono.error(new RuntimeException("Cannot send bundle")); } + private static DataSelector failOnSecondCall(Bundle bundle) { + var counter = new AtomicInteger(0); + return p -> + counter.incrementAndGet() == 2 + ? Flux.error(new RuntimeException("Cannot select data")) + : Flux.just(bundle).map(b -> new ConsentedPatientBundle(b, p)); + } + + private static Deidentificator failOnSecondCall(TransportBundle bundle) { + var counter = new AtomicInteger(0); + return p -> + counter.incrementAndGet() == 2 + ? Mono.error(new RuntimeException("Cannot deidentify bundle")) + : just(bundle); + } + + private static BundleSender failOnSecondCall(Result result) { + var counter = new AtomicInteger(0); + return p -> + counter.incrementAndGet() == 2 + ? Mono.error(new RuntimeException("Cannot send bundle")) + : just(result); + } + + @Test + void errorInDataSelectorRecordsFailedPatient() { + var process = + new TransferProcessDefinition( + "test", + rawConfig, + pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)), + failOnSecondCall(new Bundle()), + b -> just(new TransportBundle(new Bundle(), "transferId")), + b -> just(new Result())); + + var processId = runner.start(process, List.of()); + waitForCompletion(processId); + + create(runner.status(processId)) + .assertNext( + r -> { + assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR); + assertThat(r.sentBundles()).isEqualTo(2); + assertThat(r.failedPatients()).hasSize(1); + assertThat(r.failedPatients().getFirst().patientId()).isEqualTo(PATIENT_IDENTIFIER_2); + assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.SELECT_DATA); + assertThat(r.failedPatients().getFirst().errorMessage()) + .isEqualTo("Cannot select data"); + }) + .verifyComplete(); + } + + @Test + void errorInDeidentificatorRecordsFailedPatient() { + var process = + new TransferProcessDefinition( + "test", + rawConfig, + pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)), + p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))), + failOnSecondCall(new TransportBundle(new Bundle(), "transferId")), + b -> just(new Result())); + + var processId = runner.start(process, List.of()); + waitForCompletion(processId); + + create(runner.status(processId)) + .assertNext( + r -> { + assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR); + assertThat(r.sentBundles()).isEqualTo(2); + assertThat(r.failedPatients()).hasSize(1); + assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.DEIDENTIFY); + assertThat(r.failedPatients().getFirst().errorMessage()) + .isEqualTo("Cannot deidentify bundle"); + }) + .verifyComplete(); + } + + @Test + void errorInBundleSenderRecordsFailedPatient() { + var process = + new TransferProcessDefinition( + "test", + rawConfig, + pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)), + p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))), + b -> just(new TransportBundle(new Bundle(), "transferId")), + failOnSecondCall(new Result())); + + var processId = runner.start(process, List.of()); + waitForCompletion(processId); + + create(runner.status(processId)) + .assertNext( + r -> { + assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR); + assertThat(r.sentBundles()).isEqualTo(2); + assertThat(r.failedPatients()).hasSize(1); + assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.SEND_BUNDLE); + assertThat(r.failedPatients().getFirst().errorMessage()) + .isEqualTo("Cannot send bundle"); + }) + .verifyComplete(); + } + + @Test + void successfulTransferHasNoFailedPatients() { + var process = + new TransferProcessDefinition( + "test", + rawConfig, + pids -> fromIterable(List.of(PATIENT)), + p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), PATIENT))), + b -> just(new TransportBundle(new Bundle(), "transferId")), + b -> just(new Result())); + + var processId = runner.start(process, List.of()); + create(runner.status(processId)) + .assertNext( + r -> { + assertThat(r.phase()).isEqualTo(Phase.COMPLETED); + assertThat(r.failedPatients()).isEmpty(); + }) + .verifyComplete(); + } + @Test void logErrorIncludesExceptionWhenDebugEnabled() { var event = runWithLogLevel(Level.DEBUG); diff --git a/clinical-domain-agent/src/test/java/care/smith/fts/cda/TransferProcessStatusTest.java b/clinical-domain-agent/src/test/java/care/smith/fts/cda/TransferProcessStatusTest.java index 600511a54..6a1f81bc9 100644 --- a/clinical-domain-agent/src/test/java/care/smith/fts/cda/TransferProcessStatusTest.java +++ b/clinical-domain-agent/src/test/java/care/smith/fts/cda/TransferProcessStatusTest.java @@ -1,8 +1,11 @@ package care.smith.fts.cda; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import care.smith.fts.cda.TransferProcessRunner.Phase; +import care.smith.fts.cda.TransferProcessStatus.PatientError; +import care.smith.fts.cda.TransferProcessStatus.Step; import java.time.LocalDateTime; import org.junit.jupiter.api.Test; @@ -22,6 +25,7 @@ void testCreate() { assertThat(status.deidentifiedBundles()).isEqualTo(0); assertThat(status.sentBundles()).isEqualTo(0); assertThat(status.skippedBundles()).isEqualTo(0); + assertThat(status.failedPatients()).isEmpty(); } @Test @@ -82,15 +86,61 @@ void testMayBeRemoved() { void testIsCompleted() { var status = TransferProcessStatus.create("process123"); - assertThat(status.isCompleted(status.phase())).isFalse(); + assertThat(TransferProcessStatus.isCompleted(status.phase())).isFalse(); status = status.setPhase(Phase.COMPLETED); - assertThat(status.isCompleted(status.phase())).isTrue(); + assertThat(TransferProcessStatus.isCompleted(status.phase())).isTrue(); status = status.setPhase(Phase.COMPLETED_WITH_ERROR); - assertThat(status.isCompleted(status.phase())).isTrue(); + assertThat(TransferProcessStatus.isCompleted(status.phase())).isTrue(); status = status.setPhase(Phase.FATAL); - assertThat(status.isCompleted(status.phase())).isTrue(); + assertThat(TransferProcessStatus.isCompleted(status.phase())).isTrue(); + } + + @Test + void addFailedPatientAddsEntry() { + var status = + TransferProcessStatus.create("process123") + .addFailedPatient("patient-1", Step.SELECT_DATA, "Connection refused"); + + assertThat(status.failedPatients()).hasSize(1); + assertThat(status.failedPatients().getFirst().patientId()).isEqualTo("patient-1"); + assertThat(status.failedPatients().getFirst().step()).isEqualTo(Step.SELECT_DATA); + assertThat(status.failedPatients().getFirst().errorMessage()).isEqualTo("Connection refused"); + } + + @Test + void addMultipleFailedPatientsAccumulatesAll() { + var status = + TransferProcessStatus.create("process123") + .addFailedPatient("patient-1", Step.SELECT_DATA, "Error A") + .addFailedPatient("patient-2", Step.DEIDENTIFY, "Error B"); + + assertThat(status.failedPatients()).hasSize(2); + assertThat(status.failedPatients().get(0).patientId()).isEqualTo("patient-1"); + assertThat(status.failedPatients().get(0).step()).isEqualTo(Step.SELECT_DATA); + assertThat(status.failedPatients().get(1).patientId()).isEqualTo("patient-2"); + assertThat(status.failedPatients().get(1).step()).isEqualTo(Step.DEIDENTIFY); + } + + @Test + void addFailedPatientPreservesImmutability() { + var original = TransferProcessStatus.create("process123"); + var withError = original.addFailedPatient("patient-1", Step.SEND_BUNDLE, "Error"); + + assertThat(original.failedPatients()).isEmpty(); + assertThat(withError.failedPatients()).hasSize(1); + } + + @Test + void failedPatientsListIsUnmodifiable() { + var status = + TransferProcessStatus.create("process123") + .addFailedPatient("patient-1", Step.SELECT_DATA, "Error"); + + assertThatThrownBy( + () -> status.failedPatients().add(new PatientError("x", Step.SELECT_DATA, "y"))) + .isInstanceOf(UnsupportedOperationException.class); } } diff --git a/clinical-domain-agent/src/test/java/care/smith/fts/cda/rest/TransferProcessControllerTest.java b/clinical-domain-agent/src/test/java/care/smith/fts/cda/rest/TransferProcessControllerTest.java index 4326fe7d3..515a7f360 100644 --- a/clinical-domain-agent/src/test/java/care/smith/fts/cda/rest/TransferProcessControllerTest.java +++ b/clinical-domain-agent/src/test/java/care/smith/fts/cda/rest/TransferProcessControllerTest.java @@ -14,6 +14,7 @@ import care.smith.fts.cda.TransferProcessRunner; import care.smith.fts.cda.TransferProcessRunner.Phase; import care.smith.fts.cda.TransferProcessStatus; +import care.smith.fts.cda.TransferProcessStatus.Step; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -170,4 +171,37 @@ void unknownProject() { var config = api.project("unknown"); assertThat(config.getStatusCode()).isEqualTo(NOT_FOUND); } + + @Test + void failedPatientsReturnsErrors() { + var processId = "failed-123456"; + var result = + TransferProcessStatus.create(processId) + .addFailedPatient("patient-001", Step.SELECT_DATA, "Connection refused") + .addFailedPatient("patient-042", Step.DEIDENTIFY, "Cannot deidentify bundle"); + + when(mockRunner.status(processId)).thenReturn(Mono.just(result)); + + create(api.failedPatients(processId)) + .assertNext( + r -> { + assertThat(r.getStatusCode()).isEqualTo(OK); + assertThat(r.getBody()).hasSize(2); + assertThat(r.getBody().get(0).patientId()).isEqualTo("patient-001"); + assertThat(r.getBody().get(0).step()).isEqualTo(Step.SELECT_DATA); + assertThat(r.getBody().get(1).patientId()).isEqualTo("patient-042"); + assertThat(r.getBody().get(1).step()).isEqualTo(Step.DEIDENTIFY); + }) + .verifyComplete(); + } + + @Test + void failedPatientsWithUnknownProcessIdReturns404() { + when(mockRunner.status(Mockito.anyString())) + .thenReturn(Mono.error(new IllegalStateException("No transfer process with processId: "))); + + create(api.failedPatients("unknown")) + .assertNext(r -> assertThat(r.getStatusCode()).isEqualTo(NOT_FOUND)) + .verifyComplete(); + } } diff --git a/docs/usage/execution.md b/docs/usage/execution.md index fac21be3d..8416c13ed 100644 --- a/docs/usage/execution.md +++ b/docs/usage/execution.md @@ -71,10 +71,44 @@ The status response looks like this: | `totalBundles` | Total number of bundles to be processed | | `deidentifiedBundles` | Number of bundles after deidentification | | `sentBundles` | Number of bundles sent to RDA | -| `skippedBundles` | Number of skipped bundles; if greater than zero, investigate logs to determine the cause | +| `skippedBundles` | Number of skipped bundles; if greater than zero, query the [failed patients endpoint](#failed-patients) for details | [API Reference for Status Endpoint](/open-api/cd-openapi.html#get-/api/v2/process/status/-processId-) +### Failed Patients + +When `skippedBundles` is greater than zero, the failed patients endpoint returns +which patients failed and at which step: + +```shell +curl -sSf "https://cd-agent:8080/api/v2/process/status/52792219-b966-44bf-bc1b-c0eafbe8ead0/failed_patients" +``` + + +```json +[ + { + "patientId": "patient-001", + "step": "SELECT_DATA", + "errorMessage": "Connection refused" + }, + { + "patientId": "patient-042", + "step": "DEIDENTIFY", + "errorMessage": "Cannot deidentify bundle" + } +] +``` + + +| Field | Description | +|----------------|--------------------------------------------------------------------------------------------------| +| `patientId` | Identifier of the patient whose transfer failed | +| `step` | Processing step where the error occurred (`SELECT_DATA`, `DEIDENTIFY`, `SEND_BUNDLE`) | +| `errorMessage` | Error message describing the failure | + +[API Reference for Failed Patients Endpoint](/open-api/cd-openapi.html#get-/api/v2/process/status/-processId-/failed_patients) + ## Monitoring FTSnext provides a monitoring docker container with Grafana dashboards that show some metrics.