Skip to content

Commit 870b82c

Browse files
authored
feat: update tasks/list implementation to match A2A 0.4.0 spec (a2aproject#459)
- Sort tasks by status.timestamp DESC (most recent first), then ID ASC - Add lastUpdatedAfter parameter filtering - Update InMemoryTaskStore and JpaDatabaseTaskStore sorting logic - Add timestamp denormalized column to JpaTask for efficient querying - Update all transport handlers (JSON-RPC, gRPC, REST) Follows up on a2aproject#455 and a2aproject#359
1 parent 22f5b1d commit 870b82c

File tree

7 files changed

+341
-52
lines changed

7 files changed

+341
-52
lines changed

extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,29 @@ public ListTasksResult list(ListTasksParams params) {
227227
countQueryBuilder.append(" AND t.state = :state");
228228
}
229229

230-
// Apply pagination cursor (tasks after pageToken)
230+
// Apply lastUpdatedAfter filter using denormalized timestamp column
231+
if (params.lastUpdatedAfter() != null) {
232+
queryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter");
233+
countQueryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter");
234+
}
235+
236+
// Apply pagination cursor using keyset pagination for composite sort (timestamp DESC, id ASC)
237+
// PageToken format: "timestamp_millis:taskId" (e.g., "1699999999000:task-123")
231238
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
232-
queryBuilder.append(" AND t.id > :pageToken");
239+
String[] tokenParts = params.pageToken().split(":", 2);
240+
if (tokenParts.length == 2) {
241+
// Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId)
242+
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
243+
queryBuilder.append(" AND (t.statusTimestamp < :tokenTimestamp OR (t.statusTimestamp = :tokenTimestamp AND t.id > :tokenId))");
244+
} else {
245+
// Legacy ID-only pageToken format is not supported with timestamp-based sorting
246+
// Throw error to prevent incorrect pagination results
247+
throw new io.a2a.spec.InvalidParamsError(null, "Invalid pageToken format: expected 'timestamp:id'", null);
248+
}
233249
}
234250

235-
// Sort by task ID for consistent pagination
236-
queryBuilder.append(" ORDER BY t.id");
251+
// Sort by status timestamp descending (most recent first), then by ID for stable ordering
252+
queryBuilder.append(" ORDER BY t.statusTimestamp DESC, t.id ASC");
237253

238254
// Create and configure the main query
239255
TypedQuery<JpaTask> query = em.createQuery(queryBuilder.toString(), JpaTask.class);
@@ -245,8 +261,28 @@ public ListTasksResult list(ListTasksParams params) {
245261
if (params.status() != null) {
246262
query.setParameter("state", params.status().asString());
247263
}
264+
if (params.lastUpdatedAfter() != null) {
265+
query.setParameter("lastUpdatedAfter", params.lastUpdatedAfter());
266+
}
248267
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
249-
query.setParameter("pageToken", params.pageToken());
268+
String[] tokenParts = params.pageToken().split(":", 2);
269+
if (tokenParts.length == 2) {
270+
// Parse keyset pagination parameters
271+
try {
272+
long timestampMillis = Long.parseLong(tokenParts[0]);
273+
String tokenId = tokenParts[1];
274+
275+
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
276+
Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis);
277+
query.setParameter("tokenTimestamp", tokenTimestamp);
278+
query.setParameter("tokenId", tokenId);
279+
} catch (NumberFormatException e) {
280+
// Malformed timestamp in pageToken
281+
throw new io.a2a.spec.InvalidParamsError(null,
282+
"Invalid pageToken format: timestamp must be numeric milliseconds", null);
283+
}
284+
}
285+
// Note: Legacy ID-only format already rejected in query building phase
250286
}
251287

252288
// Apply page size limit (+1 to check for next page)
@@ -270,6 +306,9 @@ public ListTasksResult list(ListTasksParams params) {
270306
if (params.status() != null) {
271307
countQuery.setParameter("state", params.status().asString());
272308
}
309+
if (params.lastUpdatedAfter() != null) {
310+
countQuery.setParameter("lastUpdatedAfter", params.lastUpdatedAfter());
311+
}
273312
int totalSize = countQuery.getSingleResult().intValue();
274313

275314
// Deserialize tasks from JSON
@@ -283,10 +322,14 @@ public ListTasksResult list(ListTasksParams params) {
283322
}
284323
}
285324

