Skip to content

Commit 3aaf85a

Browse files
authored
Bag o stuff (#56)
Major fix: deadline computation for enqueued workflows. Previously we'd actually set the deadline when the workflow was enqueued, not dequeued. This PR fixes it by considering the timeout when setting the final deadline. (Note: we also revert the change from #53 and let `resumeWorkflow` clear the deadline. It'll be recomputed from the timeout in `RunAsWorkflow`.) Also: - Add a test `TimeoutOnlySetOnDequeue`, to verify the above fix - Record a workflow error when the function is called with the wrong parameter types. This could happen if the client enqueues a workflow with the wrong input type. - Prevent the spawning of child workflows within steps and add a test (`ChildWorkflowCannotBeSpawnedFromStep`) - Add a GHA to perform vulnerability checks and static analysis - Set toolchain to 1.25 (and update test GHA accordingly). Note this is *not* the minimum version required by library consumers. - Run `go vet` in test GHA - Add "join discord" badge to README - Make hardcoded values package level constants - Fix a small bug where returning the admin server object would be missing the port - Handle errors when writing the response in the healthcheck admin endpoint - Set `ReadHeaderTimeout` on the admin server to prevent Slowloris attacks - Handle sub millisecond timeouts (round to next millisecond) - Handle negative timeouts (can happen with propagation delays): set timeout to 1ms - Use a sync.Map for the workflow registry - Check the executable is a regular file before opening it to compute its hash - Remove unused WorkflowFunctionNotFound -- I don't see a use case for it - Use `struct{}{}` instead of `bool` for notification channels (`struct{}{}` use zero space) - Use an input struct for `dequeueWorkflows` - Cleanup handles implementation with a base handler that implement once shared logic (`GetStatus`, `GetWorkflowID`). other handles compose with this struct. - Handle errors when failing to close notification listener connection - Handle error when closing the connection used to set `LISTEN` channels in Postgres - Handle errors (and print a log entry) when failing to run a scheduled workflow - Fix possible integer overflows when converting uint to int (for queue priorities and fork start step) - Complete DLQ test: resume the DLQ-ed workflow and check it can now run to completion. Check that the completed workflow can be ran many times past the retry limit. - Pass `workflowCustomNametoFQN` to new contexts
1 parent 584e8b2 commit 3aaf85a

File tree

15 files changed

+523
-324
lines changed

15 files changed

+523
-324
lines changed

.github/workflows/security.yml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
name: Security Checks
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
pull_request:
8+
branches:
9+
types:
10+
- ready_for_review
11+
- opened
12+
- reopened
13+
- synchronize
14+
workflow_dispatch:
15+
16+
jobs:
17+
security:
18+
runs-on: ubuntu-latest
19+
20+
steps:
21+
- uses: actions/checkout@v4
22+
with:
23+
fetch-depth: 0
24+
25+
- name: Setup Go
26+
uses: actions/setup-go@v5
27+
with:
28+
go-version: '1.25.x'
29+
30+
- name: Cache Go modules
31+
uses: actions/cache@v4
32+
with:
33+
path: |
34+
~/go/pkg/mod
35+
~/.cache/go-build
36+
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
37+
restore-keys: |
38+
${{ runner.os }}-go-
39+
40+
- name: Download dependencies
41+
run: go mod download
42+
43+
- name: Install security tools
44+
run: |
45+
go install golang.org/x/vuln/cmd/govulncheck@latest
46+
go install github.com/securego/gosec/v2/cmd/gosec@latest
47+
48+
- name: Run govulncheck
49+
run: govulncheck ./...
50+
51+
- name: Run gosec
52+
run: gosec ./...

.github/workflows/tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
- name: Setup Go
4343
uses: actions/setup-go@v5
4444
with:
45-
go-version: '1.23.x'
45+
go-version: '1.25.x'
4646

4747
- name: Cache Go modules
4848
uses: actions/cache@v4
@@ -61,7 +61,7 @@ jobs:
6161
run: go install gotest.tools/gotestsum@latest
6262

