Skip to content

Commit d1add35

Browse files
Merge branch 'dbos-inc:main' into chore/exp-backoff-delay-system-db
2 parents 87157f5 + 68cdafb commit d1add35

12 files changed

+205
-124
lines changed

dbos/admin_server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (req *listWorkflowsRequest) toListWorkflowsOptions() []ListWorkflowsOption
8181
opts = append(opts, WithOffset(*req.Offset))
8282
}
8383
if req.SortDesc != nil {
84-
opts = append(opts, WithSortDesc(*req.SortDesc))
84+
opts = append(opts, WithSortDesc())
8585
}
8686
if req.WorkflowIDPrefix != nil {
8787
opts = append(opts, WithWorkflowIDPrefix(*req.WorkflowIDPrefix))
@@ -412,7 +412,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
412412
mux.HandleFunc(_WORKFLOW_STEPS_PATTERN, func(w http.ResponseWriter, r *http.Request) {
413413
workflowID := r.PathValue("id")
414414

415-
steps, err := ctx.systemDB.getWorkflowSteps(ctx, workflowID)
415+
steps, err := GetWorkflowSteps(ctx, workflowID)
416416
if err != nil {
417417
ctx.logger.Error("Failed to list workflow steps", "workflow_id", workflowID, "error", err)
418418
http.Error(w, fmt.Sprintf("Failed to list steps: %v", err), http.StatusInternalServerError)

dbos/admin_server_test.go

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestAdminServer(t *testing.T) {
3636

3737
// Verify admin server is not running
3838
client := &http.Client{Timeout: 1 * time.Second}
39-
_, err = client.Get(fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_HEALTHCHECK_PATTERN, "GET /")))
39+
_, err = client.Get(fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_HEALTHCHECK_PATTERN, "GET /")))
4040
require.Error(t, err, "Expected request to fail when admin server is not started")
4141

4242
// Verify the DBOS executor doesn't have an admin server instance
@@ -51,9 +51,10 @@ func TestAdminServer(t *testing.T) {
5151
resetTestDatabase(t, databaseURL)
5252
// Launch DBOS with admin server once for all endpoint tests
5353
ctx, err := NewDBOSContext(Config{
54-
DatabaseURL: databaseURL,
55-
AppName: "test-app",
56-
AdminServer: true,
54+
DatabaseURL: databaseURL,
55+
AppName: "test-app",
56+
AdminServer: true,
57+
AdminServerPort: _DEFAULT_ADMIN_SERVER_PORT,
5758
})
5859
require.NoError(t, err)
5960

@@ -92,13 +93,13 @@ func TestAdminServer(t *testing.T) {
9293
{
9394
name: "Health endpoint responds correctly",
9495
method: "GET",
95-
endpoint: fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_HEALTHCHECK_PATTERN, "GET /")),
96+
endpoint: fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_HEALTHCHECK_PATTERN, "GET /")),
9697
expectedStatus: http.StatusOK,
9798
},
9899
{
99100
name: "Recovery endpoint responds correctly with valid JSON",
100101
method: "POST",
101-
endpoint: fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOW_RECOVERY_PATTERN, "POST /")),
102+
endpoint: fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_WORKFLOW_RECOVERY_PATTERN, "POST /")),
102103
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
103104
contentType: "application/json",
104105
expectedStatus: http.StatusOK,
@@ -112,15 +113,15 @@ func TestAdminServer(t *testing.T) {
112113
{
113114
name: "Recovery endpoint rejects invalid JSON",
114115
method: "POST",
115-
endpoint: fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOW_RECOVERY_PATTERN, "POST /")),
116+
endpoint: fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_WORKFLOW_RECOVERY_PATTERN, "POST /")),
116117
body: strings.NewReader(`{"invalid": json}`),
117118
contentType: "application/json",
118119
expectedStatus: http.StatusBadRequest,
119120
},
120121
{
121122
name: "Queue metadata endpoint responds correctly",
122123
method: "GET",
123-
endpoint: fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOW_QUEUES_METADATA_PATTERN, "GET /")),
124+
endpoint: fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_WORKFLOW_QUEUES_METADATA_PATTERN, "GET /")),
124125
expectedStatus: http.StatusOK,
125126
validateResp: func(t *testing.T, resp *http.Response) {
126127
var queueMetadata []WorkflowQueue
@@ -146,7 +147,7 @@ func TestAdminServer(t *testing.T) {
146147
{
147148
name: "Workflows endpoint accepts all filters without error",
148149
method: "POST",
149-
endpoint: fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /")),
150+
endpoint: fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /")),
150151
body: bytes.NewBuffer(mustMarshal(map[string]any{
151152
"workflow_uuids": []string{"test-id-1", "test-id-2"},
152153
"authenticated_user": "test-user",
@@ -177,7 +178,7 @@ func TestAdminServer(t *testing.T) {
177178
{
178179
name: "Get single workflow returns 404 for non-existent workflow",
179180
method: "GET",
180-
endpoint: "http://localhost:3001/workflow/non-existent-workflow-id",
181+
endpoint: fmt.Sprintf("http://localhost:%d/workflow/non-existent-workflow-id", _DEFAULT_ADMIN_SERVER_PORT),
181182
expectedStatus: http.StatusNotFound,
182183
},
183184
}
@@ -214,9 +215,10 @@ func TestAdminServer(t *testing.T) {
214215
t.Run("List workflows input/output values", func(t *testing.T) {
215216
resetTestDatabase(t, databaseURL)
216217
ctx, err := NewDBOSContext(Config{
217-
DatabaseURL: databaseURL,
218-
AppName: "test-app",
219-
AdminServer: true,
218+
DatabaseURL: databaseURL,
219+
AppName: "test-app",
220+
AdminServer: true,
221+
AdminServerPort: _DEFAULT_ADMIN_SERVER_PORT,
220222
})
221223
require.NoError(t, err)
222224

@@ -258,7 +260,7 @@ func TestAdminServer(t *testing.T) {
258260
time.Sleep(100 * time.Millisecond)
259261

260262
client := &http.Client{Timeout: 5 * time.Second}
261-
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))
263+
endpoint := fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))
262264

263265
// Create workflows with different input/output types
264266
// 1. Integer workflow
@@ -360,9 +362,10 @@ func TestAdminServer(t *testing.T) {
360362
t.Run("List endpoints time filtering", func(t *testing.T) {
361363
resetTestDatabase(t, databaseURL)
362364
ctx, err := NewDBOSContext(Config{
363-
DatabaseURL: databaseURL,
364-
AppName: "test-app",
365-
AdminServer: true,
365+
DatabaseURL: databaseURL,
366+
AppName: "test-app",
367+
AdminServer: true,
368+
AdminServerPort: _DEFAULT_ADMIN_SERVER_PORT,
366369
})
367370
require.NoError(t, err)
368371

@@ -382,7 +385,7 @@ func TestAdminServer(t *testing.T) {
382385
}()
383386

384387
client := &http.Client{Timeout: 5 * time.Second}
385-
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))
388+
endpoint := fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_WORKFLOWS_PATTERN, "POST /"))
386389

387390
// Create first workflow
388391
handle1, err := RunWorkflow(ctx, testWorkflow, "workflow1")
@@ -533,9 +536,10 @@ func TestAdminServer(t *testing.T) {
533536
t.Run("ListQueuedWorkflows", func(t *testing.T) {
534537
resetTestDatabase(t, databaseURL)
535538
ctx, err := NewDBOSContext(Config{
536-
DatabaseURL: databaseURL,
537-
AppName: "test-app",
538-
AdminServer: true,
539+
DatabaseURL: databaseURL,
540+
AppName: "test-app",
541+
AdminServer: true,
542+
AdminServerPort: _DEFAULT_ADMIN_SERVER_PORT,
539543
})
540544
require.NoError(t, err)
541545

@@ -570,7 +574,7 @@ func TestAdminServer(t *testing.T) {
570574
}()
571575

572576
client := &http.Client{Timeout: 5 * time.Second}
573-
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_QUEUED_WORKFLOWS_PATTERN, "POST /"))
577+
endpoint := fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_QUEUED_WORKFLOWS_PATTERN, "POST /"))
574578

575579
/// Create a workflow that will not block the queue
576580
h1, err := RunWorkflow(ctx, regularWorkflow, "regular", WithQueue(queue.Name))
@@ -718,9 +722,10 @@ func TestAdminServer(t *testing.T) {
718722
t.Run("TestDeactivate", func(t *testing.T) {
719723
resetTestDatabase(t, databaseURL)
720724
ctx, err := NewDBOSContext(Config{
721-
DatabaseURL: databaseURL,
722-
AppName: "test-app",
723-
AdminServer: true,
725+
DatabaseURL: databaseURL,
726+
AppName: "test-app",
727+
AdminServer: true,
728+
AdminServerPort: _DEFAULT_ADMIN_SERVER_PORT,
724729
})
725730
require.NoError(t, err)
726731

@@ -754,7 +759,7 @@ func TestAdminServer(t *testing.T) {
754759
}, 3*time.Second, 100*time.Millisecond, "Expected at least 2 scheduled workflow executions")
755760

756761
// Call deactivate endpoint
757-
endpoint := fmt.Sprintf("http://localhost:3001/%s", strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
762+
endpoint := fmt.Sprintf("http://localhost:%d/%s", _DEFAULT_ADMIN_SERVER_PORT, strings.TrimPrefix(_DEACTIVATE_PATTERN, "GET /"))
758763
req, err := http.NewRequest("GET", endpoint, nil)
759764
require.NoError(t, err, "Failed to create deactivate request")
760765

dbos/client_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -805,14 +805,13 @@ func TestListWorkflows(t *testing.T) {
805805

806806
// Test 7: Test sorting order (ascending - default)
807807
ascWorkflows, err := ListWorkflows(clientCtx,
808-
WithWorkflowIDPrefix("test-"),
809-
WithSortDesc(false))
808+
WithWorkflowIDPrefix("test-"))
810809
require.NoError(t, err, "failed to list workflows ascending")
811810

812811
// Test 8: Test sorting order (descending)
813812
descWorkflows, err := ListWorkflows(clientCtx,
814813
WithWorkflowIDPrefix("test-"),
815-
WithSortDesc(true))
814+
WithSortDesc())
816815
require.NoError(t, err, "failed to list workflows descending")
817816

818817
// Verify sorting - workflows should be ordered by creation time

dbos/conductor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ func (c *Conductor) handleListWorkflowsRequest(data []byte, requestID string) er
562562
var opts []ListWorkflowsOption
563563
opts = append(opts, WithLoadInput(req.Body.LoadInput))
564564
opts = append(opts, WithLoadOutput(req.Body.LoadOutput))
565-
opts = append(opts, WithSortDesc(req.Body.SortDesc))
565+
opts = append(opts, WithSortDesc())
566566
if len(req.Body.WorkflowUUIDs) > 0 {
567567
opts = append(opts, WithWorkflowIDs(req.Body.WorkflowUUIDs))
568568
}
@@ -638,7 +638,7 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
638638
var opts []ListWorkflowsOption
639639
opts = append(opts, WithLoadInput(req.Body.LoadInput))
640640
opts = append(opts, WithLoadOutput(false)) // Don't load output for queued workflows
641-
opts = append(opts, WithSortDesc(req.Body.SortDesc))
641+
opts = append(opts, WithSortDesc())
642642
opts = append(opts, WithQueuesOnly()) // Only include workflows that are in queues
643643

644644
// Add status filter for queued workflows
@@ -719,8 +719,8 @@ func (c *Conductor) handleListStepsRequest(data []byte, requestID string) error
719719
}
720720
c.logger.Debug("Handling list steps request", "request", req)
721721

722-
// Get workflow steps using the existing systemDB method
723-
steps, err := c.dbosCtx.systemDB.getWorkflowSteps(c.dbosCtx, req.WorkflowID)
722+
// Get workflow steps using the public GetWorkflowSteps method
723+
steps, err := GetWorkflowSteps(c.dbosCtx, req.WorkflowID)
724724
if err != nil {
725725
c.logger.Error("Failed to list workflow steps", "workflow_id", req.WorkflowID, "error", err)
726726
errorMsg := fmt.Sprintf("failed to list workflow steps: %v", err)

dbos/conductor_protocol.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ type listStepsConductorResponse struct {
190190
Output *[]workflowStepsConductorResponseBody `json:"output,omitempty"`
191191
}
192192

193-
// formatWorkflowStepsResponseBody converts stepInfo to workflowStepsConductorResponseBody for the conductor protocol
194-
func formatWorkflowStepsResponseBody(step stepInfo) workflowStepsConductorResponseBody {
193+
// formatWorkflowStepsResponseBody converts StepInfo to workflowStepsConductorResponseBody for the conductor protocol
194+
func formatWorkflowStepsResponseBody(step StepInfo) workflowStepsConductorResponseBody {
195195
output := workflowStepsConductorResponseBody{
196196
FunctionID: step.StepID,
197197
FunctionName: step.StepName,

dbos/dbos.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type Config struct {
3636
AppName string // Application name for identification (required)
3737
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
3838
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
39+
AdminServerPort int // Port for the admin HTTP server (default: 3001)
3940
ConductorURL string // DBOS conductor service URL (optional)
4041
ConductorAPIKey string // DBOS conductor API key (optional)
4142
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
@@ -51,12 +52,16 @@ func processConfig(inputConfig *Config) (*Config, error) {
5152
if len(inputConfig.AppName) == 0 {
5253
return nil, fmt.Errorf("missing required config field: appName")
5354
}
55+
if inputConfig.AdminServerPort == 0 {
56+
inputConfig.AdminServerPort = _DEFAULT_ADMIN_SERVER_PORT
57+
}
5458

5559
dbosConfig := &Config{
5660
DatabaseURL: inputConfig.DatabaseURL,
5761
AppName: inputConfig.AppName,
5862
Logger: inputConfig.Logger,
5963
AdminServer: inputConfig.AdminServer,
64+
AdminServerPort: inputConfig.AdminServerPort,
6065
ConductorURL: inputConfig.ConductorURL,
6166
ConductorAPIKey: inputConfig.ConductorAPIKey,
6267
ApplicationVersion: inputConfig.ApplicationVersion,
@@ -101,15 +106,15 @@ type DBOSContext interface {
101106
Shutdown(timeout time.Duration) // Gracefully shutdown all DBOS runtime components with ordered cleanup sequence
102107

103108
// Workflow operations
104-
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
109+
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
105110
RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
106-
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
107-
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
108-
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
109-
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
110-
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
111-
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
112-
GetStepID() (int, error) // Get the current step ID (only available within workflows)
111+
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
112+
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
113+
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
114+
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
115+
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
116+
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
117+
GetStepID() (int, error) // Get the current step ID (only available within workflows)
113118

114119
// Workflow management
115120
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
@@ -118,6 +123,7 @@ type DBOSContext interface {
118123
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
119124
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
120125
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
126+
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
121127

122128
// Accessors
123129
GetApplicationVersion() string // Get the application version for this context
@@ -328,7 +334,6 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
328334

329335
// Initialize the queue runner and register DBOS internal queue
330336
initExecutor.queueRunner = newQueueRunner(initExecutor.logger)
331-
NewWorkflowQueue(initExecutor, _DBOS_INTERNAL_QUEUE_NAME)
332337

333338
// Initialize conductor if API key is provided
334339
if config.ConductorAPIKey != "" {
@@ -372,17 +377,18 @@ func (c *dbosContext) Launch() error {
372377

373378
// Start the admin server if configured
374379
if c.config.AdminServer {
375-
adminServer := newAdminServer(c, _DEFAULT_ADMIN_SERVER_PORT)
380+
adminServer := newAdminServer(c, c.config.AdminServerPort)
376381
err := adminServer.Start()
377382
if err != nil {
378383
c.logger.Error("Failed to start admin server", "error", err)
379384
return newInitializationError(fmt.Sprintf("failed to start admin server: %v", err))
380385
}
381-
c.logger.Info("Admin server started", "port", _DEFAULT_ADMIN_SERVER_PORT)
386+
c.logger.Info("Admin server started", "port", c.config.AdminServerPort)
382387
c.adminServer = adminServer
383388
}
384389

385390
// Start the queue runner in a goroutine
391+
NewWorkflowQueue(c, _DBOS_INTERNAL_QUEUE_NAME)
386392
go func() {
387393
c.queueRunner.run(c)
388394
}()

0 commit comments

Comments
 (0)