Skip to content

Commit 83f795f

Browse files
committed
feat: update tasks/list implementation to match A2A 0.4.0 spec
- Sort tasks by status.timestamp DESC (most recent first), then ID ASC - Add lastUpdatedAfter parameter filtering - Update InMemoryTaskStore and JpaDatabaseTaskStore sorting logic - Add timestamp denormalized column to JpaTask for efficient querying - Update all transport handlers (JSON-RPC, gRPC, REST)
1 parent 4c71032 commit 83f795f

File tree

7 files changed

+92
-31
lines changed

7 files changed

+92
-31
lines changed

extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,19 @@ public ListTasksResult list(ListTasksParams params) {
227227
countQueryBuilder.append(" AND t.state = :state");
228228
}
229229

230+
// Apply lastUpdatedAfter filter using denormalized timestamp column
231+
if (params.lastUpdatedAfter() != null) {
232+
queryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter");
233+
countQueryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter");
234+
}
235+
230236
// Apply pagination cursor (tasks after pageToken)
231237
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
232238
queryBuilder.append(" AND t.id > :pageToken");
233239
}
234240

235-
// Sort by task ID for consistent pagination
236-
queryBuilder.append(" ORDER BY t.id");
241+
// Sort by status timestamp descending (most recent first), then by ID for stable ordering
242+
queryBuilder.append(" ORDER BY t.statusTimestamp DESC, t.id ASC");
237243

