Skip to content

Commit a0b4b41

Browse files
committed
list endpoints accept rfc3339 strings and return epochs
1 parent 3ff74d9 commit a0b4b41

File tree

2 files changed

+109
-22
lines changed

2 files changed

+109
-22
lines changed

dbos/admin_server.go

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ const (
3434
type listWorkflowsRequest struct {
3535
WorkflowUUIDs []string `json:"workflow_uuids"` // Filter by specific workflow IDs
3636
AuthenticatedUser *string `json:"authenticated_user"` // Filter by user who initiated the workflow
37-
StartTime *int64 `json:"start_time"` // Filter workflows created after this time (UTC timestamp in milliseconds)
38-
EndTime *int64 `json:"end_time"` // Filter workflows created before this time (UTC timestamp in milliseconds)
37+
StartTime *time.Time `json:"start_time"` // Filter workflows created after this time (RFC3339 format)
38+
EndTime *time.Time `json:"end_time"` // Filter workflows created before this time (RFC3339 format)
3939
Status []WorkflowStatusType `json:"status"` // Filter by workflow status(es)
4040
ApplicationVersion *string `json:"application_version"` // Filter by application version
4141
WorkflowName *string `json:"workflow_name"` // Filter by workflow function name
@@ -58,14 +58,10 @@ func (req *listWorkflowsRequest) toListWorkflowsOptions() []ListWorkflowsOption
5858
opts = append(opts, WithUser(*req.AuthenticatedUser))
5959
}
6060
if req.StartTime != nil {
61-
// Convert milliseconds to time.Time
62-
startTime := time.UnixMilli(*req.StartTime)
63-
opts = append(opts, WithStartTime(startTime))
61+
opts = append(opts, WithStartTime(*req.StartTime))
6462
}
6563
if req.EndTime != nil {
66-
// Convert milliseconds to time.Time
67-
endTime := time.UnixMilli(*req.EndTime)
68-
opts = append(opts, WithEndTime(endTime))
64+
opts = append(opts, WithEndTime(*req.EndTime))
6965
}
7066
if len(req.Status) > 0 {
7167
opts = append(opts, WithStatus(req.Status))
@@ -107,6 +103,56 @@ type adminServer struct {
107103
isDeactivated atomic.Int32
108104
}
109105

106+
// workflowStatusToUTC converts a WorkflowStatus to a map with all time fields in UTC
107+
// not super ergonomic but the DBOS console excepts unix timestamps
108+
func workflowStatusToUTC(ws WorkflowStatus) map[string]any {
109+
result := map[string]any{
110+
"workflow_uuid": ws.ID,
111+
"status": ws.Status,
112+
"name": ws.Name,
113+
"authenticated_user": ws.AuthenticatedUser,
114+
"assumed_role": ws.AssumedRole,
115+
"authenticated_roles": ws.AuthenticatedRoles,
116+
"output": ws.Output,
117+
"error": ws.Error,
118+
"executor_id": ws.ExecutorID,
119+
"application_version": ws.ApplicationVersion,
120+
"application_id": ws.ApplicationID,
121+
"attempts": ws.Attempts,
122+
"queue_name": ws.QueueName,
123+
"timeout": ws.Timeout,
124+
"deduplication_id": ws.DeduplicationID,
125+
"input": ws.Input,
126+
}
127+
128+
// Convert time fields to UTC Unix timestamps (milliseconds)
129+
if !ws.CreatedAt.IsZero() {
130+
result["created_at"] = ws.CreatedAt.UTC().UnixMilli()
131+
} else {
132+
result["created_at"] = nil
133+
}
134+
135+
if !ws.UpdatedAt.IsZero() {
136+
result["updated_at"] = ws.UpdatedAt.UTC().UnixMilli()
137+
} else {
138+
result["updated_at"] = nil
139+
}
140+
141+
if !ws.Deadline.IsZero() {
142+
result["deadline"] = ws.Deadline.UTC().UnixMilli()
143+
} else {
144+
result["deadline"] = nil
145+
}
146+
147+
if !ws.StartedAt.IsZero() {
148+
result["started_at"] = ws.StartedAt.UTC().UnixMilli()
149+
} else {
150+
result["started_at"] = nil
151+
}
152+
153+
return result
154+
}
155+
110156
func newAdminServer(ctx *dbosContext, port int) *adminServer {
111157
as := &adminServer{
112158
logger: ctx.logger,
@@ -246,8 +292,14 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
246292
return
247293
}
248294

295+
// Transform to UTC before encoding
296+
utcWorkflows := make([]map[string]any, len(workflows))
297+
for i, wf := range workflows {
298+
utcWorkflows[i] = workflowStatusToUTC(wf)
299+
}
300+
249301
w.Header().Set("Content-Type", "application/json")
250-
if err := json.NewEncoder(w).Encode(workflows); err != nil {
302+
if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil {
251303
ctx.logger.Error("Error encoding workflows response", "error", err)
252304
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
253305
}
@@ -270,8 +322,25 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
270322
return
271323
}
272324

325+
// If not queue was specified, filter out non-queued workflows
326+
if req.QueueName == nil {
327+
filtered := make([]WorkflowStatus, 0, len(workflows))
328+
for _, wf := range workflows {
329+
if len(wf.QueueName) > 0 && wf.QueueName != _DBOS_INTERNAL_QUEUE_NAME {
330+
filtered = append(filtered, wf)
331+
}
332+
}
333+
workflows = filtered
334+
}
335+
336+
// Transform to UNIX timestamps before encoding
337+
utcWorkflows := make([]map[string]any, len(workflows))
338+
for i, wf := range workflows {
339+
utcWorkflows[i] = workflowStatusToUTC(wf)
340+
}
341+
273342
w.Header().Set("Content-Type", "application/json")
274-
if err := json.NewEncoder(w).Encode(workflows); err != nil {
343+
if err := json.NewEncoder(w).Encode(utcWorkflows); err != nil {
275344
ctx.logger.Error("Error encoding queued workflows response", "error", err)
276345
http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError)
277346
}

dbos/admin_server_test.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func TestAdminServer(t *testing.T) {
165165
for _, tt := range tests {
166166
t.Run(tt.name, func(t *testing.T) {
167167
// Special handling for workflows time filter test
168-
if tt.name == "List Workflows endpoint" {
168+
if tt.name == "List endpoints time filtering" {
169169
// Create first workflow
170170
handle1, err := RunAsWorkflow(ctx, testWorkflow, "workflow1")
171171
require.NoError(t, err, "Failed to create first workflow")
@@ -190,7 +190,7 @@ func TestAdminServer(t *testing.T) {
190190

191191
// Test 1: Query with start_time before timeBetween (should get both workflows)
192192
reqBody1 := map[string]any{
193-
"start_time": timeBetween.Add(-2 * time.Second).UnixMilli(),
193+
"start_time": timeBetween.Add(-2 * time.Second).Format(time.RFC3339),
194194
"limit": 10,
195195
}
196196
req1, err := http.NewRequest(tt.method, tt.endpoint, bytes.NewBuffer(mustMarshal(reqBody1)))
@@ -203,24 +203,36 @@ func TestAdminServer(t *testing.T) {
203203

204204
assert.Equal(t, tt.expectedStatus, resp1.StatusCode)
205205

206-
var workflows1 []WorkflowStatus
206+
var workflows1 []map[string]any
207207
err = json.NewDecoder(resp1.Body).Decode(&workflows1)
208208
require.NoError(t, err, "Failed to decode workflows response 1")
209209

210210
// Should have exactly 2 workflows that we just created
211211
assert.Equal(t, 2, len(workflows1), "Expected exactly 2 workflows with start_time before timeBetween")
212212

213+
// Verify timestamps are epoch milliseconds
214+
timeBetweenMillis := timeBetween.UnixMilli()
215+
for _, wf := range workflows1 {
216+
_, ok := wf["created_at"].(float64)
217+
require.True(t, ok, "created_at should be a number")
218+
}
219+
// Verify the timestamp is around timeBetween (within 2 seconds before or after)
220+
assert.Less(t, int64(workflows1[0]["created_at"].(float64)), timeBetweenMillis, "first workflow CreatedAt should be before timeBetween")
221+
assert.Greater(t, int64(workflows1[1]["created_at"].(float64)), timeBetweenMillis, "second workflow CreatedAt should be before timeBetween")
222+
213223
// Verify both workflow IDs are present
214224
foundIDs1 := make(map[string]bool)
215225
for _, wf := range workflows1 {
216-
foundIDs1[wf.ID] = true
226+
id, ok := wf["workflow_uuid"].(string)
227+
require.True(t, ok, "workflow_uuid should be a string")
228+
foundIDs1[id] = true
217229
}
218230
assert.True(t, foundIDs1[workflowID1], "Expected to find first workflow ID in results")
219231
assert.True(t, foundIDs1[workflowID2], "Expected to find second workflow ID in results")
220232

221233
// Test 2: Query with start_time after timeBetween (should get only second workflow)
222234
reqBody2 := map[string]any{
223-
"start_time": timeBetween.UnixMilli(),
235+
"start_time": timeBetween.Format(time.RFC3339),
224236
"limit": 10,
225237
}
226238
req2, err := http.NewRequest(tt.method, tt.endpoint, bytes.NewBuffer(mustMarshal(reqBody2)))
@@ -233,19 +245,21 @@ func TestAdminServer(t *testing.T) {
233245

234246
assert.Equal(t, tt.expectedStatus, resp2.StatusCode)
235247

236-
var workflows2 []WorkflowStatus
248+
var workflows2 []map[string]any
237249
err = json.NewDecoder(resp2.Body).Decode(&workflows2)
238250
require.NoError(t, err, "Failed to decode workflows response 2")
239251

240252
// Should have exactly 1 workflow (the second one)
241253
assert.Equal(t, 1, len(workflows2), "Expected exactly 1 workflow with start_time after timeBetween")
242254

243255
// Verify it's the second workflow
244-
assert.Equal(t, workflowID2, workflows2[0].ID, "Expected second workflow ID in results")
256+
id2, ok := workflows2[0]["workflow_uuid"].(string)
257+
require.True(t, ok, "workflow_uuid should be a string")
258+
assert.Equal(t, workflowID2, id2, "Expected second workflow ID in results")
245259

246260
// Also test end_time filter
247261
reqBody3 := map[string]any{
248-
"end_time": timeBetween.UnixMilli(),
262+
"end_time": timeBetween.Format(time.RFC3339),
249263
"limit": 10,
250264
}
251265
req3, err := http.NewRequest(tt.method, tt.endpoint, bytes.NewBuffer(mustMarshal(reqBody3)))
@@ -258,15 +272,17 @@ func TestAdminServer(t *testing.T) {
258272

259273
assert.Equal(t, tt.expectedStatus, resp3.StatusCode)
260274

261-
var workflows3 []WorkflowStatus
275+
var workflows3 []map[string]any
262276
err = json.NewDecoder(resp3.Body).Decode(&workflows3)
263277
require.NoError(t, err, "Failed to decode workflows response 3")
264278

265279
// Should have exactly 1 workflow (the first one)
266280
assert.Equal(t, 1, len(workflows3), "Expected exactly 1 workflow with end_time before timeBetween")
267281

268282
// Verify it's the first workflow
269-
assert.Equal(t, workflowID1, workflows3[0].ID, "Expected first workflow ID in results")
283+
id3, ok := workflows3[0]["workflow_uuid"].(string)
284+
require.True(t, ok, "workflow_uuid should be a string")
285+
assert.Equal(t, workflowID1, id3, "Expected first workflow ID in results")
270286

271287
// Test 4: Query with empty body (should return all workflows)
272288
req4, err := http.NewRequest(tt.method, tt.endpoint, nil)
@@ -278,7 +294,7 @@ func TestAdminServer(t *testing.T) {
278294

279295
assert.Equal(t, tt.expectedStatus, resp4.StatusCode)
280296

281-
var workflows4 []WorkflowStatus
297+
var workflows4 []map[string]any
282298
err = json.NewDecoder(resp4.Body).Decode(&workflows4)
283299
require.NoError(t, err, "Failed to decode workflows response 4")
284300

@@ -288,7 +304,9 @@ func TestAdminServer(t *testing.T) {
288304
// Verify both workflow IDs are present
289305
foundIDs4 := make(map[string]bool)
290306
for _, wf := range workflows4 {
291-
foundIDs4[wf.ID] = true
307+
id, ok := wf["workflow_uuid"].(string)
308+
require.True(t, ok, "workflow_uuid should be a string")
309+
foundIDs4[id] = true
292310
}
293311
assert.True(t, foundIDs4[workflowID1], "Expected to find first workflow ID in empty body results")
294312
assert.True(t, foundIDs4[workflowID2], "Expected to find second workflow ID in empty body results")

0 commit comments

Comments
 (0)