Skip to content

Commit c829665

Browse files
committed
Expose Failed Patient Transfers via Status
1 parent d084b10 commit c829665

File tree

5 files changed

+199
-22
lines changed

5 files changed

+199
-22
lines changed

clinical-domain-agent/src/main/java/care/smith/fts/cda/DefaultTransferProcessRunner.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,12 @@ private Flux<ConsentedPatientBundle> selectDataForPatient(ConsentedPatient patie
151151
return process
152152
.dataSelector()
153153
.select(patient)
154-
.doOnError(e -> logError("select data", patient.identifier(), e));
154+
.onErrorResume(
155+
e -> {
156+
logError("select data", patient.identifier(), e);
157+
status.updateAndGet(s -> s.addFailedPatient(patient.identifier(), e.getMessage()));
158+
return Flux.empty();
159+
});
155160
}
156161

157162
private void logError(String step, String patientIdentifier, Throwable e) {
@@ -171,25 +176,36 @@ private Flux<PatientContext<TransportBundle>> deidentify(
171176

172177
private Mono<PatientContext<TransportBundle>> deidentifyForPatient(
173178
ConsentedPatientBundle bundle) {
179+
var patientId = bundle.consentedPatient().identifier();
174180
return process
175181
.deidentificator()
176182
.deidentify(bundle)
177-
.doOnError(e -> logError("deidentify bundle", bundle.consentedPatient().identifier(), e))
178-
.map(t -> new PatientContext<>(t, bundle.consentedPatient()));
183+
.map(t -> new PatientContext<>(t, bundle.consentedPatient()))
184+
.onErrorResume(
185+
e -> {
186+
logError("deidentify bundle", patientId, e);
187+
status.updateAndGet(s -> s.addFailedPatient(patientId, e.getMessage()));
188+
return Mono.empty();
189+
});
179190
}
180191

181192
private Flux<Result> sendBundles(Flux<PatientContext<TransportBundle>> deidentification) {
182193
return deidentification
183194
.flatMap(this::sendBundleForPatient, config.maxSendConcurrency)
184-
.doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles))
185-
.onErrorContinue((e, r) -> status.updateAndGet(TransferProcessStatus::incSkippedBundles));
195+
.doOnNext(b -> status.updateAndGet(TransferProcessStatus::incSentBundles));
186196
}
187197

188198
private Mono<Result> sendBundleForPatient(PatientContext<TransportBundle> b) {
199+
var patientId = b.consentedPatient().identifier();
189200
return process
190201
.bundleSender()
191202
.send(b.data())
192-
.doOnError(e -> logError("send bundle", b.consentedPatient().identifier(), e));
203+
.onErrorResume(
204+
e -> {
205+
logError("send bundle", patientId, e);
206+
status.updateAndGet(s -> s.addFailedPatient(patientId, e.getMessage()));
207+
return Mono.empty();
208+
});
193209
}
194210