238244
// Create and configure the main query
239245
TypedQuery<JpaTask> query = em.createQuery(queryBuilder.toString(), JpaTask.class);
@@ -245,6 +251,9 @@ public ListTasksResult list(ListTasksParams params) {
245251
if (params.status() != null) {
246252
query.setParameter("state", params.status().asString());
247253
}
254+
if (params.lastUpdatedAfter() != null) {
255+
query.setParameter("lastUpdatedAfter", params.lastUpdatedAfter());
256+
}
248257
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
249258
query.setParameter("pageToken", params.pageToken());
250259
}
@@ -270,6 +279,9 @@ public ListTasksResult list(ListTasksParams params) {
270279
if (params.status() != null) {
271280
countQuery.setParameter("state", params.status().asString());
272281
}
282+
if (params.lastUpdatedAfter() != null) {
283+
countQuery.setParameter("lastUpdatedAfter", params.lastUpdatedAfter());
284+
}
273285
int totalSize = countQuery.getSingleResult().intValue();
274286

275287
// Deserialize tasks from JSON

extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ public class JpaTask {
2525
@Column(name = "state")
2626
private String state;
2727

28+
@Column(name = "status_timestamp")
29+
private Instant statusTimestamp;
30+
2831
@Column(name = "task_data", columnDefinition = "TEXT", nullable = false)
2932
private String taskJson;
3033

@@ -67,6 +70,14 @@ public void setState(String state) {
6770
this.state = state;
6871
}
6972

73+
public Instant getStatusTimestamp() {
74+
return statusTimestamp;
75+
}
76+
77+
public void setStatusTimestamp(Instant statusTimestamp) {
78+
this.statusTimestamp = statusTimestamp;
79+
}
80+
7081
public String getTaskJson() {
7182
return taskJson;
7283
}
@@ -123,7 +134,7 @@ static JpaTask createFromTask(Task task) throws JsonProcessingException {
123134
}
124135

125136
/**
126-
* Updates denormalized fields (contextId, state) from the task object.
137+
* Updates denormalized fields (contextId, state, statusTimestamp) from the task object.
127138
* These fields are duplicated from the JSON to enable efficient querying.
128139
*
129140
* @param task the task to extract fields from
@@ -133,8 +144,13 @@ private void updateDenormalizedFields(Task task) {
133144
if (task.getStatus() != null) {
134145
io.a2a.spec.TaskState taskState = task.getStatus().state();
135146
this.state = (taskState != null) ? taskState.asString() : null;
147+
// Extract status timestamp for efficient querying and sorting
148+
this.statusTimestamp = (task.getStatus().timestamp() != null)
149+
? task.getStatus().timestamp().toInstant()
150+
: null;
136151
} else {
137152
this.state = null;
153+
this.statusTimestamp = null;
138154
}
139155
}
140156

extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -418,12 +418,14 @@ public void testListTasksCombinedFilters() {
418418
@Test
419419
@Transactional
420420
public void testListTasksPagination() {
421-
// Create 5 tasks
421+
// Create 5 tasks with same timestamp to ensure ID-based pagination works
422+
// (With timestamp DESC sorting, same timestamps allow ID ASC tie-breaking)
423+
java.time.OffsetDateTime sameTimestamp = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC);
422424
for (int i = 1; i <= 5; i++) {
423425
Task task = new Task.Builder()
424426
.id("task-page-" + i)
425427
.contextId("context-pagination")
426-
.status(new TaskStatus(TaskState.SUBMITTED))
428+
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
427429
.build();
428430
taskStore.save(task);
429431
}
@@ -576,31 +578,34 @@ public void testListTasksDefaultPageSize() {
576578
@Test
577579
@Transactional
578580
public void testListTasksOrderingById() {
579-
// Create tasks with IDs that will sort in specific order
581+
// Create tasks with same timestamp to test ID-based tie-breaking
582+
// (spec requires sorting by timestamp DESC, then ID ASC)
583+
java.time.OffsetDateTime sameTimestamp = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC);
584+
580585
Task task1 = new Task.Builder()
581586
.id("task-order-a")
582587
.contextId("context-order")
583-
.status(new TaskStatus(TaskState.SUBMITTED))
588+
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
584589
.build();
585590

586591
Task task2 = new Task.Builder()
587592
.id("task-order-b")
588593
.contextId("context-order")
589-
.status(new TaskStatus(TaskState.SUBMITTED))
594+
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
590595
.build();
591596

592597
Task task3 = new Task.Builder()
593598
.id("task-order-c")
594599
.contextId("context-order")
595-
.status(new TaskStatus(TaskState.SUBMITTED))
600+
.status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp))
596601
.build();
597602

598603
// Save in reverse order
599604
taskStore.save(task3);
600605
taskStore.save(task1);
601606
taskStore.save(task2);
602607

603-
// List should return in ID order
608+
// List should return sorted by timestamp DESC (all same), then by ID ASC
604609
ListTasksParams params = new ListTasksParams.Builder()
605610
.contextId("context-order")
606611
.build();

reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public void listTasks(RoutingContext rc) {
128128
String pageSizeStr = rc.request().params().get("pageSize");
129129
String pageToken = rc.request().params().get("pageToken");
130130
String historyLengthStr = rc.request().params().get("historyLength");
131+
String lastUpdatedAfter = rc.request().params().get("lastUpdatedAfter");
131132
String includeArtifactsStr = rc.request().params().get("includeArtifacts");
132133

133134
// Parse optional parameters
@@ -147,7 +148,7 @@ public void listTasks(RoutingContext rc) {
147148
}
148149

149150
response = jsonRestHandler.listTasks(contextId, statusStr, pageSize, pageToken,
150-
historyLength, includeArtifacts, context);
151+
historyLength, lastUpdatedAfter, includeArtifacts, context);
151152
} catch (NumberFormatException e) {
152153
response = jsonRestHandler.createErrorResponse(new InvalidParamsError("Invalid number format in parameters"));
153154
} catch (IllegalArgumentException e) {

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.a2a.spec.EventKind;
4444
import io.a2a.spec.GetTaskPushNotificationConfigParams;
4545
import io.a2a.spec.InternalError;
46+
import io.a2a.spec.InvalidParamsError;
4647
import io.a2a.spec.JSONRPCError;
4748
import io.a2a.spec.ListTaskPushNotificationConfigParams;
4849
import io.a2a.spec.ListTasksParams;
@@ -165,8 +166,21 @@ private static Task limitTaskHistory(Task task, int historyLength) {
165166

166167
@Override
167168
public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext context) throws JSONRPCError {
168-
LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}",
169-
params.contextId(), params.status(), params.pageSize(), params.pageToken());
169+
LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}, lastUpdatedAfter={}",
170+
params.contextId(), params.status(), params.pageSize(), params.pageToken(), params.lastUpdatedAfter());
171+
172+
// Validate lastUpdatedAfter timestamp if provided
173+
if (params.lastUpdatedAfter() != null) {
174+
// Check if timestamp is in the future (optional validation per spec)
175+
java.time.Instant now = java.time.Instant.now();
176+
if (params.lastUpdatedAfter().isAfter(now)) {
177+
java.util.Map<String, Object> errorData = new java.util.HashMap<>();
178+
errorData.put("parameter", "lastUpdatedAfter");
179+
errorData.put("reason", "Timestamp cannot be in the future");
180+
throw new InvalidParamsError(null, "Invalid params", errorData);
181+
}
182+
}
183+
170184
ListTasksResult result = taskStore.list(params);
171185
LOGGER.debug("Found {} tasks (total: {})", result.pageSize(), result.totalSize());
172186
return result;

server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,18 @@ public ListTasksResult list(ListTasksParams params) {
4949
task.getStatus() != null && params.status().equals(task.getStatus().state())
5050
);
5151
}
52-
// Note: lastUpdatedAfter filtering not implemented in InMemoryTaskStore
53-
// as Task doesn't have a lastUpdated timestamp field
52+
if (params.lastUpdatedAfter() != null) {
53+
taskStream = taskStream.filter(task ->
54+
task.getStatus() != null &&
55+
task.getStatus().timestamp() != null &&
56+
task.getStatus().timestamp().toInstant().isAfter(params.lastUpdatedAfter())
57+
);
58+
}
5459

55-
// Sort by task ID for consistent pagination
60+
// Sort by status timestamp descending (most recent first), then by ID ascending for stable ordering
5661
List<Task> allFilteredTasks = taskStream
57-
.sorted(Comparator.comparing(Task::getId))
62+
.sorted(Comparator.comparing((Task t) -> t.getStatus().timestamp(), Comparator.nullsLast(Comparator.reverseOrder()))
63+
.thenComparing(Task::getId))
5864
.toList();
5965

6066
int totalSize = allFilteredTasks.size();
@@ -63,21 +69,17 @@ public ListTasksResult list(ListTasksParams params) {
6369
int pageSize = params.getEffectivePageSize();
6470
int startIndex = 0;
6571

66-
// Handle page token (simple cursor: last task ID from previous page)
72+
// Handle page token (cursor: task ID from previous page)
73+
// Since we're sorted by timestamp DESC then ID ASC, we can't use binary search
74+
// Instead, find the task with the matching ID using linear search
6775
if (params.pageToken() != null && !params.pageToken().isEmpty()) {
68-
// Use binary search since list is sorted by task ID (O(log N) vs O(N))
69-
int index = Collections.binarySearch(allFilteredTasks, null,
70-
(t1, t2) -> {
71-
// Handle null key comparisons (binarySearch passes null as one argument)
72-
if (t1 == null && t2 == null) return 0;
73-
if (t1 == null) return params.pageToken().compareTo(t2.getId());
74-
if (t2 == null) return t1.getId().compareTo(params.pageToken());
75-
return t1.getId().compareTo(t2.getId());
76-
});
77-
if (index >= 0) {
78-
startIndex = index + 1;
76+
for (int i = 0; i < allFilteredTasks.size(); i++) {
77+
if (allFilteredTasks.get(i).getId().equals(params.pageToken())) {
78+
startIndex = i + 1;
79+
break;
80+
}
7981
}
80-
// If not found (index < 0), startIndex remains 0 (start from beginning)
82+
// If not found, startIndex remains 0 (start from beginning)
8183
}
8284

8385
// Get the page of tasks

transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ public HTTPRestResponse getTask(String taskId, int historyLength, ServerCallCont
182182

183183
public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String status,
184184
@Nullable Integer pageSize, @Nullable String pageToken,
185-
@Nullable Integer historyLength, @Nullable Boolean includeArtifacts,
185+
@Nullable Integer historyLength, @Nullable String lastUpdatedAfter,
186+
@Nullable Boolean includeArtifacts,
186187
ServerCallContext context) {
187188
try {
188189
// Build params
@@ -202,6 +203,16 @@ public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String s
202203
if (historyLength != null) {
203204
paramsBuilder.historyLength(historyLength);
204205
}
206+
if (lastUpdatedAfter != null) {
207+
try {
208+
paramsBuilder.lastUpdatedAfter(java.time.Instant.parse(lastUpdatedAfter));
209+
} catch (java.time.format.DateTimeParseException e) {
210+
java.util.Map<String, Object> errorData = new java.util.HashMap<>();
211+
errorData.put("parameter", "lastUpdatedAfter");
212+
errorData.put("reason", "Must be valid ISO-8601 timestamp");
213+
throw new InvalidParamsError(null, "Invalid params", errorData);
214+
}
215+
}
205216
if (includeArtifacts != null) {
206217
paramsBuilder.includeArtifacts(includeArtifacts);
207218
}

0 commit comments

Comments
 (0)