Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ For the latest build see: https://github.com/medizininformatik-initiative/torch/
| TORCH_CONCEPT_TREE_FILE | ontology/mapping_tree.json | The file for the concept tree mapping. |
| TORCH_DSE_MAPPING_TREE_FILE | ontology/dse_mapping_tree.json | The file for DSE concept tree mapping. |
| TORCH_USE_CQL | true | Flag indicating if CQL should be used. |
| TORCH_BASE_URL | – | The server name before the proxy from which torch is accessed |
| TORCH_OUTPUT_FILE_SERVER_URL | – | The URL to access Result location TORCH_RESULTS_DIR served by a proxy/fileserver |
| LOG_LEVEL<br/>_DE_MEDIZININFORMATIKINITIATIVE_TORCH | info | Log level for torch core functionality. |
| LOG_LEVEL<br/>_CA_UHN_FHIR | error | Log level for HAPI FHIR library. |
| SPRING_PROFILES_ACTIVE | active | The active Spring profile. |
Expand Down Expand Up @@ -217,6 +215,9 @@ This can be used to track the progress of your data extraction.

If a server is set up for the files e.g. NGINX, the files can be fetched by a Request on the URL set in
TORCH_OUTPUT_FILE_SERVER_URL in [enviroment variables](#environment-variables).
**Torch assumes that the files are served from the baseurl that is next to the fhir api**
E.g.: if localhost:8080/test/fhir is the url from which torch gets called (before forwarding), then
the file url would start with **http://localhost:8080/test/**

```sh
curl -s "http://localhost:8080/da4a1c56-f5d9-468c-b57a-b8186ea4fea8/f33634bd-d51b-463c-a956-93409d96935f.ndjson"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.ForwardedHeaderTransformer;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

Expand Down Expand Up @@ -92,6 +93,11 @@ public AppConfig(TorchProperties torchProperties) {
this.torchProperties = torchProperties;
}

@Bean
public ForwardedHeaderTransformer forwardedHeaderTransformer() {
return new ForwardedHeaderTransformer();
}


@Bean
public String searchParametersFile(@Value("${torch.search_parameters_file}") String searchParametersFile) {
Expand Down Expand Up @@ -370,7 +376,7 @@ public StructureDefinitionHandler cdsStructureDefinitionHandler(ResourceReader r

@Bean
public ResultFileManager resultFileManager(FhirContext fhirContext) {
return new ResultFileManager(torchProperties.results().dir(), torchProperties.results().persistence(), fhirContext, torchProperties.base().url(), torchProperties.output().file().server().url());
return new ResultFileManager(torchProperties.results().dir(), torchProperties.results().persistence(), fhirContext);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

@ConfigurationProperties(prefix = "torch")
public record TorchProperties(
Base base,
Output output,
Profile profile,
Mapping mapping,
Fhir fhir,
Expand All @@ -19,24 +17,9 @@ public record TorchProperties(
String dseMappingTreeFile,
boolean useCql
) {

public record Base(String url) {

}

public record Max(int connections) {
}

public record Output(File file) {
public record File(Server server) {
public record Server(String url) {

}
}

}


public record Profile(String dir) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
Expand All @@ -47,6 +49,7 @@ public class FhirController {
private static final Logger logger = LoggerFactory.getLogger(FhirController.class);

private static final MediaType MEDIA_TYPE_FHIR_JSON = MediaType.valueOf("application/fhir+json");
public static final String FHIR_STATUS = "/fhir/__status/";

private final ObjectMapper objectMapper;
private final FhirContext fhirContext;
Expand Down Expand Up @@ -82,7 +85,7 @@ private record DecodedContent(byte[] crtdl, List<String> patientIds) {

@Autowired
public FhirController(ObjectMapper objectMapper, FhirContext fhirContext, ResultFileManager resultFileManager,
CrtdlProcessingService crtdlProcessingService, CrtdlValidatorService validatorService) {
CrtdlProcessingService crtdlProcessingService, CrtdlValidatorService validatorService) {
this.objectMapper = objectMapper;
this.fhirContext = fhirContext;
this.resultFileManager = resultFileManager;
Expand Down Expand Up @@ -119,7 +122,7 @@ private static DecodedContent decodeCrtdlContent(Parameters parameters) {
public RouterFunction<ServerResponse> queryRouter() {
logger.info("Init FhirController Router");
return route(POST("/fhir/$extract-data").and(accept(MEDIA_TYPE_FHIR_JSON)), this::handleExtractData)
.andRoute(GET("/fhir/__status/{jobId}"), this::checkStatus).andRoute(GET("/fhir/__status/"), this::getGlobalStatus);
.andRoute(GET("/fhir/__status/{jobId}"), this::checkStatus).andRoute(GET(FHIR_STATUS), this::getGlobalStatus);
}

private record DecodedCRTDLContent(Crtdl crtdl, List<String> patientIds) {
Expand Down Expand Up @@ -183,7 +186,7 @@ public Mono<ServerResponse> handleExtractData(ServerRequest request) {
.subscribeOn(Schedulers.boundedElastic()).subscribe(); // final fire-and-forget

return accepted()
.header("Content-Location", request.uriBuilder().replacePath("/fhir/__status/" + jobId).build().toString())
.header("Content-Location", request.uriBuilder().replacePath(FHIR_STATUS + jobId).build().toString())
.build();
})
.onErrorResume(IllegalArgumentException.class, e -> {
Expand Down Expand Up @@ -228,7 +231,30 @@ private Crtdl parseCrtdlContent(byte[] content) throws IOException {
return objectMapper.readValue(content, Crtdl.class);
}

public String stripToBasePath(URI originalUri, String basePath) {

String fullPath = originalUri.getPath();

int index = fullPath.indexOf(basePath);
if (index == -1) {
return originalUri.toString(); // fallback: no change
}

String newPath = fullPath.substring(0, index);

return UriComponentsBuilder
.fromUri(originalUri)
.replacePath(newPath)
.replaceQuery(null)
.fragment(null)
.build()
.toUriString();
}


public Mono<ServerResponse> checkStatus(ServerRequest request) {
String truncatedUrl = stripToBasePath(request.uri(), FHIR_STATUS);
logger.info("Base url: {}", truncatedUrl);
var jobId = request.pathVariable("jobId");
HttpStatus status = resultFileManager.getStatus(jobId);

Expand All @@ -242,7 +268,7 @@ public Mono<ServerResponse> checkStatus(ServerRequest request) {
case HttpStatus.OK -> {
// Capture the full request URL and transaction time
String transactionTime = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
return Mono.fromCallable(() -> resultFileManager.loadBundleFromFileSystem(jobId, transactionTime))
return Mono.fromCallable(() -> resultFileManager.loadBundleFromFileSystem(truncatedUrl, jobId, transactionTime))
.flatMap(bundleMap -> {
if (bundleMap == null) {
return ServerResponse.notFound().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@
import ca.uhn.fhir.context.FhirContext;
import de.medizininformatikinitiative.torch.management.OperationOutcomeCreator;
import de.medizininformatikinitiative.torch.model.consent.PatientBatchWithConsent;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.OperationOutcome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
Expand All @@ -23,7 +20,13 @@
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

Expand All @@ -38,18 +41,14 @@ public class ResultFileManager {

private final Path resultsDirPath;
private final FhirContext fhirContext;
private final String hostname;
private final String fileServerName;
public final ConcurrentHashMap<String, HttpStatus> jobStatusMap = new ConcurrentHashMap<>();

public ResultFileManager(String resultsDir, String duration, FhirContext fhirContext, String hostname, String fileServerName) {
public ResultFileManager(String resultsDir, String duration, FhirContext fhirContext) {
this.resultsDirPath = Paths.get(resultsDir).toAbsolutePath();
this.fhirContext = fhirContext;


Duration duration1 = Duration.parse(duration);
this.hostname = hostname;
this.fileServerName = fileServerName;


logger.debug("Duration of persistence {}", duration1);
Expand Down Expand Up @@ -213,7 +212,15 @@ public String loadErrorFromFileSystem(String jobId) {
}
}

public Map<String, Object> loadBundleFromFileSystem(String jobId, String transactionTime) {
/**
* Generates the server response for a successful operation.
*
* @param url of the calling request.
* @param jobId id to be handled
* @param transactionTime time of the transaction
* @return Map Containing the server response with file locations
*/
public Map<String, Object> loadBundleFromFileSystem(String url, String jobId, String transactionTime) {
Map<String, Object> response = new HashMap<>();
try {
Path jobDir = getJobDirectory(jobId);
Expand All @@ -224,10 +231,9 @@ public Map<String, Object> loadBundleFromFileSystem(String jobId, String transac

Files.list(jobDir).forEach(file -> {
String fileName = file.getFileName().toString();
String url = fileServerName + "/" + jobId + "/" + fileName;

Map<String, String> fileEntry = new HashMap<>();
fileEntry.put("url", url);
fileEntry.put("url", url + "/" + jobId + "/" + fileName);

if (fileName.endsWith(".ndjson")) {
fileEntry.put("type", "NDJSON Bundle");
Expand All @@ -244,7 +250,7 @@ public Map<String, Object> loadBundleFromFileSystem(String jobId, String transac
logger.debug("OutputFiles size {}", outputFiles.size());

response.put("transactionTime", transactionTime);
response.put("request", hostname + "/fhir/$extract-data");
response.put("request", url + "/fhir/$extract-data");
response.put("requiresAccessToken", false);
response.put("output", outputFiles);
response.put("deleted", deletedFiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.ForwardedHeaderTransformer;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

Expand Down Expand Up @@ -89,6 +90,10 @@ public TestConfig(TorchProperties torchProperties) {
this.torchProperties = torchProperties;
}

@Bean
public ForwardedHeaderTransformer forwardedHeaderTransformer() {
return new ForwardedHeaderTransformer();
}

@Bean
public CascadingDelete cascadingDelete() {
Expand Down Expand Up @@ -304,7 +309,7 @@ public StructureDefinitionHandler cdsStructureDefinitionHandler(ResourceReader r

@Bean
public ResultFileManager resultFileManager(FhirContext fhirContext) {
return new ResultFileManager(torchProperties.results().dir(), torchProperties.results().persistence(), fhirContext, torchProperties.base().url(), torchProperties.output().file().server().url());
return new ResultFileManager(torchProperties.results().dir(), torchProperties.results().persistence(), fhirContext);
}

@Bean
Expand Down