195211
private void onComplete() {

clinical-domain-agent/src/main/java/care/smith/fts/cda/TransferProcessStatus.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import care.smith.fts.cda.TransferProcessRunner.Phase;
66
import java.time.LocalDateTime;
7+
import java.util.ArrayList;
8+
import java.util.List;
79
import lombok.With;
810

911
public record TransferProcessStatus(
@@ -15,10 +17,14 @@ public record TransferProcessStatus(
1517
@With(PRIVATE) long totalBundles,
1618
@With(PRIVATE) long deidentifiedBundles,
1719
@With(PRIVATE) long sentBundles,
18-
@With(PRIVATE) long skippedBundles) {
20+
@With(PRIVATE) long skippedBundles,
21+
@With(PRIVATE) List<PatientError> failedPatients) {
22+
23+
public record PatientError(String patientId, String errorMessage) {}
24+
1925
public static TransferProcessStatus create(String processId) {
2026
return new TransferProcessStatus(
21-
processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0);
27+
processId, Phase.QUEUED, LocalDateTime.now(), null, 0, 0, 0, 0, 0, List.of());
2228
}
2329

2430
public TransferProcessStatus incTotalPatients() {
@@ -41,6 +47,12 @@ public TransferProcessStatus incSkippedBundles() {
4147
return withSkippedBundles(skippedBundles + 1);
4248
}
4349

50+
public TransferProcessStatus addFailedPatient(String patientId, String errorMessage) {
51+
var updated = new ArrayList<>(failedPatients);
52+
updated.add(new PatientError(patientId, errorMessage));
53+
return withFailedPatients(List.copyOf(updated)).withSkippedBundles(skippedBundles + 1);
54+
}
55+
4456
/**
4557
* Set the process phase. If the phase switches to COMPLETED or COMPLETED_WITH_ERROR `finishedAt`
4658
* is set. Once the process is in a completed state going back is not possible anymore.

clinical-domain-agent/src/main/java/care/smith/fts/cda/rest/TransferProcessController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private URI generateJobUri(UriComponentsBuilder uriBuilder, String id) {
140140
name = "Transfer process status",
141141
value =
142142
"""
143-
{"processId":"ChlblQ","phase":"RUNNING","createdAt":[2024,12,16,9,29,37,186662587],"finishedAt":null,"totalPatients":100,"totalBundles":53,"deidentifiedBundles":32,"sentBundles":0,"skippedBundles":0}
143+
{"processId":"ChlblQ","phase":"RUNNING","createdAt":[2024,12,16,9,29,37,186662587],"finishedAt":null,"totalPatients":100,"totalBundles":53,"deidentifiedBundles":32,"sentBundles":0,"skippedBundles":0,"failedPatients":[]}
144144
""")
145145
})),
146146
@ApiResponse(responseCode = "404", description = "The project could not be found")
@@ -170,8 +170,8 @@ Mono<ResponseEntity<TransferProcessStatus>> status(
170170
value =
171171
"""
172172
[
173-
{"processId":"8R9JGu","phase":"COMPLETED","createdAt":[2024,12,16,9,28,50,772443200],"finishedAt":[2024,12,16,9,29,31,776068091],"totalPatients":100,"totalBundles":119,"deidentifiedBundles":118,"sentBundles":118,"skippedBundles":0},
174-
{"processId":"ChlblQ","phase":"RUNNING","createdAt":[2024,12,16,9,29,37,186662587],"finishedAt":null,"totalPatients":100,"totalBundles":53,"deidentifiedBundles":32,"sentBundles":0,"skippedBundles":0}
173+
{"processId":"8R9JGu","phase":"COMPLETED","createdAt":[2024,12,16,9,28,50,772443200],"finishedAt":[2024,12,16,9,29,31,776068091],"totalPatients":100,"totalBundles":119,"deidentifiedBundles":118,"sentBundles":118,"skippedBundles":0,"failedPatients":[]},
174+
{"processId":"ChlblQ","phase":"RUNNING","createdAt":[2024,12,16,9,29,37,186662587],"finishedAt":null,"totalPatients":100,"totalBundles":53,"deidentifiedBundles":32,"sentBundles":0,"skippedBundles":0,"failedPatients":[]}
175175
]
176176
""")
177177
})),

clinical-domain-agent/src/test/java/care/smith/fts/cda/DefaultTransferProcessRunnerTest.java

Lines changed: 114 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ class DefaultTransferProcessRunnerTest {
3737
private static final String PATIENT_IDENTIFIER_2 = "patient-142391";
3838
private static final ConsentedPatient PATIENT_2 =
3939
new ConsentedPatient(PATIENT_IDENTIFIER_2, "system");
40+
private static final String PATIENT_IDENTIFIER_3 = "patient-293847";
41+
private static final ConsentedPatient PATIENT_3 =
42+
new ConsentedPatient(PATIENT_IDENTIFIER_3, "system");
4043

4144
private DefaultTransferProcessRunner runner;
4245

@@ -168,9 +171,9 @@ void ttl() throws InterruptedException {
168171

169172
create(runner.statuses())
170173
.assertNext(
171-
r -> {
172-
assertThat(r.size()).isEqualTo(0);
173-
})
174+
r ->
175+
assertThat(r.size()).isEqualTo(0)
176+
)
174177
.verifyComplete();
175178
}
176179

@@ -180,7 +183,7 @@ void errorInDataSelectorSkipsBundleAndContinues() {
180183
new TransferProcessDefinition(
181184
"test",
182185
rawConfig,
183-
pids -> fromIterable(List.of(PATIENT, PATIENT_2)),
186+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
184187
errorOnSecond(new Bundle()),
185188
b -> just(new TransportBundle(new Bundle(), "transferId")),
186189
b -> just(new Result()));
@@ -195,12 +198,12 @@ private static DataSelector errorOnSecond(Bundle bundle) {
195198
var first = new AtomicBoolean(true);
196199
return p ->
197200
first.getAndSet(false)
198-
? Flux.error(new RuntimeException("Cannot select data"))
199-
: Flux.just(bundle).map(b -> new ConsentedPatientBundle(b, p));
201+
? Flux.just(bundle).map(b -> new ConsentedPatientBundle(b, p))
202+
: Flux.error(new RuntimeException("Cannot select data"));
200203
}
201204

202205
private void completedWithErrors(TransferProcessStatus r) {
203-
assertThat(r.sentBundles()).isEqualTo(1);
206+
assertThat(r.sentBundles()).isEqualTo(2);
204207
assertThat(r.skippedBundles()).isEqualTo(1);
205208
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
206209
}
@@ -211,7 +214,7 @@ void errorInDeidentificatorSkipsBundleAndContinues() {
211214
new TransferProcessDefinition(
212215
"test",
213216
rawConfig,
214-
pids -> fromIterable(List.of(PATIENT, PATIENT_2)),
217+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
215218
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
216219
errorOnSecond(new TransportBundle(new Bundle(), "transferId")),
217220
b -> just(new Result()));
@@ -224,7 +227,7 @@ void errorInDeidentificatorSkipsBundleAndContinues() {
224227

225228
private static Deidentificator errorOnSecond(TransportBundle bundle) {
226229
var first = new AtomicBoolean(true);
227-
return b ->
230+
return p ->
228231
first.getAndSet(false)
229232
? just(bundle)
230233
: Mono.error(new RuntimeException("Cannot deidentify bundle"));
@@ -236,7 +239,7 @@ void errorInBundleSenderSkipsBundleAndContinues() {
236239
new TransferProcessDefinition(
237240
"test",
238241
rawConfig,
239-
pids -> fromIterable(List.of(PATIENT, DefaultTransferProcessRunnerTest.PATIENT_2)),
242+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
240243
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
241244
b -> just(new TransportBundle(new Bundle(), "transferId")),
242245
errorOnSecond(new Result()));
@@ -249,12 +252,112 @@ void errorInBundleSenderSkipsBundleAndContinues() {
249252

250253
private static BundleSender errorOnSecond(Result result) {
251254
var first = new AtomicBoolean(true);
252-
return b ->
255+
return p ->
253256
first.getAndSet(false)
254257
? just(result)
255258
: Mono.error(new RuntimeException("Cannot send bundle"));
256259
}
257260

261+
@Test
262+
void errorInDataSelectorRecordsFailedPatient() {
263+
var process =
264+
new TransferProcessDefinition(
265+
"test",
266+
rawConfig,
267+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
268+
errorOnSecond(new Bundle()),
269+
b -> just(new TransportBundle(new Bundle(), "transferId")),
270+
b -> just(new Result()));
271+
272+
var processId = runner.start(process, List.of());
273+
waitForCompletion(processId);
274+
275+
create(runner.status(processId))
276+
.assertNext(
277+
r -> {
278+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
279+
assertThat(r.sentBundles()).isEqualTo(2);
280+
assertThat(r.failedPatients()).hasSize(1);
281+
assertThat(r.failedPatients().getFirst().patientId()).isEqualTo(PATIENT_IDENTIFIER_2);
282+
assertThat(r.failedPatients().getFirst().errorMessage())
283+
.isEqualTo("Cannot select data");
284+
})
285+
.verifyComplete();
286+
}
287+
288+
@Test
289+
void errorInDeidentificatorRecordsFailedPatient() {
290+
var process =
291+
new TransferProcessDefinition(
292+
"test",
293+
rawConfig,
294+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
295+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
296+
errorOnSecond(new TransportBundle(new Bundle(), "transferId")),
297+
b -> just(new Result()));
298+
299+
var processId = runner.start(process, List.of());
300+
waitForCompletion(processId);
301+
302+
create(runner.status(processId))
303+
.assertNext(
304+
r -> {
305+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
306+
assertThat(r.sentBundles()).isEqualTo(2);
307+
assertThat(r.failedPatients()).hasSize(1);
308+
assertThat(r.failedPatients().getFirst().errorMessage())
309+
.isEqualTo("Cannot deidentify bundle");
310+
})
311+
.verifyComplete();
312+
}
313+
314+
@Test
315+
void errorInBundleSenderRecordsFailedPatient() {
316+
var process =
317+
new TransferProcessDefinition(
318+
"test",
319+
rawConfig,
320+
pids -> fromIterable(List.of(PATIENT, PATIENT_2, PATIENT_3)),
321+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), p))),
322+
b -> just(new TransportBundle(new Bundle(), "transferId")),
323+
errorOnSecond(new Result()));
324+
325+
var processId = runner.start(process, List.of());
326+
waitForCompletion(processId);
327+
328+
create(runner.status(processId))
329+
.assertNext(
330+
r -> {
331+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED_WITH_ERROR);
332+
assertThat(r.sentBundles()).isEqualTo(2);
333+
assertThat(r.failedPatients()).hasSize(1);
334+
assertThat(r.failedPatients().getFirst().errorMessage())
335+
.isEqualTo("Cannot send bundle");
336+
})
337+
.verifyComplete();
338+
}
339+
340+
@Test
341+
void successfulTransferHasNoFailedPatients() {
342+
var process =
343+
new TransferProcessDefinition(
344+
"test",
345+
rawConfig,
346+
pids -> fromIterable(List.of(PATIENT)),
347+
p -> fromIterable(List.of(new ConsentedPatientBundle(new Bundle(), PATIENT))),
348+
b -> just(new TransportBundle(new Bundle(), "transferId")),
349+
b -> just(new Result()));
350+
351+
var processId = runner.start(process, List.of());
352+
create(runner.status(processId))
353+
.assertNext(
354+
r -> {
355+
assertThat(r.phase()).isEqualTo(Phase.COMPLETED);
356+
assertThat(r.failedPatients()).isEmpty();
357+
})
358+
.verifyComplete();
359+
}
360+
258361
@Test
259362
void logErrorIncludesExceptionWhenDebugEnabled() {
260363
var event = runWithLogLevel(Level.DEBUG);

clinical-domain-agent/src/test/java/care/smith/fts/cda/TransferProcessStatusTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package care.smith.fts.cda;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
45

56
import care.smith.fts.cda.TransferProcessRunner.Phase;
7+
import care.smith.fts.cda.TransferProcessStatus.PatientError;
68
import java.time.LocalDateTime;
79
import org.junit.jupiter.api.Test;
810

@@ -22,6 +24,7 @@ void testCreate() {
2224
assertThat(status.deidentifiedBundles()).isEqualTo(0);
2325
assertThat(status.sentBundles()).isEqualTo(0);
2426
assertThat(status.skippedBundles()).isEqualTo(0);
27+
assertThat(status.failedPatients()).isEmpty();
2528
}
2629

2730
@Test
@@ -93,4 +96,47 @@ void testIsCompleted() {
9396
status = status.setPhase(Phase.FATAL);
9497
assertThat(status.isCompleted(status.phase())).isTrue();
9598
}
99+
100+
@Test
101+
void addFailedPatientIncrementsSkippedAndAddsEntry() {
102+
var status =
103+
TransferProcessStatus.create("process123")
104+
.addFailedPatient("patient-1", "Connection refused");
105+
106+
assertThat(status.skippedBundles()).isEqualTo(1);
107+
assertThat(status.failedPatients()).hasSize(1);
108+
assertThat(status.failedPatients().getFirst().patientId()).isEqualTo("patient-1");
109+
assertThat(status.failedPatients().getFirst().errorMessage()).isEqualTo("Connection refused");
110+
}
111+
112+
@Test
113+
void addMultipleFailedPatientsAccumulatesAll() {
114+
var status =
115+
TransferProcessStatus.create("process123")
116+
.addFailedPatient("patient-1", "Error A")
117+
.addFailedPatient("patient-2", "Error B");
118+
119+
assertThat(status.skippedBundles()).isEqualTo(2);
120+
assertThat(status.failedPatients()).hasSize(2);
121+
assertThat(status.failedPatients().get(0).patientId()).isEqualTo("patient-1");
122+
assertThat(status.failedPatients().get(1).patientId()).isEqualTo("patient-2");
123+
}
124+
125+
@Test
126+
void addFailedPatientPreservesImmutability() {
127+
var original = TransferProcessStatus.create("process123");
128+
var withError = original.addFailedPatient("patient-1", "Error");
129+
130+
assertThat(original.failedPatients()).isEmpty();
131+
assertThat(original.skippedBundles()).isEqualTo(0);
132+
assertThat(withError.failedPatients()).hasSize(1);
133+
}
134+
135+
@Test
136+
void failedPatientsListIsUnmodifiable() {
137+
var status = TransferProcessStatus.create("process123").addFailedPatient("patient-1", "Error");
138+
139+
assertThatThrownBy(() -> status.failedPatients().add(new PatientError("x", "y")))
140+
.isInstanceOf(UnsupportedOperationException.class);
141+
}
96142
}

0 commit comments

Comments
 (0)