Skip to content

Commit 2a3c538

Browse files
committed
allow user provided executor ID, app version, and make update wf outcome idempotent if status transitions from cancel to error
1 parent c260ad0 commit 2a3c538

File tree

4 files changed

+185
-38
lines changed

4 files changed

+185
-38
lines changed

dbos/dbos.go

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@ const (
3232
// Config holds configuration parameters for initializing a DBOS context.
3333
// DatabaseURL and AppName are required.
3434
type Config struct {
35-
DatabaseURL string // PostgreSQL connection string (required)
36-
AppName string // Application name for identification (required)
37-
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
38-
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
39-
ConductorURL string // DBOS conductor service URL (optional)
40-
ConductorAPIKey string // DBOS conductor API key (optional)
35+
DatabaseURL string // PostgreSQL connection string (required)
36+
AppName string // Application name for identification (required)
37+
Logger *slog.Logger // Custom logger instance (defaults to a new slog logger)
38+
AdminServer bool // Enable Transact admin HTTP server (disabled by default)
39+
ConductorURL string // DBOS conductor service URL (optional)
40+
ConductorAPIKey string // DBOS conductor API key (optional)
41+
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
42+
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
4143
}
4244

4345
// processConfig enforces mandatory fields and applies defaults.
@@ -51,19 +53,37 @@ func processConfig(inputConfig *Config) (*Config, error) {
5153
}
5254

5355
dbosConfig := &Config{
54-
DatabaseURL: inputConfig.DatabaseURL,
55-
AppName: inputConfig.AppName,
56-
Logger: inputConfig.Logger,
57-
AdminServer: inputConfig.AdminServer,
58-
ConductorURL: inputConfig.ConductorURL,
59-
ConductorAPIKey: inputConfig.ConductorAPIKey,
56+
DatabaseURL: inputConfig.DatabaseURL,
57+
AppName: inputConfig.AppName,
58+
Logger: inputConfig.Logger,
59+
AdminServer: inputConfig.AdminServer,
60+
ConductorURL: inputConfig.ConductorURL,
61+
ConductorAPIKey: inputConfig.ConductorAPIKey,
62+
ApplicationVersion: inputConfig.ApplicationVersion,
63+
ExecutorID: inputConfig.ExecutorID,
6064
}
6165

6266
// Load defaults
6367
if dbosConfig.Logger == nil {
6468
dbosConfig.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo}))
6569
}
6670

71+
// Override with environment variables if set
72+
if envAppVersion := os.Getenv("DBOS__APPVERSION"); envAppVersion != "" {
73+
dbosConfig.ApplicationVersion = envAppVersion
74+
}
75+
if envExecutorID := os.Getenv("DBOS__VMID"); envExecutorID != "" {
76+
dbosConfig.ExecutorID = envExecutorID
77+
}
78+
79+
// Apply defaults for empty values
80+
if dbosConfig.ApplicationVersion == "" {
81+
dbosConfig.ApplicationVersion = computeApplicationVersion()
82+
}
83+
if dbosConfig.ExecutorID == "" {
84+
dbosConfig.ExecutorID = "local"
85+
}
86+
6787
return dbosConfig, nil
6888
}
6989

@@ -292,16 +312,9 @@ func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
292312
var t time.Time
293313
gob.Register(t)
294314

295-
// Initialize global variables with environment variables, providing defaults if not set
296-
initExecutor.applicationVersion = os.Getenv("DBOS__APPVERSION")
297-
if initExecutor.applicationVersion == "" {
298-
initExecutor.applicationVersion = computeApplicationVersion()
299-
}
300-
301-
initExecutor.executorID = os.Getenv("DBOS__VMID")
302-
if initExecutor.executorID == "" {
303-
initExecutor.executorID = "local"
304-
}
315+
// Initialize global variables from processed config (already handles env vars and defaults)
316+
initExecutor.applicationVersion = config.ApplicationVersion
317+
initExecutor.executorID = config.ExecutorID
305318

306319
initExecutor.applicationID = os.Getenv("DBOS__APPID")
307320

dbos/dbos_test.go

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/stretchr/testify/require"
99
)
1010

