Skip to content

Commit 93f4205

Browse files
authored
Toward executor objects (#44)
This PR: - Encapsulate the state of a "DBOS executor" in an interface exported by the package. This must be typeless, hence: - Preserves compile-time type checking by exposing package-level methods that accept a DBOS executor interface - Attempts to keep the programming interface reasonable by creating a `DBOSContext` type which holds both DBOS executor functionalities and extends the native `context.Context` interface. The mirror functions are made with the same signature so it is more intuitive for users when mocking. Few things to note/improve/think about - Workflow functions now require our `DBOSContext` instead of a `context.Context`. This is because all operations need a context. Steps on the other hand can accept a normal context.Context if they wish to, and upcast it to `DBOSContext` if they need to do DBOS stuff. - DBOSContext is local to a process. This means the user cannot expect a context to: 1) be passed to the workflow at recovery 2) be passed to a queued task by the queue runner. This is quite un-intuitive w.r.t native Golang `context.Context` - SetEvent / Send types are not greatly named - We should find a way for library specific contexts to play together. Right now a gin handler looks like: `func checkoutEndpoint(c *gin.Context, dbosCtx dbos.DBOSContext, logger *logrus.Logger) {` Another thing this PR does, on the path of improving the step interface, is allowing users to pass step parameters through the context, using the `WithValue` method. We'll have to improve on this as well: right now `WithValue` does nothing if the provided interface is not a concrete `dbosContext` (our internal struct implementing `DBOSContext`), so a user cannot rely on this in their tests if they mock `DBOSContext`. Immediate next PRs: - Move logger, queue registry in DBOSContext - Tighten usage of context to manage resources like queue runner - Add tests running workflows inside goroutines - Improve step UX `DBOSContext` exposes all the DBOS methods an end-user is expected to write in their code: ```golang type DBOSContext interface { context.Context // Context Lifecycle Launch() error Shutdown() // Workflow operations RunAsStep(_ DBOSContext, fn StepFunc, input ...any) (any, error) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) Send(_ DBOSContext, input WorkflowSendInputInternal) error Recv(_ DBOSContext, input WorkflowRecvInput) (any, error) SetEvent(_ DBOSContext, input WorkflowSetEventInput) error GetEvent(_ DBOSContext, input WorkflowGetEventInput) (any, error) Sleep(duration time.Duration) (time.Duration, error) GetWorkflowID() (string, error) // Workflow management RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Accessors GetApplicationVersion() string GetExecutorID() string GetApplicationID() string } ``` New library usage: ```golang dbosContext, err := dbos.NewDBOSContext(dbos.Config{ AppName: "widget_store_go", DatabaseURL: os.Getenv("DBOS_SYSTEM_DATABASE_URL"), }) if err != nil { logger.WithError(err).Fatal("DBOS initialization failed") } dbos.RegisterWorkflow(dbosContext, checkoutWorkflow) dbos.RegisterWorkflow(dbosContext, dispatchOrderWorkflow) err = dbosContext.Launch() if err != nil { logger.WithError(err).Fatal("DBOS service start failed") } defer dbosContext.Shutdown() ``` Starting workflows: ```golang handle, err := dbos.RunAsWorkflow(dbosCtx, checkoutWorkflow, "", dbos.WithWorkflowID(idempotencyKey)) ``` The testing can be done as: ```golang dbosContextMock := mocks.NewMockDBOSContext(t) // Test running the wrapped workflow t.Run("Payment fails", func(t *testing.T) { wfID := "test-workflow-id" // Set expectations on what DBOS stuff that happens within the workflow dbosContextMock.On("GetWorkflowID").Return(wfID, nil) dbosContextMock.On("RunAsStep", dbosContextMock, mock.Anything, mock.Anything).Return(1, nil).Once() dbosContextMock.On("RunAsStep", dbosContextMock, mock.Anything, mock.Anything).Return(false, nil).Once() dbosContextMock.On("RunAsStep", dbosContextMock, mock.Anything, mock.Anything).Return("", nil).Once() dbosContextMock.On("SetEvent", dbosContextMock, mock.Anything).Return(nil).Once() res, err := checkoutWorkflow(dbosContextMock, "") if err != nil { t.Fatalf("checkout workflow failed: %v", err) } if res != "" { t.Fatalf("expected empty result, got %s", res) } dbosContextMock.AssertExpectations(t) }) ```
1 parent 77b5acf commit 93f4205