6363
- name: Run tests
64-
run: gotestsum --format github-action -- -race ./...
64+
run: go vet ./... && gotestsum --format github-action -- -race ./...
6565
working-directory: ./dbos
6666
env:
6767
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
[![Go Reference](https://pkg.go.dev/badge/github.com/dbos-inc/dbos-transact-go.svg)](https://pkg.go.dev/github.com/dbos-inc/dbos-transact-go)
44
[![Go Report Card](https://goreportcard.com/badge/github.com/dbos-inc/dbos-transact-go)](https://goreportcard.com/report/github.com/dbos-inc/dbos-transact-go)
55
[![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/dbos-inc/dbos-transact-go?sort=semver)](https://github.com/dbos-inc/dbos-transact-go/releases)
6+
[![Join Discord](https://img.shields.io/badge/Discord-Join%20Chat-5865F2?logo=discord&logoColor=white)](https://discord.com/invite/jsmC6pXGgX)
67

78

89
# DBOS Transact: Lightweight Durable Workflow Orchestration with Postgres
910

10-
#### [Documentation](https://docs.dbos.dev/)      [Examples](https://docs.dbos.dev/examples)      [Github](https://github.com/dbos-inc)      [Discord](https://discord.com/invite/jsmC6pXGgX)
11+
#### [Documentation](https://docs.dbos.dev/)      [Examples](https://docs.dbos.dev/examples)      [Github](https://github.com/dbos-inc)
1112
</div>
1213

1314
#### This Golang version of DBOS Transact is in Alpha!

dbos/admin_server.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,37 @@ import (
1010
)
1111

1212
const (
13-
healthCheckPath = "/dbos-healthz"
14-
workflowRecoveryPath = "/dbos-workflow-recovery"
15-
workflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
13+
_HEALTHCHECK_PATH = "/dbos-healthz"
14+
_WORKFLOW_RECOVERY_PATH = "/dbos-workflow-recovery"
15+
_WORKFLOW_QUEUES_METADATA_PATH = "/dbos-workflow-queues-metadata"
16+
17+
_ADMIN_SERVER_READ_HEADER_TIMEOUT = 5 * time.Second
18+
_ADMIN_SERVER_SHUTDOWN_TIMEOUT = 10 * time.Second
1619
)
1720

1821
type adminServer struct {
1922
server *http.Server
2023
logger *slog.Logger
24+
port int
2125
}
2226

2327
func newAdminServer(ctx *dbosContext, port int) *adminServer {
2428
mux := http.NewServeMux()
2529

2630
// Health endpoint
27-
mux.HandleFunc(healthCheckPath, func(w http.ResponseWriter, r *http.Request) {
31+
mux.HandleFunc(_HEALTHCHECK_PATH, func(w http.ResponseWriter, r *http.Request) {
2832
w.Header().Set("Content-Type", "application/json")
2933
w.WriteHeader(http.StatusOK)
30-
w.Write([]byte(`{"status":"healthy"}`))
34+
_, err := w.Write([]byte(`{"status":"healthy"}`))
35+
if err != nil {
36+
ctx.logger.Error("Error writing health check response", "error", err)
37+
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
38+
return
39+
}
3140
})
3241

3342
// Recovery endpoint
34-
mux.HandleFunc(workflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
43+
mux.HandleFunc(_WORKFLOW_RECOVERY_PATH, func(w http.ResponseWriter, r *http.Request) {
3544
if r.Method != http.MethodPost {
3645
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
3746
return
@@ -67,7 +76,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
6776
})
6877

6978
// Queue metadata endpoint
70-
mux.HandleFunc(workflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
79+
mux.HandleFunc(_WORKFLOW_QUEUES_METADATA_PATH, func(w http.ResponseWriter, r *http.Request) {
7180
if r.Method != http.MethodGet {
7281
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
7382
return
@@ -84,18 +93,20 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
8493
})
8594

8695
server := &http.Server{
87-
Addr: fmt.Sprintf(":%d", port),
88-
Handler: mux,
96+
Addr: fmt.Sprintf(":%d", port),
97+
Handler: mux,
98+
ReadHeaderTimeout: _ADMIN_SERVER_READ_HEADER_TIMEOUT,
8999
}
90100

91101
return &adminServer{
92102
server: server,
93103
logger: ctx.logger,
104+
port: port,
94105
}
95106
}
96107

97108
func (as *adminServer) Start() error {
98-
as.logger.Info("Starting admin server", "port", 3001)
109+
as.logger.Info("Starting admin server", "port", as.port)
99110

100111
go func() {
101112
if err := as.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
@@ -109,8 +120,8 @@ func (as *adminServer) Start() error {
109120
func (as *adminServer) Shutdown(ctx context.Context) error {
110121
as.logger.Info("Shutting down admin server")
111122

112-
// XXX consider moving the grace period to DBOSContext.Shutdown()
113-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
123+
// Note: consider moving the grace period to DBOSContext.Shutdown()
124+
ctx, cancel := context.WithTimeout(ctx, _ADMIN_SERVER_SHUTDOWN_TIMEOUT)
114125
defer cancel()
115126

116127
if err := as.server.Shutdown(ctx); err != nil {

dbos/admin_server_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestAdminServer(t *testing.T) {
3838

3939
// Verify admin server is not running
4040
client := &http.Client{Timeout: 1 * time.Second}
41-
_, err = client.Get("http://localhost:3001" + healthCheckPath)
41+
_, err = client.Get("http://localhost:3001" + _HEALTHCHECK_PATH)
4242
require.Error(t, err, "Expected request to fail when admin server is not started")
4343

4444
// Verify the DBOS executor doesn't have an admin server instance
@@ -89,13 +89,13 @@ func TestAdminServer(t *testing.T) {
8989
{
9090
name: "Health endpoint responds correctly",
9191
method: "GET",
92-
endpoint: "http://localhost:3001" + healthCheckPath,
92+
endpoint: "http://localhost:3001" + _HEALTHCHECK_PATH,
9393
expectedStatus: http.StatusOK,
9494
},
9595
{
9696
name: "Recovery endpoint responds correctly with valid JSON",
9797
method: "POST",
98-
endpoint: "http://localhost:3001" + workflowRecoveryPath,
98+
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
9999
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
100100
contentType: "application/json",
101101
expectedStatus: http.StatusOK,
@@ -109,21 +109,21 @@ func TestAdminServer(t *testing.T) {
109109
{
110110
name: "Recovery endpoint rejects invalid methods",
111111
method: "GET",
112-
endpoint: "http://localhost:3001" + workflowRecoveryPath,
112+
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
113113
expectedStatus: http.StatusMethodNotAllowed,
114114
},
115115
{
116116
name: "Recovery endpoint rejects invalid JSON",
117117
method: "POST",
118-
endpoint: "http://localhost:3001" + workflowRecoveryPath,
118+
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
119119
body: strings.NewReader(`{"invalid": json}`),
120120
contentType: "application/json",
121121
expectedStatus: http.StatusBadRequest,
122122
},
123123
{
124124
name: "Queue metadata endpoint responds correctly",
125125
method: "GET",
126-
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
126+
endpoint: "http://localhost:3001" + _WORKFLOW_QUEUES_METADATA_PATH,
127127
expectedStatus: http.StatusOK,
128128
validateResp: func(t *testing.T, resp *http.Response) {
129129
var queueMetadata []WorkflowQueue
@@ -149,7 +149,7 @@ func TestAdminServer(t *testing.T) {
149149
{
150150
name: "Queue metadata endpoint rejects invalid methods",
151151
method: "POST",
152-
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
152+
endpoint: "http://localhost:3001" + _WORKFLOW_QUEUES_METADATA_PATH,
153153
expectedStatus: http.StatusMethodNotAllowed,
154154
},
155155
}

dbos/client_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -473,10 +473,6 @@ func TestCancelResume(t *testing.T) {
473473
// Verify the deadline was reset (should be different from original)
474474
assert.False(t, resumeStatus.Deadline.Equal(originalDeadline), "expected deadline to be reset after resume, but it remained the same: %v", originalDeadline)
475475

476-
// The new deadline should be after resumeStart + workflowTimeout
477-
expectedDeadline := resumeStart.Add(workflowTimeout - 100*time.Millisecond) // Allow some leeway for processing time
478-
assert.True(t, resumeStatus.Deadline.After(expectedDeadline), "deadline %v is too early (expected around %v)", resumeStatus.Deadline, expectedDeadline)
479-
480476
// Wait for the workflow to complete
481477
_, err = resumeHandle.GetResult()
482478
require.Error(t, err, "expected timeout error, but got none")
@@ -491,6 +487,10 @@ func TestCancelResume(t *testing.T) {
491487
finalStatus, err := resumeHandle.GetStatus()
492488
require.NoError(t, err, "failed to get final workflow status")
493489

490+
// The new deadline should have been set after resumeStart + workflowTimeout
491+
expectedDeadline := resumeStart.Add(workflowTimeout - 100*time.Millisecond) // Allow some leeway for processing time
492+
assert.True(t, finalStatus.Deadline.After(expectedDeadline), "deadline %v is too early (expected around %v)", resumeStatus.Deadline, expectedDeadline)
493+
494494
assert.Equal(t, WorkflowStatusCancelled, finalStatus.Status, "expected final workflow status to be CANCELLED")
495495

496496
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after cancel/resume timeout test")

dbos/dbos.go

Lines changed: 50 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"io"
1616
"log/slog"
1717
"os"
18+
"path/filepath"
1819
"sync"
1920
"sync/atomic"
2021
"time"
@@ -119,10 +120,9 @@ type dbosContext struct {
119120
// Wait group for workflow goroutines
120121
workflowsWg *sync.WaitGroup
121122

122-
// Workflow registry
123-
workflowRegistry map[string]workflowRegistryEntry
124-
workflowRegMutex *sync.RWMutex
125-
workflowCustomNametoFQN sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
123+
// Workflow registry - read-mostly sync.Map since registration happens only before launch
124+
workflowRegistry *sync.Map // map[string]workflowRegistryEntry
125+
workflowCustomNametoFQN *sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
126126

127127
// Workflow scheduler
128128
workflowScheduler *cron.Cron
@@ -158,15 +158,15 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
158158
// Will do nothing if the concrete type is not dbosContext
159159
if dbosCtx, ok := ctx.(*dbosContext); ok {
160160
return &dbosContext{
161-
ctx: context.WithValue(dbosCtx.ctx, key, val), // Spawn a new child context with the value set
162-
logger: dbosCtx.logger,
163-
systemDB: dbosCtx.systemDB,
164-
workflowsWg: dbosCtx.workflowsWg,
165-
workflowRegistry: dbosCtx.workflowRegistry,
166-
workflowRegMutex: dbosCtx.workflowRegMutex,
167-
applicationVersion: dbosCtx.applicationVersion,
168-
executorID: dbosCtx.executorID,
169-
applicationID: dbosCtx.applicationID,
161+
ctx: context.WithValue(dbosCtx.ctx, key, val), // Spawn a new child context with the value set
162+
logger: dbosCtx.logger,
163+
systemDB: dbosCtx.systemDB,
164+
workflowsWg: dbosCtx.workflowsWg,
165+
workflowRegistry: dbosCtx.workflowRegistry,
166+
workflowCustomNametoFQN: dbosCtx.workflowCustomNametoFQN,
167+
applicationVersion: dbosCtx.applicationVersion,
168+
executorID: dbosCtx.executorID,
169+
applicationID: dbosCtx.applicationID,
170170
}
171171
}
172172
return nil
@@ -181,15 +181,15 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
181181
}
182182
if dbosCtx, ok := ctx.(*dbosContext); ok {
183183
return &dbosContext{
184-
ctx: context.WithoutCancel(dbosCtx.ctx),
185-
logger: dbosCtx.logger,
186-
systemDB: dbosCtx.systemDB,
187-
workflowsWg: dbosCtx.workflowsWg,
188-
workflowRegistry: dbosCtx.workflowRegistry,
189-
workflowRegMutex: dbosCtx.workflowRegMutex,
190-
applicationVersion: dbosCtx.applicationVersion,
191-
executorID: dbosCtx.executorID,
192-
applicationID: dbosCtx.applicationID,
184+
ctx: context.WithoutCancel(dbosCtx.ctx),
185+
logger: dbosCtx.logger,
186+
systemDB: dbosCtx.systemDB,
187+
workflowsWg: dbosCtx.workflowsWg,
188+
workflowRegistry: dbosCtx.workflowRegistry,
189+
workflowCustomNametoFQN: dbosCtx.workflowCustomNametoFQN,
190+
applicationVersion: dbosCtx.applicationVersion,
191+
executorID: dbosCtx.executorID,
192+
applicationID: dbosCtx.applicationID,
193193
}
194194
}
195195
return nil
@@ -205,15 +205,15 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
205205
if dbosCtx, ok := ctx.(*dbosContext); ok {
206206
newCtx, cancelFunc := context.WithTimeoutCause(dbosCtx.ctx, timeout, errors.New("DBOS context timeout"))
207207
return &dbosContext{
208-
ctx: newCtx,
209-
logger: dbosCtx.logger,
210-
systemDB: dbosCtx.systemDB,
211-
workflowsWg: dbosCtx.workflowsWg,
212-
workflowRegistry: dbosCtx.workflowRegistry,
213-
workflowRegMutex: dbosCtx.workflowRegMutex,
214-
applicationVersion: dbosCtx.applicationVersion,
215-
executorID: dbosCtx.executorID,
216-
applicationID: dbosCtx.applicationID,
208+
ctx: newCtx,
209+
logger: dbosCtx.logger,
210+
systemDB: dbosCtx.systemDB,
211+
workflowsWg: dbosCtx.workflowsWg,
212+
workflowRegistry: dbosCtx.workflowRegistry,
213+
workflowCustomNametoFQN: dbosCtx.workflowCustomNametoFQN,
214+
applicationVersion: dbosCtx.applicationVersion,
215+
executorID: dbosCtx.executorID,
216+
applicationID: dbosCtx.applicationID,
217217
}, cancelFunc
218218
}
219219
return nil, func() {}
@@ -261,11 +261,11 @@ func (c *dbosContext) GetApplicationID() string {
261261
func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
262262
ctx, cancelFunc := context.WithCancelCause(context.Background())
263263
initExecutor := &dbosContext{
264-
workflowsWg: &sync.WaitGroup{},
265-
ctx: ctx,
266-
ctxCancelFunc: cancelFunc,
267-
workflowRegistry: make(map[string]workflowRegistryEntry),
268-
workflowRegMutex: &sync.RWMutex{},
264+
workflowsWg: &sync.WaitGroup{},
265+
ctx: ctx,
266+
ctxCancelFunc: cancelFunc,
267+
workflowRegistry: &sync.Map{},
268+
workflowCustomNametoFQN: &sync.Map{},
269269
}
270270

271271
// Load and process the configuration
@@ -438,7 +438,20 @@ func getBinaryHash() (string, error) {
438438
return "", err
439439
}
440440

441-
file, err := os.Open(execPath)
441+
execPath, err = filepath.EvalSymlinks(execPath)
442+
if err != nil {
443+
return "", fmt.Errorf("resolve self path: %w", err)
444+
}
445+
446+
fi, err := os.Lstat(execPath)
447+
if err != nil {
448+
return "", err
449+
}
450+
if !fi.Mode().IsRegular() {
451+
return "", fmt.Errorf("executable is not a regular file")
452+
}
453+
454+
file, err := os.Open(execPath) // #nosec G304 -- opening our own executable, not user-supplied
442455
if err != nil {
443456
return "", err
444457
}

0 commit comments

Comments
 (0)