Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import care.smith.fts.api.ConsentedPatientBundle;
import care.smith.fts.api.TransportBundle;
import care.smith.fts.api.cda.BundleSender.Result;
import care.smith.fts.cda.TransferProcessStatus.Step;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.LocalDateTime;
import java.util.HashMap;
Expand Down Expand Up @@ -151,10 +152,17 @@ private Flux<ConsentedPatientBundle> selectDataForPatient(ConsentedPatient patie
return process
.dataSelector()
.select(patient)
.doOnError(e -> logError("select data", patient.identifier(), e));
.onErrorResume(e -> handlePatientError(patient.identifier(), Step.SELECT_DATA, e));
}

private void logError(String step, String patientIdentifier, Throwable e) {
private <T> Mono<T> handlePatientError(String patientId, Step step, Throwable e) {
logError(step, patientId, e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The step string should come from the step enum.

status.updateAndGet(
s -> s.incSkippedBundles().addFailedPatient(patientId, step, e.getMessage()));
return Mono.empty();
}

private void logError(Step step, String patientIdentifier, Throwable e) {
var msg = "[Process {}] Failed to {} for patient {}. {}";
log.error(
msg, processId(), step, patientIdentifier, log.isDebugEnabled() ? e : e.getMessage());
Expand All @@ -171,25 +179,26 @@ private Flux<PatientContext<TransportBundle>> deidentify(

private Mono<PatientContext<TransportBundle>> deidentifyForPatient(
ConsentedPatientBundle bundle) {
var patientId = bundle.consentedPatient().identifier();
return process
.deidentificator()
.deidentify(bundle)
.doOnError(e -> logError("deidentify bundle", bundle.consentedPatient().identifier(), e))
.map(t -> new PatientContext<>(t, bundle.consentedPatient()));
.map(t -> new PatientContext<>(t, bundle.consentedPatient()))
.onErrorResume(e -> handlePatientError(patientId, Step.DEIDENTIFY, e));
}

private Flux<Result> sendBundles(Flux<PatientContext<TransportBundle>> deidentification) {
return deidentification
.flatMap(this::sendBundleForPatient, config.maxSendConcurrency)
.doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles))
.onErrorContinue((e, r) -> status.updateAndGet(TransferProcessStatus::incSkippedBundles));
.doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was the onErrorContinue removed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The onErrorContinue was replaced by onErrorResume(... -> Mono.empty()) inside each per-patient method (selectDataForPatient, deidentifyForPatient, sendBundleForPatient). This converts errors to empty publishers before they reach the outer flux, making onErrorContinue at the flux level unnecessary.

This approach is also safer — onErrorContinue has well-known pitfalls where it doesn't work as expected with certain operators (e.g. flatMap), because it relies on operator-level support. onErrorResume is the more predictable pattern.

}

private Mono<Result> sendBundleForPatient(PatientContext<TransportBundle> b) {
var patientId = b.consentedPatient().identifier();
return process
.bundleSender()
.send(b.data())
.doOnError(e -> logError("send bundle", b.consentedPatient().identifier(), e));
.onErrorResume(e -> handlePatientError(patientId, Step.SEND_BUNDLE, e));
}

private void onComplete() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import static lombok.AccessLevel.PRIVATE;

import care.smith.fts.cda.TransferProcessRunner.Phase;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import lombok.With;

public record TransferProcessStatus(
Expand All @@ -15,10 +18,31 @@ public record TransferProcessStatus(
@With(PRIVATE) long totalBundles,
@With(PRIVATE) long deidentifiedBundles,
@With(PRIVATE) long sentBundles,
@With(PRIVATE) long skippedBundles) {
@With(PRIVATE) long skippedBundles,
@JsonIgnore @With(PRIVATE) List<PatientError> failedPatients) {

public enum Step {
SELECT_DATA("select data"),
DEIDENTIFY("deidentify bundle"),
SEND_BUNDLE("send bundle");

private final String displayName;

Step(String displayName) {
this.displayName = displayName;
}

@Override
public String toString() {
return displayName;
}
}

public record PatientError(String patientId, Step step, String errorMessage) {}

public static TransferProcessStatus create(String processId) {
return new TransferProcessStatus(
processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0);
processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0, List.of());
}

public TransferProcessStatus incTotalPatients() {
Expand All @@ -41,6 +65,12 @@ public TransferProcessStatus incSkippedBundles() {
return withSkippedBundles(skippedBundles + 1);
}

public TransferProcessStatus addFailedPatient(String patientId, Step step, String errorMessage) {
var updated = new ArrayList<>(failedPatients);
updated.add(new PatientError(patientId, step, errorMessage));
return withFailedPatients(List.copyOf(updated));
}

/**
* Set the process phase. If the phase switches to COMPLETED or COMPLETED_WITH_ERROR `finishedAt`
* is set. Once the process is in a completed state going back is not possible anymore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import care.smith.fts.cda.TransferProcessDefinition;
import care.smith.fts.cda.TransferProcessRunner;
import care.smith.fts.cda.TransferProcessStatus;
import care.smith.fts.cda.TransferProcessStatus.PatientError;
import care.smith.fts.util.error.ErrorResponseUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
Expand Down Expand Up @@ -120,7 +121,7 @@ private URI generateJobUri(UriComponentsBuilder uriBuilder, String id) {

@GetMapping("/process/status/{processId:[\\w-]+}")
@Operation(
summary = "Transfer process's status",
summary = "Transfer process status",
description = "**Since 5.0**\n\n",
parameters = {
@Parameter(
Expand Down Expand Up @@ -153,6 +154,42 @@ Mono<ResponseEntity<TransferProcessStatus>> status(
.onErrorResume(ErrorResponseUtil::notFound);
}

@GetMapping("/process/status/{processId:[\\w-]+}/failed_patients")
@Operation(
summary = "Failed patients of a transfer process",
description = "**Since 6.0**\n\nReturns patient IDs and error messages for failed transfers.",
parameters = {
@Parameter(
name = "processId",
schema = @Schema(implementation = String.class),
description = "Transfer process ID")
},
responses = {
@ApiResponse(
responseCode = "200",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = PatientError.class),
examples = {
@ExampleObject(
name = "Failed patients",
value =
"""
[{"patientId":"patient-001","step":"SELECT_DATA","errorMessage":"Connection refused"},\
{"patientId":"patient-042","step":"DEIDENTIFY","errorMessage":"Cannot deidentify bundle"}]
""")
})),
@ApiResponse(responseCode = "404", description = "The process could not be found")
})
Mono<ResponseEntity<List<PatientError>>> failedPatients(
@PathVariable(value = "processId") String processId) {
return processRunner
.status(processId)
.map(s -> ResponseEntity.ok().body(s.failedPatients()))
.onErrorResume(ErrorResponseUtil::notFound);
}

@GetMapping("/process/statuses")
@Operation(
summary = "List of all transfer process statuses",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import care.smith.fts.api.cda.DataSelector;
import care.smith.fts.api.cda.Deidentificator;
import care.smith.fts.cda.TransferProcessRunner.Phase;
import care.smith.fts.cda.TransferProcessStatus.Step;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
Expand All @@ -21,6 +22,7 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hl7.fhir.r4.model.Bundle;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -37,6 +39,9 @@ class DefaultTransferProcessRunnerTest {
private static final String PATIENT_IDENTIFIER_2 = "patient-142391";
private static final ConsentedPatient PATIENT_2 =
new ConsentedPatient(PATIENT_IDENTIFIER_2, "system");
private static final String PATIENT_IDENTIFIER_3 = "patient-293847";
private static final ConsentedPatient PATIENT_3 =
new ConsentedPatient(PATIENT_IDENTIFIER_3, "system");

private DefaultTransferProcessRunner runner;

Expand Down Expand Up @@ -166,12 +171,7 @@ void ttl() throws InterruptedException {

sleep(110L); // wait TTL seconds for process 1 and 2 to be removed

create(runner.statuses())
.assertNext(
r -> {
assertThat(r.size()).isEqualTo(0);
})
.verifyComplete();
create(runner.statuses()).assertNext(r -> assertThat(r.size()).isEqualTo(0)).verifyComplete();
}

@Test
Expand Down Expand Up @@ -236,7 +236,7 @@ void errorInBundleSenderSkipsBundleAndContinues() {
new TransferProcessDefinition(
"test",
rawConfig,
pids -> fromIterable(List.of(PATIENT, DefaultTransferProcessRunnerTest.PATIENT_2)),
pids -> fromIterable(List.of(PATIENT, PATIENT_2)),
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
b -> just(new TransportBundle(new Bundle(), "transferId")),
errorOnSecond(new Result()));
Expand All @@ -255,6 +255,133 @@ private static BundleSender errorOnSecond(Result result) {
: Mono.error(new RuntimeException("Cannot send bundle"));
}

private static DataSelector failOnSecondCall(Bundle bundle) {
var counter = new AtomicInteger(0);
return p ->
counter.incrementAndGet() == 2
? Flux.error(new RuntimeException("Cannot select data"))
: Flux.just(bundle).map(b -> new ConsentedPatientBundle(b, p));
}

private static Deidentificator failOnSecondCall(TransportBundle bundle) {
var counter = new AtomicInteger(0);
return p ->
counter.incrementAndGet() == 2
? Mono.error(new RuntimeException("Cannot deidentify bundle"))
: just(bundle);
}

private static BundleSender failOnSecondCall(Result result) {
var counter = new AtomicInteger(0);
return p ->
counter.incrementAndGet() == 2
? Mono.error(new RuntimeException("Cannot send bundle"))
: just(result);
}

@Test
void errorInDataSelectorRecordsFailedPatient() {
var process =
new TransferProcessDefinition(
"test",
rawConfig,
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
failOnSecondCall(new Bundle()),
b -> just(new TransportBundle(new Bundle(), "transferId")),
b -> just(new Result()));

var processId = runner.start(process, List.of());
waitForCompletion(processId);

create(runner.status(processId))
.assertNext(
r -> {
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
assertThat(r.sentBundles()).isEqualTo(2);
assertThat(r.failedPatients()).hasSize(1);
assertThat(r.failedPatients().getFirst().patientId()).isEqualTo(PATIENT_IDENTIFIER_2);
assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.SELECT_DATA);
assertThat(r.failedPatients().getFirst().errorMessage())
.isEqualTo("Cannot select data");
})
.verifyComplete();
}

@Test
void errorInDeidentificatorRecordsFailedPatient() {
var process =
new TransferProcessDefinition(
"test",
rawConfig,
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
failOnSecondCall(new TransportBundle(new Bundle(), "transferId")),
b -> just(new Result()));

var processId = runner.start(process, List.of());
waitForCompletion(processId);

create(runner.status(processId))
.assertNext(
r -> {
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
assertThat(r.sentBundles()).isEqualTo(2);
assertThat(r.failedPatients()).hasSize(1);
assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.DEIDENTIFY);
assertThat(r.failedPatients().getFirst().errorMessage())
.isEqualTo("Cannot deidentify bundle");
})
.verifyComplete();
}

@Test
void errorInBundleSenderRecordsFailedPatient() {
var process =
new TransferProcessDefinition(
"test",
rawConfig,
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
b -> just(new TransportBundle(new Bundle(), "transferId")),
failOnSecondCall(new Result()));

var processId = runner.start(process, List.of());
waitForCompletion(processId);

create(runner.status(processId))
.assertNext(
r -> {
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
assertThat(r.sentBundles()).isEqualTo(2);
assertThat(r.failedPatients()).hasSize(1);
assertThat(r.failedPatients().getFirst().step()).isEqualTo(Step.SEND_BUNDLE);
assertThat(r.failedPatients().getFirst().errorMessage())
.isEqualTo("Cannot send bundle");
})
.verifyComplete();
}

@Test
void successfulTransferHasNoFailedPatients() {
var process =
new TransferProcessDefinition(
"test",
rawConfig,
pids -> fromIterable(List.of(PATIENT)),
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), PATIENT))),
b -> just(new TransportBundle(new Bundle(), "transferId")),
b -> just(new Result()));

var processId = runner.start(process, List.of());
create(runner.status(processId))
.assertNext(
r -> {
assertThat(r.phase()).isEqualTo(Phase.COMPLETED);
assertThat(r.failedPatients()).isEmpty();
})
.verifyComplete();
}

@Test
void logErrorIncludesExceptionWhenDebugEnabled() {
var event = runWithLogLevel(Level.DEBUG);
Expand Down
Loading
Loading