Skip to content

Commit 2a7048b

Browse files
authored
feat: Add async chunking support and refactor common async operations (#226)
- Introduced asynchronous processing methods for hierarchical and hybrid chunking.
- Refactored async logic into a reusable `AsyncOperations` base class for cleaner task handling.
- Updated client and API layers to support new async methods.
- Added corresponding tests for all new async features.

Fixes #219

Signed-off-by: Eric Deandrea <[email protected]>
1 parent 501de38 commit 2a7048b

File tree

7 files changed

+323
-96
lines changed

7 files changed

+323
-96
lines changed

docling-serve/docling-serve-api/src/main/java/ai/docling/serve/api/DoclingServeChunkApi.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package ai.docling.serve.api;
22

3+
import java.util.concurrent.CompletableFuture;
4+
35
import ai.docling.serve.api.chunk.request.HierarchicalChunkDocumentRequest;
46
import ai.docling.serve.api.chunk.request.HybridChunkDocumentRequest;
57
import ai.docling.serve.api.chunk.response.ChunkDocumentResponse;
@@ -21,4 +23,27 @@ public interface DoclingServeChunkApi {
2123
* and using a hybrid chunker for splitting the document into smaller chunks.
2224
*/
2325
ChunkDocumentResponse chunkSourceWithHybridChunker(HybridChunkDocumentRequest request);
26+
27+
/**
28+
* Asynchronously processes the provided document source(s) by converting and chunking them
29+
* into smaller documents using the hierarchical chunker. This operation allows for handling
30+
* large document processing tasks without blocking the caller thread.
31+
*
32+
* @param request the request containing the document source(s) and options for hierarchical chunking
33+
* @return a CompletableFuture that resolves to a {@link ChunkDocumentResponse}, which contains
34+
* the processed chunks, optionally the converted document, and processing metadata
35+
*/
36+
CompletableFuture<ChunkDocumentResponse> chunkSourceWithHierarchicalChunkerAsync(HierarchicalChunkDocumentRequest request);
37+
38+
/**
39+
* Asynchronously processes the provided document source(s) by converting and chunking them
40+
* into smaller documents using the hybrid chunker. This operation facilitates non-blocking
41+
* processing of large document tasks by leveraging a hybrid chunking strategy.
42+
*
43+
* @param request the request containing the document source(s), options for conversion,
44+
* hybrid chunking parameters, and optional specifications for output targets
45+
* @return a CompletableFuture that resolves to a {@link ChunkDocumentResponse}, which includes
46+
* the processed chunks, optionally the converted document, and relevant processing metadata
47+
*/
48+
CompletableFuture<ChunkDocumentResponse> chunkSourceWithHybridChunkerAsync(HybridChunkDocumentRequest request);
2449
}

docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/DoclingServeClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ protected DoclingServeClient(DoclingServeClientBuilder builder) {
108108
this.healthOps = new HealthOperations(this);
109109
this.taskOps = new TaskOperations(this);
110110
this.convertOps = new ConvertOperations(this, this.taskOps, this.asyncPollInterval, this.asyncTimeout);
111-
this.chunkOps = new ChunkOperations(this);
111+
this.chunkOps = new ChunkOperations(this, this.taskOps, this.asyncPollInterval, this.asyncTimeout);
112112
this.clearOps = new ClearOperations(this);
113113
}
114114

@@ -273,6 +273,16 @@ public ChunkDocumentResponse chunkSourceWithHybridChunker(HybridChunkDocumentReq
273273
return this.chunkOps.chunkSourceWithHybridChunker(request);
274274
}
275275

276+
@Override
277+
public CompletableFuture<ChunkDocumentResponse> chunkSourceWithHierarchicalChunkerAsync(HierarchicalChunkDocumentRequest request) {
278+
return this.chunkOps.chunkSourceWithHierarchicalChunkerAsync(request);
279+
}
280+
281+
@Override
282+
public CompletableFuture<ChunkDocumentResponse> chunkSourceWithHybridChunkerAsync(HybridChunkDocumentRequest request) {
283+
return this.chunkOps.chunkSourceWithHybridChunkerAsync(request);
284+
}
285+
276286
@Override
277287
public TaskStatusPollResponse pollTaskStatus(TaskStatusPollRequest request) {
278288
return this.taskOps.pollTaskStatus(request);
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package ai.docling.serve.client.operations;
2+
3+
import java.time.Duration;
4+
import java.util.Optional;
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.CompletionStage;
7+
import java.util.concurrent.TimeUnit;
8+
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import ai.docling.serve.api.DoclingServeTaskApi;
13+
import ai.docling.serve.api.task.request.TaskResultRequest;
14+
import ai.docling.serve.api.task.request.TaskStatusPollRequest;
15+
import ai.docling.serve.api.task.response.TaskStatusPollResponse;
16+
import ai.docling.serve.api.util.ValidationUtils;
17+
18+
/**
19+
* Abstract base class for managing asynchronous operations, providing methods to execute
20+
* tasks asynchronously and poll for their status until completion. Subclasses must implement
21+
* specific logic for retrieving task results.
22+
*/
23+
public abstract class AsyncOperations {
24+
private static final Logger LOG = LoggerFactory.getLogger(AsyncOperations.class);
25+
26+
private final HttpOperations httpOperations;
27+
private final DoclingServeTaskApi taskApi;
28+
private final Duration asyncPollInterval;
29+
private final Duration asyncTimeout;
30+
31+
protected AsyncOperations(HttpOperations httpOperations, DoclingServeTaskApi taskApi, Duration asyncPollInterval, Duration asyncTimeout) {
32+
this.httpOperations = httpOperations;
33+
this.taskApi = taskApi;
34+
this.asyncPollInterval = asyncPollInterval;
35+
this.asyncTimeout = asyncTimeout;
36+
}
37+
38+
/**
39+
* Retrieves the result of a task based on the provided task result request.
40+
*
41+
* This method is abstract and meant to be implemented by subclasses.
42+
* It uses the information provided in the {@code TaskResultRequest}
43+
* to obtain the result of the task execution.
44+
*
45+
* @param <O> the type of the result object returned
46+
* @param taskResultRequest the request containing the details, including the task ID,
47+
* required to retrieve the task result
48+
* @return the result of the task execution, of the type {@code O}
49+
*/
50+
protected abstract <O> O getTaskResult(TaskResultRequest taskResultRequest);
51+
52+
/**
53+
* Executes an asynchronous operation by sending a request to the specified URI
54+
* and polling for the task completion. This method uses an HTTP POST operation
55+
* to start the task and then repeatedly polls the task status to determine
56+
* when the operation is complete.
57+
*
58+
* @param <I> the type of the request object being sent
59+
* @param <O> the type of the response object returned upon completion
60+
* @param request the request object containing the data necessary to initialize the task
61+
* @param uri the endpoint URI to which the request will be sent
62+
* @return a {@link CompletableFuture} that will be completed with the result of the asynchronous operation
63+
*/
64+
protected <I, O> CompletableFuture<O> executeAsync(I request, String uri) {
65+
ValidationUtils.ensureNotNull(request, "request");
66+
67+
// Submit the async task (conversion or chunking, depending on the subclass) and chain the polling logic
68+
return CompletableFuture.supplyAsync(() ->
69+
this.httpOperations.executePost(createAsyncRequestContext(uri, request))
70+
).thenCompose(taskResponse -> {
71+
LOG.info("Started async conversion with task ID: {}", taskResponse.getTaskId());
72+
73+
long startTime = System.currentTimeMillis();
74+
return pollTaskUntilComplete(taskResponse, startTime);
75+
});
76+
}
77+
78+
private <I> RequestContext<I, TaskStatusPollResponse> createAsyncRequestContext(String uri, I request) {
79+
return RequestContext.<I, TaskStatusPollResponse>builder()
80+
.request(request)
81+
.responseType(TaskStatusPollResponse.class)
82+
.uri(uri)
83+
.build();
84+
}
85+
86+
private <O> CompletionStage<O> pollTaskUntilComplete(TaskStatusPollResponse statusPollResponse, long startTime) {
87+
var taskId = statusPollResponse.getTaskId();
88+
89+
// Check if we've timed out
90+
if (System.currentTimeMillis() - startTime > this.asyncTimeout.toMillis()) {
91+
return CompletableFuture.failedFuture(
92+
new RuntimeException("Async conversion timed out after %s for task: %s".formatted(this.asyncTimeout, taskId))
93+
);
94+
}
95+
96+
// Poll the task status
97+
var pollRequest = TaskStatusPollRequest.builder()
98+
.taskId(taskId)
99+
.build();
100+
101+
return CompletableFuture.supplyAsync(() -> this.taskApi.pollTaskStatus(pollRequest))
102+
.thenCompose(statusResponse -> pollTaskStatus(statusResponse, startTime));
103+
}
104+
105+
private <O> CompletionStage<O> pollTaskStatus(TaskStatusPollResponse statusResponse, long startTime) {
106+
var status = statusResponse.getTaskStatus();
107+
var taskId = statusResponse.getTaskId();
108+
LOG.debug("Task {} status: {}", taskId, status);
109+
110+
return switch (status) {
111+
case SUCCESS -> {
112+
LOG.info("Task {} completed successfully", taskId);
113+
114+
// Retrieve the result
115+
var taskResult = TaskResultRequest.builder()
116+
.taskId(statusResponse.getTaskId())
117+
.build();
118+
119+
yield CompletableFuture.supplyAsync(() -> getTaskResult(taskResult));
120+
}
121+
122+
case FAILURE -> {
123+
var errorMessage = Optional.ofNullable(statusResponse.getTaskStatusMetadata())
124+
.map(metadata -> "Task failed: %s".formatted(metadata))
125+
.orElse("Task failed");
126+
127+
yield CompletableFuture.failedStage(
128+
new RuntimeException("Async conversion failed for task %s: %s".formatted(taskId, errorMessage)));
129+
}
130+
131+
default ->
132+
// Still in progress (PENDING or STARTED), schedule next poll after delay
133+
CompletableFuture.supplyAsync(
134+
() -> null,
135+
CompletableFuture.delayedExecutor(this.asyncPollInterval.toMillis(), TimeUnit.MILLISECONDS)
136+
).thenCompose(v -> pollTaskUntilComplete(statusResponse, startTime));
137+
};
138+
}
139+
}

docling-serve/docling-serve-client/src/main/java/ai/docling/serve/client/operations/ChunkOperations.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,58 @@
11
package ai.docling.serve.client.operations;
22

3+
import java.time.Duration;
4+
import java.util.concurrent.CompletableFuture;
5+
36
import ai.docling.serve.api.DoclingServeChunkApi;
7+
import ai.docling.serve.api.DoclingServeTaskApi;
48
import ai.docling.serve.api.chunk.request.HierarchicalChunkDocumentRequest;
59
import ai.docling.serve.api.chunk.request.HybridChunkDocumentRequest;
610
import ai.docling.serve.api.chunk.response.ChunkDocumentResponse;
11+
import ai.docling.serve.api.task.request.TaskResultRequest;
712
import ai.docling.serve.api.util.ValidationUtils;
813

914
/**
1015
* Operations for the document chunking API. Provides access to document chunking
1116
* functionality with both hierarchical and hybrid strategies.
1217
*/
13-
public final class ChunkOperations implements DoclingServeChunkApi {
18+
public final class ChunkOperations extends AsyncOperations implements DoclingServeChunkApi {
1419
private final HttpOperations httpOperations;
20+
private final DoclingServeTaskApi taskApi;
1521

16-
public ChunkOperations(HttpOperations httpOperations) {
22+
public ChunkOperations(HttpOperations httpOperations, DoclingServeTaskApi taskApi,
23+
Duration asyncPollInterval, Duration asyncTimeout) {
24+
super(httpOperations, taskApi, asyncPollInterval, asyncTimeout);
1725
this.httpOperations = httpOperations;
26+
this.taskApi = taskApi;
27+
}
28+
29+
@Override
30+
protected ChunkDocumentResponse getTaskResult(TaskResultRequest taskResultRequest) {
31+
return this.taskApi.chunkTaskResult(taskResultRequest);
1832
}
1933

20-
/**
21-
* Converts and chunks the provided document source(s) into a processed document based on the specified options
22-
* and using a hierarchical chunker for splitting the document into smaller chunks.
23-
*/
34+
@Override
2435
public ChunkDocumentResponse chunkSourceWithHierarchicalChunker(HierarchicalChunkDocumentRequest request) {
2536
ValidationUtils.ensureNotNull(request, "request");
2637
return this.httpOperations.executePost(createRequestContext("/v1/chunk/hierarchical/source", request));
2738
}
2839

29-
/**
30-
* Converts and chunks the provided document source(s) into a processed document based on the specified options
31-
* and using a hybrid chunker for splitting the document into smaller chunks.
32-
*/
40+
@Override
3341
public ChunkDocumentResponse chunkSourceWithHybridChunker(HybridChunkDocumentRequest request) {
3442
ValidationUtils.ensureNotNull(request, "request");
3543
return this.httpOperations.executePost(createRequestContext("/v1/chunk/hybrid/source", request));
3644
}
3745

46+
@Override
47+
public CompletableFuture<ChunkDocumentResponse> chunkSourceWithHierarchicalChunkerAsync(HierarchicalChunkDocumentRequest request) {
48+
return executeAsync(request, "/v1/chunk/hierarchical/source/async");
49+
}
50+
51+
@Override
52+
public CompletableFuture<ChunkDocumentResponse> chunkSourceWithHybridChunkerAsync(HybridChunkDocumentRequest request) {
53+
return executeAsync(request, "/v1/chunk/hybrid/source/async");
54+
}
55+
3856
private <I> RequestContext<I, ChunkDocumentResponse> createRequestContext(String uri, I request) {
3957
return RequestContext.<I, ChunkDocumentResponse>builder()
4058
.request(request)

0 commit comments

Comments
 (0)