11-
func TestConfigValidationErrorTypes(t *testing.T) {
11+
func TestConfig(t *testing.T) {
1212
databaseURL := getDatabaseURL()
1313

1414
t.Run("CreatesDBOSContext", func(t *testing.T) {
@@ -73,4 +73,97 @@ func TestConfigValidationErrorTypes(t *testing.T) {
7373
expectedMsg := "Error initializing DBOS Transact: missing required config field: databaseURL"
7474
assert.Equal(t, expectedMsg, dbosErr.Message)
7575
})
76+
77+
t.Run("ConfigApplicationVersionAndExecutorID", func(t *testing.T) {
78+
t.Run("UsesConfigValues", func(t *testing.T) {
79+
// Clear env vars to ensure we're testing config values
80+
t.Setenv("DBOS__APPVERSION", "")
81+
t.Setenv("DBOS__VMID", "")
82+
83+
ctx, err := NewDBOSContext(Config{
84+
DatabaseURL: databaseURL,
85+
AppName: "test-config-values",
86+
ApplicationVersion: "config-v1.2.3",
87+
ExecutorID: "config-executor-123",
88+
})
89+
require.NoError(t, err)
90+
defer func() {
91+
if ctx != nil {
92+
ctx.Shutdown(1 * time.Minute)
93+
}
94+
}()
95+
96+
assert.Equal(t, "config-v1.2.3", ctx.GetApplicationVersion())
97+
assert.Equal(t, "config-executor-123", ctx.GetExecutorID())
98+
})
99+
100+
t.Run("EnvVarsOverrideConfigValues", func(t *testing.T) {
101+
t.Setenv("DBOS__APPVERSION", "env-v2.0.0")
102+
t.Setenv("DBOS__VMID", "env-executor-456")
103+
104+
ctx, err := NewDBOSContext(Config{
105+
DatabaseURL: databaseURL,
106+
AppName: "test-env-override",
107+
ApplicationVersion: "config-v1.2.3",
108+
ExecutorID: "config-executor-123",
109+
})
110+
require.NoError(t, err)
111+
defer func() {
112+
if ctx != nil {
113+
ctx.Shutdown(1 * time.Minute)
114+
}
115+
}()
116+
117+
// Env vars should override config values
118+
assert.Equal(t, "env-v2.0.0", ctx.GetApplicationVersion())
119+
assert.Equal(t, "env-executor-456", ctx.GetExecutorID())
120+
})
121+
122+
t.Run("UsesDefaultsWhenEmpty", func(t *testing.T) {
123+
// Clear env vars and don't set config values
124+
t.Setenv("DBOS__APPVERSION", "")
125+
t.Setenv("DBOS__VMID", "")
126+
127+
ctx, err := NewDBOSContext(Config{
128+
DatabaseURL: databaseURL,
129+
AppName: "test-defaults",
130+
// ApplicationVersion and ExecutorID left empty
131+
})
132+
require.NoError(t, err)
133+
defer func() {
134+
if ctx != nil {
135+
ctx.Shutdown(1 * time.Minute)
136+
}
137+
}()
138+
139+
// Should use computed application version (hash) and "local" executor ID
140+
appVersion := ctx.GetApplicationVersion()
141+
assert.NotEmpty(t, appVersion, "ApplicationVersion should not be empty")
142+
assert.NotEqual(t, "", appVersion, "ApplicationVersion should have a default value")
143+
144+
executorID := ctx.GetExecutorID()
145+
assert.Equal(t, "local", executorID)
146+
})
147+
148+
t.Run("EnvVarsOverrideEmptyConfig", func(t *testing.T) {
149+
t.Setenv("DBOS__APPVERSION", "env-only-v3.0.0")
150+
t.Setenv("DBOS__VMID", "env-only-executor")
151+
152+
ctx, err := NewDBOSContext(Config{
153+
DatabaseURL: databaseURL,
154+
AppName: "test-env-only",
155+
// ApplicationVersion and ExecutorID left empty
156+
})
157+
require.NoError(t, err)
158+
defer func() {
159+
if ctx != nil {
160+
ctx.Shutdown(1 * time.Minute)
161+
}
162+
}()
163+
164+
// Should use env vars even when config is empty
165+
assert.Equal(t, "env-only-v3.0.0", ctx.GetApplicationVersion())
166+
assert.Equal(t, "env-only-executor", ctx.GetExecutorID())
167+
})
168+
})
76169
}

dbos/system_database.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ type updateWorkflowOutcomeDBInput struct {
701701
func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowOutcomeDBInput) error {
702702
query := `UPDATE dbos.workflow_status
703703
SET status = $1, output = $2, error = $3, updated_at = $4, deduplication_id = NULL
704-
WHERE workflow_uuid = $5`
704+
WHERE workflow_uuid = $5 AND NOT (status = $6 AND $1 = $7)`
705705

706706
outputString, err := serialize(input.output)
707707
if err != nil {
@@ -714,9 +714,9 @@ func (s *sysDB) updateWorkflowOutcome(ctx context.Context, input updateWorkflowO
714714
}
715715

716716
if input.tx != nil {
717-
_, err = input.tx.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID)
717+
_, err = input.tx.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusError)
718718
} else {
719-
_, err = s.pool.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID)
719+
_, err = s.pool.Exec(ctx, query, input.status, outputString, errorStr, time.Now().UnixMilli(), input.workflowID, WorkflowStatusCancelled, WorkflowStatusError)
720720
}
721721

722722
if err != nil {
@@ -1646,7 +1646,7 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
16461646
}
16471647
recordedResult, err := s.checkOperationExecution(ctx, checkInput)
16481648
if err != nil {
1649-
return nil, fmt.Errorf("failed to check operation execution: %w", err)
1649+
return nil, err
16501650
}
16511651
if recordedResult != nil {
16521652
if recordedResult.output != nil {

dbos/workflows_test.go

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,5 @@
11
package dbos
22

3-
/**
4-
Test workflow and steps features
5-
[x] Wrapping various golang methods in DBOS workflows
6-
[x] workflow idempotency
7-
[x] workflow DLQ
8-
[x] workflow conflicting name
9-
[x] workflow timeouts & deadlines (including child workflows)
10-
*/
11-
123
import (
134
"context"
145
"errors"
@@ -1760,15 +1751,15 @@ func TestSendRecv(t *testing.T) {
17601751

17611752
for err := range errors {
17621753
t.Logf("Receiver error (expected): %v", err)
1763-
1754+
17641755
// Check that the error is of the expected type
17651756
dbosErr, ok := err.(*DBOSError)
17661757
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
17671758
require.Equal(t, ConflictingIDError, dbosErr.Code, "expected error code to be ConflictingIDError, got %v", dbosErr.Code)
17681759
require.Equal(t, "concurrent-recv-wfid", dbosErr.WorkflowID, "expected workflow ID to be 'concurrent-recv-wfid', got %s", dbosErr.WorkflowID)
17691760
require.True(t, dbosErr.IsBase, "expected error to have IsBase=true")
17701761
require.Contains(t, dbosErr.Message, "Conflicting workflow ID concurrent-recv-wfid", "expected error message to contain conflicting workflow ID")
1771-
1762+
17721763
errorCount++
17731764
}
17741765

@@ -2932,6 +2923,56 @@ func TestWorkflowAtVersion(t *testing.T) {
29322923
assert.Equal(t, version, status.ApplicationVersion, "expected correct application version")
29332924
}
29342925

2926+
func TestWorkflowCancel(t *testing.T) {
2927+
dbosCtx := setupDBOS(t, true, true)
2928+
2929+
blockingEvent := NewEvent()
2930+
2931+
// Workflow that waits for an event, then calls Recv(). Returns raw error if Recv fails
2932+
blockingWorkflow := func(ctx DBOSContext, topic string) (string, error) {
2933+
// Wait for the event
2934+
blockingEvent.Wait()
2935+
2936+
// Now call Recv() - this should fail if the workflow is cancelled
2937+
msg, err := Recv[string](ctx, topic, 5*time.Second)
2938+
if err != nil {
2939+
return "", err // Return the raw error from Recv
2940+
}
2941+
return msg, nil
2942+
}
2943+
RegisterWorkflow(dbosCtx, blockingWorkflow)
2944+
2945+
t.Run("TestWorkflowCancel", func(t *testing.T) {
2946+
topic := "cancel-test-topic"
2947+
2948+
// Start the blocking workflow
2949+
handle, err := RunAsWorkflow(dbosCtx, blockingWorkflow, topic)
2950+
require.NoError(t, err, "failed to start blocking workflow")
2951+
2952+
// Cancel the workflow using DBOS.CancelWorkflow
2953+
err = CancelWorkflow(dbosCtx, handle.GetWorkflowID())
2954+
require.NoError(t, err, "failed to cancel workflow")
2955+
2956+
// Signal the event so the workflow can move on to Recv()
2957+
blockingEvent.Set()
2958+
2959+
// Check the return values of the workflow
2960+
result, err := handle.GetResult()
2961+
require.Error(t, err, "expected error from cancelled workflow")
2962+
assert.Equal(t, "", result, "expected empty result from cancelled workflow")
2963+
2964+
// Check that we get a DBOSError with AwaitedWorkflowCancelled code
2965+
var dbosErr *DBOSError
2966+
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
2967+
assert.Equal(t, WorkflowCancelled, dbosErr.Code, "expected AwaitedWorkflowCancelled error code, got: %v", dbosErr.Code)
2968+
2969+
// Ensure the workflow status is of an error type
2970+
status, err := handle.GetStatus()
2971+
require.NoError(t, err, "failed to get workflow status")
2972+
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to be WorkflowStatusCancelled")
2973+
})
2974+
}
2975+
29352976
var cancelAllBeforeBlockEvent = NewEvent()
29362977

29372978
func cancelAllBeforeBlockingWorkflow(ctx DBOSContext, input string) (string, error) {

0 commit comments

Comments
 (0)