Skip to content

Commit 95f06a8

Browse files
authored
update scopes + DBOSExecutor singleton (#36)
Builder pattern for the DBOS singleton: - Split building the object and launching - Stop exposing internal variables - Make app name & db url mandatory to init DBOS. This means using a config struct rather than functional options. **Manipulating the singleton object** ```golang package main import ( "context" "fmt" "os" "github.com/dbos-inc/dbos-transact-go/dbos" ) var ( wf = dbos.WithWorkflow(workflow) ) func workflow(ctx context.Context, input string) (string, error) { return input, nil } func main() { err := dbos.Initialize(dbos.Config{ AppName: "dbos-hello", DatabaseURL: os.Getenv("DBOS_SYSTEM_DATABASE_URL"), }) if err != nil { panic(err) } err = dbos.Launch() if err != nil { panic(err) } r, err := wf(context.Background(), "Hello, DBOS!") if err != nil { panic(err) } fmt.Println(r) dbos.Shutdown() } ``` If a user needs to pass the DBOS singleton around and mock it, they can create a mock interface and write mocks manually or generate them with something like [mockery](https://github.com/vektra/mockery). **What is public** - Config - WorkflowStatus, Workflow Status types (e.g., dbos.WorkflowStatusPending) - The WorkflowHandle interface (concrete internal types: workflowHandle and workflowPollingHandle) - WithWorkflow wrapper, its functional options, and the returned WorkflowWrapperFunc (not sure whether the latter is actually useful to a user) - WorkflowFunc and StepFunc so users know what they can wrap - Single methods (Send, Recv, RetrieveWorkflow) - Input types for single methods like Send (WorkflowSendInput) - Error types - Queues: WorkflowQueue, NewWorkflowQueue, function options - SystemDatabase: I think it'll be useful to eventually allow users to inject a non-pg implementation (e.g., a mock) for testing. - StepInfo
1 parent 274525f commit 95f06a8

File tree

13 files changed

+527
-526
lines changed

13 files changed

+527
-526
lines changed

dbos/admin-server.go renamed to dbos/admin_server.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,34 +9,34 @@ import (
99
)
1010

1111
const (
12-
HealthCheckPath = "/dbos-healthz"
13-
WorkflowRecoveryPath = "/dbos-workflow-recovery"
14-
WorkflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
12+
healthCheckPath = "/dbos-healthz"
13+
workflowRecoveryPath = "/dbos-workflow-recovery"
14+
workflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
1515
)
1616

17-
type AdminServer struct {
17+
type adminServer struct {
1818
server *http.Server
1919
}
2020

21-
type QueueMetadata struct {
21+
type queueMetadata struct {
2222
Name string `json:"name"`
2323
Concurrency *int `json:"concurrency,omitempty"`
2424
WorkerConcurrency *int `json:"workerConcurrency,omitempty"`
2525
RateLimit *RateLimiter `json:"rateLimit,omitempty"`
2626
}
2727

28-
func NewAdminServer(port int) *AdminServer {
28+
func newAdminServer(port int) *adminServer {
2929
mux := http.NewServeMux()
3030

3131
// Health endpoint
32-
mux.HandleFunc(HealthCheckPath, func(w http.ResponseWriter, r *http.Request) {
32+
mux.HandleFunc(healthCheckPath, func(w http.ResponseWriter, r *http.Request) {
3333
w.Header().Set("Content-Type", "application/json")
3434
w.WriteHeader(http.StatusOK)
3535
w.Write([]byte(`{"status":"healthy"}`))
3636
})
3737

3838
// Recovery endpoint
39-
mux.HandleFunc(WorkflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
39+
mux.HandleFunc(workflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
4040
if r.Method != http.MethodPost {
4141
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
4242
return
@@ -71,21 +71,21 @@ func NewAdminServer(port int) *AdminServer {
7171
})
7272

7373
// Queue metadata endpoint
74-
mux.HandleFunc(WorkflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
74+
mux.HandleFunc(workflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
7575
if r.Method != http.MethodGet {
7676
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
7777
return
7878
}
7979

80-
var queueMetadataArray []QueueMetadata
80+
var queueMetadataArray []queueMetadata
8181

8282
// Iterate through all queues in the registry
8383
for _, queue := range workflowQueueRegistry {
84-
queueMetadata := QueueMetadata{
85-
Name: queue.Name,
86-
WorkerConcurrency: queue.WorkerConcurrency,
87-
Concurrency: queue.GlobalConcurrency,
88-
RateLimit: queue.Limiter,
84+
queueMetadata := queueMetadata{
85+
Name: queue.name,
86+
WorkerConcurrency: queue.workerConcurrency,
87+
Concurrency: queue.globalConcurrency,
88+
RateLimit: queue.limiter,
8989
}
9090

9191
queueMetadataArray = append(queueMetadataArray, queueMetadata)
@@ -103,12 +103,12 @@ func NewAdminServer(port int) *AdminServer {
103103
Handler: mux,
104104
}
105105

106-
return &AdminServer{
106+
return &adminServer{
107107
server: server,
108108
}
109109
}
110110

111-
func (as *AdminServer) Start() error {
111+
func (as *adminServer) Start() error {
112112
getLogger().Info("Starting admin server", "port", 3001)
113113

114114
go func() {
@@ -120,7 +120,7 @@ func (as *AdminServer) Start() error {
120120
return nil
121121
}
122122

123-
func (as *AdminServer) Shutdown() error {
123+
func (as *adminServer) Shutdown() error {
124124
getLogger().Info("Shutting down admin server")
125125

126126
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

dbos/admin-server_test.go renamed to dbos/admin_server_test.go

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,40 +5,41 @@ import (
55
"encoding/json"
66
"io"
77
"net/http"
8-
"os"
98
"strings"
109
"testing"
1110
"time"
1211
)
1312

1413
func TestAdminServer(t *testing.T) {
15-
// Skip if database is not available
16-
databaseURL := os.Getenv("DBOS_SYSTEM_DATABASE_URL")
17-
if databaseURL == "" && os.Getenv("PGPASSWORD") == "" {
18-
t.Skip("Database not available (DBOS_SYSTEM_DATABASE_URL and PGPASSWORD not set), skipping DBOS integration tests")
19-
}
14+
databaseURL := getDatabaseURL(t)
2015

21-
t.Run("Admin server is not started without WithAdminServer option", func(t *testing.T) {
16+
t.Run("Admin server is not started by default", func(t *testing.T) {
2217
// Ensure clean state
23-
if dbos != nil {
24-
Shutdown()
25-
}
18+
Shutdown()
2619

27-
// Launch DBOS without admin server option
28-
err := Launch()
20+
err := Initialize(Config{
21+
DatabaseURL: databaseURL,
22+
AppName: "test-app",
23+
})
2924
if err != nil {
30-
t.Skipf("Failed to launch DBOS (database likely not available): %v", err)
25+
t.Skipf("Failed to initialize DBOS: %v", err)
26+
}
27+
err = Launch()
28+
if err != nil {
29+
t.Skipf("Failed to initialize DBOS: %v", err)
3130
}
3231

3332
// Ensure cleanup
34-
defer Shutdown()
33+
defer func() {
34+
Shutdown()
35+
}()
3536

3637
// Give time for any startup processes
3738
time.Sleep(100 * time.Millisecond)
3839

3940
// Verify admin server is not running
4041
client := &http.Client{Timeout: 1 * time.Second}
41-
_, err = client.Get("http://localhost:3001" + HealthCheckPath)
42+
_, err = client.Get("http://localhost:3001" + healthCheckPath)
4243
if err == nil {
4344
t.Error("Expected request to fail when admin server is not started, but it succeeded")
4445
}
@@ -54,19 +55,26 @@ func TestAdminServer(t *testing.T) {
5455
})
5556

5657
t.Run("Admin server endpoints", func(t *testing.T) {
57-
// Ensure clean state
58-
if dbos != nil {
59-
Shutdown()
60-
}
58+
Shutdown()
6159

6260
// Launch DBOS with admin server once for all endpoint tests
63-
err := Launch(WithAdminServer())
61+
err := Initialize(Config{
62+
DatabaseURL: databaseURL,
63+
AppName: "test-app",
64+
AdminServer: true,
65+
})
6466
if err != nil {
65-
t.Skipf("Failed to launch DBOS with admin server (database likely not available): %v", err)
67+
t.Skipf("Failed to initialize DBOS with admin server: %v", err)
68+
}
69+
err = Launch()
70+
if err != nil {
71+
t.Skipf("Failed to initialize DBOS with admin server: %v", err)
6672
}
6773

6874
// Ensure cleanup
69-
defer Shutdown()
75+
defer func() {
76+
Shutdown()
77+
}()
7078

7179
// Give the server a moment to start
7280
time.Sleep(100 * time.Millisecond)
@@ -94,13 +102,13 @@ func TestAdminServer(t *testing.T) {
94102
{
95103
name: "Health endpoint responds correctly",
96104
method: "GET",
97-
endpoint: "http://localhost:3001" + HealthCheckPath,
105+
endpoint: "http://localhost:3001" + healthCheckPath,
98106
expectedStatus: http.StatusOK,
99107
},
100108
{
101109
name: "Recovery endpoint responds correctly with valid JSON",
102110
method: "POST",
103-
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
111+
endpoint: "http://localhost:3001" + workflowRecoveryPath,
104112
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
105113
contentType: "application/json",
106114
expectedStatus: http.StatusOK,
@@ -117,24 +125,24 @@ func TestAdminServer(t *testing.T) {
117125
{
118126
name: "Recovery endpoint rejects invalid methods",
119127
method: "GET",
120-
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
128+
endpoint: "http://localhost:3001" + workflowRecoveryPath,
121129
expectedStatus: http.StatusMethodNotAllowed,
122130
},
123131
{
124132
name: "Recovery endpoint rejects invalid JSON",
125133
method: "POST",
126-
endpoint: "http://localhost:3001" + WorkflowRecoveryPath,
134+
endpoint: "http://localhost:3001" + workflowRecoveryPath,
127135
body: strings.NewReader(`{"invalid": json}`),
128136
contentType: "application/json",
129137
expectedStatus: http.StatusBadRequest,
130138
},
131139
{
132140
name: "Queue metadata endpoint responds correctly",
133141
method: "GET",
134-
endpoint: "http://localhost:3001" + WorkflowQueuesMetadataPath,
142+
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
135143
expectedStatus: http.StatusOK,
136144
validateResp: func(t *testing.T, resp *http.Response) {
137-
var queueMetadata []QueueMetadata
145+
var queueMetadata []queueMetadata
138146
if err := json.NewDecoder(resp.Body).Decode(&queueMetadata); err != nil {
139147
t.Errorf("Failed to decode response as QueueMetadata array: %v", err)
140148
}
@@ -170,7 +178,7 @@ func TestAdminServer(t *testing.T) {
170178
{
171179
name: "Queue metadata endpoint rejects invalid methods",
172180
method: "POST",
173-
endpoint: "http://localhost:3001" + WorkflowQueuesMetadataPath,
181+
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
174182
expectedStatus: http.StatusMethodNotAllowed,
175183
},
176184
}

0 commit comments

Comments
 (0)