286-
// Determine next page token (ID of last task if there are more results)
325+
// Determine next page token (timestamp:ID of last task if there are more results)
326+
// Format: "timestamp_millis:taskId" for keyset pagination
287327
String nextPageToken = null;
288328
if (hasMore && !tasks.isEmpty()) {
289-
nextPageToken = tasks.get(tasks.size() - 1).getId();
329+
Task lastTask = tasks.get(tasks.size() - 1);
330+
// All tasks have timestamps (TaskStatus canonical constructor ensures this)
331+
long timestampMillis = lastTask.getStatus().timestamp().toInstant().toEpochMilli();
332+
nextPageToken = timestampMillis + ":" + lastTask.getId();
290333
}
291334

292335
// Apply post-processing transformations (history limiting, artifact removal)

extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ public class JpaTask {
2525
@Column(name = "state")
2626
private String state;
2727

28+
@Column(name = "status_timestamp")
29+
private Instant statusTimestamp;
30+
2831
@Column(name = "task_data", columnDefinition = "TEXT", nullable = false)
2932
private String taskJson;
3033

@@ -67,6 +70,14 @@ public void setState(String state) {
6770
this.state = state;
6871
}
6972

73+
public Instant getStatusTimestamp() {
74+
return statusTimestamp;
75+
}
76+
77+
public void setStatusTimestamp(Instant statusTimestamp) {
78+
this.statusTimestamp = statusTimestamp;
79+
}
80+
7081
public String getTaskJson() {
7182
return taskJson;
7283
}
@@ -123,7 +134,7 @@ static JpaTask createFromTask(Task task) throws JsonProcessingException {
123134
}
124135

125136
/**
126-
* Updates denormalized fields (contextId, state) from the task object.
137+
* Updates denormalized fields (contextId, state, statusTimestamp) from the task object.
127138
* These fields are duplicated from the JSON to enable efficient querying.
128139
*
129140
* @param task the task to extract fields from
@@ -133,8 +144,14 @@ private void updateDenormalizedFields(Task task) {
133144
if (task.getStatus() != null) {
134145
io.a2a.spec.TaskState taskState = task.getStatus().state();
135146
this.state = (taskState != null) ? taskState.asString() : null;
147+
// Extract status timestamp for efficient querying and sorting
148+
// Truncate to milliseconds for keyset pagination consistency (pageToken uses millis)
149+
this.statusTimestamp = (task.getStatus().timestamp() != null)
150+
? task.getStatus().timestamp().toInstant().truncatedTo(java.time.temporal.ChronoUnit.MILLIS)
151+
: null;
136152
} else {
137153
this.state = null;
154+
this.statusTimestamp = null;
138155
}
139156
}
140157

extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java

Lines changed: 172 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static org.junit.jupiter.api.Assertions.assertNull;
77
import static org.junit.jupiter.api.Assertions.assertTrue;
88

9+
import java.time.OffsetDateTime;
910
import java.util.ArrayList;
1011
import java.util.Collections;
1112
import java.util.HashMap;
@@ -418,12 +419,14 @@ public void testListTasksCombinedFilters() {
418419
@Test
419420
@Transactional
420421
public void testListTasksPagination() {
421-
// Create 5 tasks
422+
// Create 5 tasks with same timestamp to ensure ID-based pagination works
423+
// (With timestamp DESC sorting, same timestamps allow ID ASC tie-breaking)
424+
OffsetDateTime sameTimestamp = OffsetDateTime.now(java.time.ZoneOffset.UTC);
422425
for (int i = 1; i <= 5; i++) {
423426
Task task = new Task.Builder()
424427
.id("task-page-" + i)
425428
.contextId("context-pagination")
426-
.status(new TaskStatus(TaskState.SUBMITTED))
429+
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
427430
.build();
428431
taskStore.save(task);
429432
}
@@ -465,6 +468,122 @@ public void testListTasksPagination() {
465468
assertNull(result3.nextPageToken(), "Last page should have no next page token");
466469
}
467470

471+
@Test
472+
@Transactional
473+
public void testListTasksPaginationWithDifferentTimestamps() {
474+
// Create tasks with different timestamps to verify keyset pagination
475+
// with composite sort (timestamp DESC, id ASC)
476+
OffsetDateTime now = OffsetDateTime.now(java.time.ZoneOffset.UTC);
477+
478+
// Task 1: 10 minutes ago, ID="task-diff-a"
479+
Task task1 = new Task.Builder()
480+
.id("task-diff-a")
481+
.contextId("context-diff-timestamps")
482+
.status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(10)))
483+
.build();
484+
taskStore.save(task1);
485+
486+
// Task 2: 5 minutes ago, ID="task-diff-b"
487+
Task task2 = new Task.Builder()
488+
.id("task-diff-b")
489+
.contextId("context-diff-timestamps")
490+
.status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(5)))
491+
.build();
492+
taskStore.save(task2);
493+
494+
// Task 3: 5 minutes ago, ID="task-diff-c" (same timestamp as task2, tests ID tie-breaker)
495+
Task task3 = new Task.Builder()
496+
.id("task-diff-c")
497+
.contextId("context-diff-timestamps")
498+
.status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(5)))
499+
.build();
500+
taskStore.save(task3);
501+
502+
// Task 4: Now, ID="task-diff-d"
503+
Task task4 = new Task.Builder()
504+
.id("task-diff-d")
505+
.contextId("context-diff-timestamps")
506+
.status(new TaskStatus(TaskState.WORKING, null, now))
507+
.build();
508+
taskStore.save(task4);
509+
510+
// Task 5: 1 minute ago, ID="task-diff-e"
511+
Task task5 = new Task.Builder()
512+
.id("task-diff-e")
513+
.contextId("context-diff-timestamps")
514+
.status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(1)))
515+
.build();
516+
taskStore.save(task5);
517+
518+
// Expected order (timestamp DESC, id ASC):
519+
// 1. task-diff-d (now)
520+
// 2. task-diff-e (1 min ago)
521+
// 3. task-diff-b (5 min ago, ID 'b')
522+
// 4. task-diff-c (5 min ago, ID 'c')
523+
// 5. task-diff-a (10 min ago)
524+
525+
// Page 1: Get first 2 tasks
526+
ListTasksParams params1 = new ListTasksParams.Builder()
527+
.contextId("context-diff-timestamps")
528+
.pageSize(2)
529+
.build();
530+
ListTasksResult result1 = taskStore.list(params1);
531+
532+
assertEquals(5, result1.totalSize());
533+
assertEquals(2, result1.pageSize());
534+
assertNotNull(result1.nextPageToken(), "Should have next page token");
535+
536+
// Verify first page order
537+
assertEquals("task-diff-d", result1.tasks().get(0).getId(), "First task should be most recent");
538+
assertEquals("task-diff-e", result1.tasks().get(1).getId(), "Second task should be 1 min ago");
539+
540+
// Verify pageToken format: "timestamp_millis:taskId"
541+
assertTrue(result1.nextPageToken().contains(":"), "PageToken should have format timestamp:id");
542+
String[] tokenParts = result1.nextPageToken().split(":", 2);
543+
assertEquals(2, tokenParts.length, "PageToken should have exactly 2 parts");
544+
assertEquals("task-diff-e", tokenParts[1], "PageToken should contain last task ID");
545+
546+
// Page 2: Get next 2 tasks
547+
ListTasksParams params2 = new ListTasksParams.Builder()
548+
.contextId("context-diff-timestamps")
549+
.pageSize(2)
550+
.pageToken(result1.nextPageToken())
551+
.build();
552+
ListTasksResult result2 = taskStore.list(params2);
553+
554+
assertEquals(5, result2.totalSize());
555+
assertEquals(2, result2.pageSize());
556+
assertNotNull(result2.nextPageToken(), "Should have next page token");
557+
558+
// Verify second page order (tasks with same timestamp, sorted by ID)
559+
assertEquals("task-diff-b", result2.tasks().get(0).getId(), "Third task should be 5 min ago, ID 'b'");
560+
assertEquals("task-diff-c", result2.tasks().get(1).getId(), "Fourth task should be 5 min ago, ID 'c'");
561+
562+
// Page 3: Get last task
563+
ListTasksParams params3 = new ListTasksParams.Builder()
564+
.contextId("context-diff-timestamps")
565+
.pageSize(2)
566+
.pageToken(result2.nextPageToken())
567+
.build();
568+
ListTasksResult result3 = taskStore.list(params3);
569+
570+
assertEquals(5, result3.totalSize());
571+
assertEquals(1, result3.pageSize());
572+
assertNull(result3.nextPageToken(), "Last page should have no next page token");
573+
574+
// Verify last task
575+
assertEquals("task-diff-a", result3.tasks().get(0).getId(), "Last task should be oldest");
576+
577+
// Verify no duplicates across all pages
578+
List<String> allTaskIds = new ArrayList<>();
579+
allTaskIds.addAll(result1.tasks().stream().map(Task::getId).toList());
580+
allTaskIds.addAll(result2.tasks().stream().map(Task::getId).toList());
581+
allTaskIds.addAll(result3.tasks().stream().map(Task::getId).toList());
582+
583+
assertEquals(5, allTaskIds.size(), "Should have exactly 5 tasks across all pages");
584+
assertEquals(5, allTaskIds.stream().distinct().count(), "Should have no duplicate tasks");
585+
}
586+
468587
@Test
469588
@Transactional
470589
public void testListTasksHistoryLimiting() {
@@ -573,34 +692,80 @@ public void testListTasksDefaultPageSize() {
573692
assertNotNull(result.nextPageToken(), "Should have next page");
574693
}
575694

695+
@Test
696+
@Transactional
697+
public void testListTasksInvalidPageTokenFormat() {
698+
// Create a task
699+
Task task = new Task.Builder()
700+
.id("task-invalid-token")
701+
.contextId("context-invalid-token")
702+
.status(new TaskStatus(TaskState.WORKING))
703+
.build();
704+
taskStore.save(task);
705+
706+
// Test 1: Legacy ID-only pageToken should throw InvalidParamsError
707+
ListTasksParams params1 = new ListTasksParams.Builder()
708+
.contextId("context-invalid-token")
709+
.pageToken("task-invalid-token") // ID-only format (legacy)
710+
.build();
711+
712+
try {
713+
taskStore.list(params1);
714+
throw new AssertionError("Expected InvalidParamsError for legacy ID-only pageToken");
715+
} catch (io.a2a.spec.InvalidParamsError e) {
716+
// Expected - legacy format not supported
717+
assertTrue(e.getMessage().contains("Invalid pageToken format"),
718+
"Error message should mention invalid format");
719+
}
720+
721+
// Test 2: Malformed timestamp in pageToken should throw InvalidParamsError
722+
ListTasksParams params2 = new ListTasksParams.Builder()
723+
.contextId("context-invalid-token")
724+
.pageToken("not-a-number:task-id") // Invalid timestamp
725+
.build();
726+
727+
try {
728+
taskStore.list(params2);
729+
throw new AssertionError("Expected InvalidParamsError for malformed timestamp");
730+
} catch (io.a2a.spec.InvalidParamsError e) {
731+
// Expected - malformed timestamp
732+
assertTrue(e.getMessage().contains("timestamp must be numeric"),
733+
"Error message should mention numeric timestamp requirement");
734+
}
735+
}
736+
737+
576738
@Test
577739
@Transactional
578740
public void testListTasksOrderingById() {
579-
// Create tasks with IDs that will sort in specific order
741+
// Create tasks with same timestamp to test ID-based tie-breaking
742+
// (spec requires sorting by timestamp DESC, then ID ASC)
743+
OffsetDateTime sameTimestamp = OffsetDateTime.now(java.time.ZoneOffset.UTC);
744+
580745
Task task1 = new Task.Builder()
581746
.id("task-order-a")
582747
.contextId("context-order")
583-
.status(new TaskStatus(TaskState.SUBMITTED))
748+
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
584749
.build();
585750

586751
Task task2 = new Task.Builder()
587752
.id("task-order-b")
588753
.contextId("context-order")
589-
.status(new TaskStatus(TaskState.SUBMITTED))
754+
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
590755
.build();
591756

592757
Task task3 = new Task.Builder()
593758
.id("task-order-c")
594759
.contextId("context-order")
595-
.status(new TaskStatus(TaskState.SUBMITTED))
760+
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
596761
.build();
597762

598763
// Save in reverse order
599764
taskStore.save(task3);
600765
taskStore.save(task1);
601766
taskStore.save(task2);
602767

603-
// List should return in ID order
768+
// List should return sorted by timestamp DESC (all same), then by ID ASC
604769
ListTasksParams params = new ListTasksParams.Builder()
605770
.contextId("context-order")
606771
.build();

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public void listTasks(RoutingContext rc) {
128128
String pageSizeStr = rc.request().params().get("pageSize");
129129
String pageToken = rc.request().params().get("pageToken");
130130
String historyLengthStr = rc.request().params().get("historyLength");
131+
String lastUpdatedAfter = rc.request().params().get("lastUpdatedAfter");
131132
String includeArtifactsStr = rc.request().params().get("includeArtifacts");
132133

133134
// Parse optional parameters
@@ -143,11 +144,11 @@ public void listTasks(RoutingContext rc) {
143144

144145
Boolean includeArtifacts = null;
145146
if (includeArtifactsStr != null && !includeArtifactsStr.isEmpty()) {
146-
includeArtifacts = Boolean.parseBoolean(includeArtifactsStr);
147+
includeArtifacts = Boolean.valueOf(includeArtifactsStr);
147148
}
148149

149150
response = jsonRestHandler.listTasks(contextId, statusStr, pageSize, pageToken,
150-
historyLength, includeArtifacts, context);
151+
historyLength, lastUpdatedAfter, includeArtifacts, context);
151152
} catch (NumberFormatException e) {
152153
response = jsonRestHandler.createErrorResponse(new InvalidParamsError("Invalid number format in parameters"));
153154
} catch (IllegalArgumentException e) {

0 commit comments

Comments
 (0)