Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion server/src/main/java/au/csiro/pathling/FhirServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static au.csiro.pathling.utilities.Preconditions.checkPresent;

import au.csiro.pathling.async.JobProvider;
import au.csiro.pathling.async.JobResultProvider;
import au.csiro.pathling.cache.EntityTagInterceptor;
import au.csiro.pathling.config.OperationConfiguration;
import au.csiro.pathling.config.ServerConfiguration;
Expand Down Expand Up @@ -127,6 +128,8 @@ public class FhirServer extends RestfulServer {

@Nonnull private final transient Optional<JobProvider> jobProvider;

@Nonnull private final transient Optional<JobResultProvider> jobResultProvider;

@Nonnull private final transient SystemExportProvider exportProvider;

@Nonnull private final transient ExportResultProvider exportResultProvider;
Expand Down Expand Up @@ -178,6 +181,7 @@ public class FhirServer extends RestfulServer {
* @param configuration the server configuration
* @param oidcConfiguration the optional OIDC configuration
* @param jobProvider the optional job provider
* @param jobResultProvider the optional job result provider
* @param exportProvider the export provider
* @param exportResultProvider the export result provider
* @param patientExportProvider the patient export provider
Expand Down Expand Up @@ -205,6 +209,7 @@ public FhirServer(
@Nonnull final ServerConfiguration configuration,
@Nonnull final Optional<OidcConfiguration> oidcConfiguration,
@Nonnull final Optional<JobProvider> jobProvider,
@Nonnull final Optional<JobResultProvider> jobResultProvider,
@Nonnull final SystemExportProvider exportProvider,
@Nonnull final ExportResultProvider exportResultProvider,
@Nonnull final PatientExportProvider patientExportProvider,
Expand Down Expand Up @@ -232,6 +237,7 @@ public FhirServer(
this.configuration = configuration;
this.oidcConfiguration = oidcConfiguration;
this.jobProvider = jobProvider;
this.jobResultProvider = jobResultProvider;
this.exportProvider = exportProvider;
this.exportResultProvider = exportResultProvider;
this.patientExportProvider = patientExportProvider;
Expand Down Expand Up @@ -272,8 +278,9 @@ protected void initialize() throws ServletException {
// Get operation configuration.
final OperationConfiguration ops = configuration.getOperations();

// Register job provider, if async is enabled.
// Register job providers, if async is enabled.
jobProvider.ifPresent(this::registerProvider);
jobResultProvider.ifPresent(this::registerProvider);

// Register export providers based on configuration.
if (ops.isExportEnabled()) {
Expand Down
11 changes: 6 additions & 5 deletions server/src/main/java/au/csiro/pathling/async/AsyncAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,11 @@ protected IBaseResource maybeExecuteAsynchronously(
log.info("Asynchronous processing requested");

if (result == null) {
// the class containing the async annotation on a method does not implement
// PreAsyncValidation
// set some values to prevent NPEs
// The class containing the async annotation on a method does not implement
// PreAsyncValidation. Set some values to prevent NPEs.
result = new PreAsyncValidationResult<>(new Object(), List.of());
}
processRequestAsynchronously(joinPoint, requestDetails, result, spark);
processRequestAsynchronously(joinPoint, requestDetails, result, spark, asyncSupported);
throw new ProcessingNotCompletedException("Accepted", buildOperationOutcome(result));
} else {
return (IBaseResource) joinPoint.proceed();
Expand All @@ -146,7 +145,8 @@ private void processRequestAsynchronously(
@Nonnull final ProceedingJoinPoint joinPoint,
@Nonnull final ServletRequestDetails requestDetails,
@Nonnull final PreAsyncValidationResult<?> preAsyncValidationResult,
@Nonnull final SparkSession spark) {
@Nonnull final SparkSession spark,
@Nonnull final AsyncSupported asyncSupported) {
final Authentication authentication = SecurityContextHolder.getContext().getAuthentication();

// Compute operation-specific cache key if the operation provides it.
Expand Down Expand Up @@ -217,6 +217,7 @@ private void processRequestAsynchronously(
final Optional<String> ownerId = getCurrentUserId(authentication);
final Job<IBaseResource> newJob = new Job<>(jobId, operation, result, ownerId);
newJob.setPreAsyncValidationResult(preAsyncValidationResult.result());
newJob.setRedirectOnComplete(asyncSupported.redirectOnComplete());
return newJob;
});
final HttpServletResponse response = requestDetails.getServletResponse();
Expand Down
11 changes: 10 additions & 1 deletion server/src/main/java/au/csiro/pathling/async/AsyncSupported.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,13 @@
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface AsyncSupported {}
public @interface AsyncSupported {

/**
* When true, completed jobs return 303 See Other with a redirect to the result endpoint, rather
* than returning the result inline. This follows the SQL on FHIR unify-async specification.
*
* @return true if completed jobs should redirect to the result endpoint
*/
boolean redirectOnComplete() default false;
}
6 changes: 6 additions & 0 deletions server/src/main/java/au/csiro/pathling/async/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public interface JobTag {}
/** Indicates whether this job has been marked for deletion. */
@Setter private boolean markedAsDeleted;

/**
* When true, completed jobs return 303 See Other with redirect to result endpoint, rather than
* returning the result inline. This follows the SQL on FHIR unify-async specification.
*/
@Setter private boolean redirectOnComplete;

/**
* The last calculated progress percentage. When a job is at 100% that does not always indicate
* that the job is actually finished. Most of the time, this indicates that a new stage has not
Expand Down
41 changes: 37 additions & 4 deletions server/src/main/java/au/csiro/pathling/async/JobProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private IBaseResource handleJobGetRequest(
throw handleCancelledJob();
}
if (job.getResult().isDone()) {
return handleCompletedJob(job, response);
return handleCompletedJob(job, request, response);
}
return handleInProgressJob(request, response, job);
}
Expand All @@ -253,21 +253,38 @@ private static ResourceNotFoundException handleCancelledJob() {
}

/**
* Handles a completed job by returning its result or converting exceptions.
* Handles a completed job by returning its result or redirecting to the result endpoint.
*
* <p>If the job has {@code redirectOnComplete} enabled (following the SQL on FHIR unify-async
* specification), returns 303 See Other with a Location header pointing to the result endpoint.
* Otherwise, returns the result inline.
*
* @param job The completed job.
* @param request The HTTP request for building the result URL.
* @param response The HTTP response for applying response modifications.
* @return The job result.
* @return The job result (if not redirecting) or an empty Parameters resource (if redirecting).
* @throws InternalErrorException If the job was interrupted.
*/
@Nonnull
private IBaseResource handleCompletedJob(
@Nonnull final Job<?> job, @Nullable final HttpServletResponse response) {
@Nonnull final Job<?> job,
@Nonnull final HttpServletRequest request,
@Nullable final HttpServletResponse response) {
try {
// Completed responses use TTL-based caching with configured max-age.
if (response != null) {
setAsyncCacheHeaders(response);
}

// If redirect is enabled, return 303 See Other with Location header.
if (job.isRedirectOnComplete() && response != null) {
final String resultUrl = buildResultUrl(request, job.getId());
response.setStatus(HttpServletResponse.SC_SEE_OTHER);
response.setHeader("Location", resultUrl);
return new Parameters();
}

// Otherwise return the result inline (legacy behaviour).
job.getResponseModification().accept(response);
return job.getResult().get();
} catch (final InterruptedException e) {
Expand All @@ -278,6 +295,22 @@ private IBaseResource handleCompletedJob(
}
}

/**
* Builds the URL for the job result endpoint. Uses the servlet context path to ensure the URL is
* correctly prefixed (e.g., /fhir/$job-result).
*
* @param request The HTTP request to extract the context path from.
* @param jobId The job ID.
* @return The result URL.
*/
@Nonnull
private static String buildResultUrl(
@Nonnull final HttpServletRequest request, @Nonnull final String jobId) {
// Use the servlet path to get the FHIR server mount point (e.g., "/fhir").
final String servletPath = request.getServletPath();
return servletPath + "/$job-result?id=" + jobId;
}

/**
* Unwraps the cause chain from an ExecutionException. The Future wraps exceptions in
* ExecutionException, and AsyncAspect may wrap them in IllegalStateException.
Expand Down
179 changes: 179 additions & 0 deletions server/src/main/java/au/csiro/pathling/async/JobResultProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright © 2018-2026 Commonwealth Scientific and Industrial Research
* Organisation (CSIRO) ABN 41 687 119 230.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package au.csiro.pathling.async;

import static au.csiro.pathling.security.SecurityAspect.checkHasAuthority;
import static au.csiro.pathling.security.SecurityAspect.getCurrentUserId;

import au.csiro.pathling.config.ServerConfiguration;
import au.csiro.pathling.errors.AccessDeniedError;
import au.csiro.pathling.errors.ErrorHandlingInterceptor;
import au.csiro.pathling.errors.ResourceNotFoundError;
import au.csiro.pathling.security.PathlingAuthority;
import ca.uhn.fhir.rest.annotation.Operation;
import ca.uhn.fhir.rest.annotation.OperationParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.InvalidRequestException;
import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;

/**
* Provides the $job-result operation for retrieving the result of a completed async job. This
* endpoint is used when operations are configured with {@code redirectOnComplete=true}, following
* the SQL on FHIR unify-async specification.
*
* <p>The flow is: 1. Client polls $job endpoint until job completes 2. $job returns 303 See Other
* with Location header pointing to $job-result 3. Client fetches result from $job-result endpoint
*
* @author John Grimes
*/
@Component
@ConditionalOnProperty(prefix = "pathling", name = "async.enabled", havingValue = "true")
@Slf4j
public class JobResultProvider {

private static final Pattern ID_PATTERN =
Pattern.compile("^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$");

@Nonnull private final ServerConfiguration configuration;

@Nonnull private final JobRegistry jobRegistry;

/**
* Creates a new JobResultProvider.
*
* @param configuration the server configuration
* @param jobRegistry the job registry
*/
public JobResultProvider(
@Nonnull final ServerConfiguration configuration, @Nonnull final JobRegistry jobRegistry) {
this.configuration = configuration;
this.jobRegistry = jobRegistry;
}

/**
* Retrieves the result of a completed async job.
*
* @param id the job ID
* @param request the HTTP request
* @param response the HTTP response
* @return the job result as a Parameters resource
*/
@SuppressWarnings("unused")
@Operation(name = "$job-result", idempotent = true)
public IBaseResource jobResult(
@Nullable @OperationParam(name = "id") final String id,
@Nonnull final HttpServletRequest request,
@Nullable final HttpServletResponse response) {
log.debug("Received $job-result request with id: {}", id);

final Job<?> job = getJob(id);

if (configuration.getAuth().isEnabled()) {
// Check for the required authority associated with the operation that initiated the job.
checkHasAuthority(PathlingAuthority.operationAccess(job.getOperation()));
// Check that the user requesting the job result is the same user that started the job.
final Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
final Optional<String> currentUserId = getCurrentUserId(authentication);
if (!job.getOwnerId().equals(currentUserId)) {
throw new AccessDeniedError("The requested job is not owned by the current user");
}
}

return handleJobResultRequest(job, response);
}

@Nonnull
private Job<?> getJob(@Nullable final String id) {
// Validate that the ID looks reasonable.
if (id == null || !ID_PATTERN.matcher(id).matches()) {
throw new ResourceNotFoundError("Job ID not found");
}

log.debug("Received request for job result: {}", id);
@Nullable final Job<?> job = jobRegistry.get(id);
// Check that the job exists.
if (job == null) {
throw new ResourceNotFoundError("Job ID not found");
}
return job;
}

@Nonnull
private IBaseResource handleJobResultRequest(
@Nonnull final Job<?> job, @Nullable final HttpServletResponse response) {
// Handle cancelled jobs.
if (job.getResult().isCancelled()) {
throw new ResourceNotFoundException(
"A DELETE request cancelled this job or deleted all files associated with this job.");
}

// Verify the job is complete.
if (!job.getResult().isDone()) {
throw new InvalidRequestException(
"Job is not yet complete. Poll the $job endpoint to check status.");
}

// Set cache headers.
if (response != null) {
final int maxAge = configuration.getAsync().getCacheMaxAge();
response.setHeader("Cache-Control", "max-age=" + maxAge);
}

// Apply any response modifications set by the operation (e.g., Expires header).
job.getResponseModification().accept(response);

// Return the result.
try {
return job.getResult().get();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new InternalErrorException("Job was interrupted", e);
} catch (final ExecutionException e) {
throw ErrorHandlingInterceptor.convertError(unwrapExecutionException(e));
}
}

/**
* Unwraps the cause chain from an ExecutionException. The Future wraps exceptions in
* ExecutionException, and AsyncAspect may wrap them in IllegalStateException.
*
* @param e The ExecutionException to unwrap.
* @return The root cause or the original exception.
*/
@Nonnull
private static Throwable unwrapExecutionException(@Nonnull final ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null && cause.getCause() != null) {
cause = cause.getCause();
}
return cause != null ? cause : e;
}
}
Loading