File tree

13 files changed

+1260
-852
lines changed

13 files changed

+1260
-852
lines changed

dbos/admin_server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type queueMetadata struct {
2525
RateLimit *RateLimiter `json:"rateLimit,omitempty"`
2626
}
2727

28-
func newAdminServer(port int) *adminServer {
28+
func newAdminServer(ctx *dbosContext, port int) *adminServer {
2929
mux := http.NewServeMux()
3030

3131
// Health endpoint
@@ -50,7 +50,7 @@ func newAdminServer(port int) *adminServer {
5050

5151
getLogger().Info("Recovering workflows for executors", "executors", executorIDs)
5252

53-
handles, err := recoverPendingWorkflows(r.Context(), executorIDs)
53+
handles, err := recoverPendingWorkflows(ctx, executorIDs)
5454
if err != nil {
5555
getLogger().Error("Error recovering workflows", "error", err)
5656
http.Error(w, fmt.Sprintf("Recovery failed: %v", err), http.StatusInternalServerError)

dbos/admin_server_test.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,27 @@ import (
1111
)
1212

1313
func TestAdminServer(t *testing.T) {
14-
databaseURL := getDatabaseURL(t)
14+
databaseURL := getDatabaseURL()
1515

1616
t.Run("Admin server is not started by default", func(t *testing.T) {
17-
// Ensure clean state
18-
Shutdown()
1917

20-
err := Initialize(Config{
18+
ctx, err := NewDBOSContext(Config{
2119
DatabaseURL: databaseURL,
2220
AppName: "test-app",
2321
})
2422
if err != nil {
2523
t.Skipf("Failed to initialize DBOS: %v", err)
2624
}
27-
err = Launch()
25+
err = ctx.Launch()
2826
if err != nil {
2927
t.Skipf("Failed to initialize DBOS: %v", err)
3028
}
3129

3230
// Ensure cleanup
3331
defer func() {
34-
Shutdown()
32+
if ctx != nil {
33+
ctx.Shutdown()
34+
}
3535
}()
3636

3737
// Give time for any startup processes
@@ -45,46 +45,48 @@ func TestAdminServer(t *testing.T) {
4545
}
4646

4747
// Verify the DBOS executor doesn't have an admin server instance
48-
if dbos == nil {
48+
if ctx == nil {
4949
t.Fatal("Expected DBOS instance to be created")
5050
}
5151

52-
if dbos.adminServer != nil {
52+
exec := ctx.(*dbosContext)
53+
if exec.adminServer != nil {
5354
t.Error("Expected admin server to be nil when not configured")
5455
}
5556
})
5657

5758
t.Run("Admin server endpoints", func(t *testing.T) {
58-
Shutdown()
59-
6059
// Launch DBOS with admin server once for all endpoint tests
61-
err := Initialize(Config{
60+
ctx, err := NewDBOSContext(Config{
6261
DatabaseURL: databaseURL,
6362
AppName: "test-app",
6463
AdminServer: true,
6564
})
6665
if err != nil {
6766
t.Skipf("Failed to initialize DBOS with admin server: %v", err)
6867
}
69-
err = Launch()
68+
err = ctx.Launch()
7069
if err != nil {
7170
t.Skipf("Failed to initialize DBOS with admin server: %v", err)
7271
}
7372

7473
// Ensure cleanup
7574
defer func() {
76-
Shutdown()
75+
if ctx != nil {
76+
ctx.Shutdown()
77+
}
7778
}()
7879

7980
// Give the server a moment to start
8081
time.Sleep(100 * time.Millisecond)
8182

8283
// Verify the DBOS executor has an admin server instance
83-
if dbos == nil {
84+
if ctx == nil {
8485
t.Fatal("Expected DBOS instance to be created")
8586
}
8687

87-
if dbos.adminServer == nil {
88+
exec := ctx.(*dbosContext)
89+
if exec.adminServer == nil {
8890
t.Fatal("Expected admin server to be created in DBOS instance")
8991
}
9092

0 commit comments

Comments
 (0)