Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
619f468
accept dedup ID and priority
maxdml Aug 13, 2025
192045b
fix
maxdml Aug 13, 2025
8141109
tests
maxdml Aug 13, 2025
aad0db7
style nits
maxdml Aug 13, 2025
fefeaab
nits
maxdml Aug 14, 2025
e040dad
increase test timeout
maxdml Aug 14, 2025
7ed3dc1
add a gha for security scans. Run go vet in test gha
maxdml Aug 14, 2025
f9f1538
handle errors where missing + fix potential overflows
maxdml Aug 14, 2025
a4bffde
set toolchain to 1.25
maxdml Aug 14, 2025
535131e
add read header timeout to mitigate slowloris
maxdml Aug 14, 2025
a8ed0ca
check the binary before opening, but disable G304
maxdml Aug 14, 2025
46cb68a
constants
maxdml Aug 14, 2025
c88b2e5
remove unused error
maxdml Aug 14, 2025
38cf444
simplify handler implementation
maxdml Aug 14, 2025
e2c38a9
discord badge
maxdml Aug 14, 2025
991f060
fix timeouts: set timeout on dequeue + handle sub millisecond timouts…
maxdml Aug 14, 2025
ab763ff
use struct{}{} for notification chans, because they use zero space
maxdml Aug 14, 2025
c3ded62
dequeueWorkflow accepts an input struct
maxdml Aug 14, 2025
048964d
print step name in DBOSContext RunAsStep errors
maxdml Aug 14, 2025
8f37ec3
complete DLQ test
maxdml Aug 14, 2025
038174e
record runtime type errors in the typed erased wrapped workflow function
maxdml Aug 14, 2025
dc036c9
update notes
maxdml Aug 14, 2025
74e381d
use a sync.Map for workflow registry
maxdml Aug 14, 2025
32009f0
prevent the spawning of child workflows within steps
maxdml Aug 14, 2025
db2cb71
fix
maxdml Aug 14, 2025
ff00332
test GHA on 1.25
maxdml Aug 14, 2025
f7df447
comment
maxdml Aug 14, 2025
91d8e90
comment
maxdml Aug 14, 2025
68f07ca
store pointers to sync.Map and also pass workflowCustomNametoFQN to n…
maxdml Aug 14, 2025
3466a54
nit
maxdml Aug 14, 2025
063ee3a
Merge branch 'main' into bag-o-stuff
maxdml Aug 14, 2025
b8255d6
error in GetResult if recording the step fails
maxdml Aug 14, 2025
9d87c70
merge conflict
maxdml Aug 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: Security Checks

on:
push:
branches:
- main
pull_request:
branches:
types:
- ready_for_review
- opened
- reopened
- synchronize
workflow_dispatch:

jobs:
security:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '1.25.x'

- name: Cache Go modules
uses: actions/cache@v4
with:
path: |
~/go/pkg/mod
~/.cache/go-build
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Download dependencies
run: go mod download

- name: Install security tools
run: |
go install golang.org/x/vuln/cmd/govulncheck@latest
go install github.com/securego/gosec/v2/cmd/gosec@latest
- name: Run govulncheck
run: govulncheck ./...

- name: Run gosec
run: gosec ./...
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '1.23.x'
go-version: '1.25.x'

- name: Cache Go modules
uses: actions/cache@v4
Expand All @@ -61,7 +61,7 @@ jobs:
run: go install gotest.tools/gotestsum@latest

- name: Run tests
run: gotestsum --format github-action -- -race ./...
run: go vet ./... && gotestsum --format github-action -- -race ./...
working-directory: ./dbos
env:
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
[![Go Reference](https://pkg.go.dev/badge/github.com/dbos-inc/dbos-transact-go.svg)](https://pkg.go.dev/github.com/dbos-inc/dbos-transact-go)
[![Go Report Card](https://goreportcard.com/badge/github.com/dbos-inc/dbos-transact-go)](https://goreportcard.com/report/github.com/dbos-inc/dbos-transact-go)
[![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/dbos-inc/dbos-transact-go?sort=semver)](https://github.com/dbos-inc/dbos-transact-go/releases)
[![Join Discord](https://img.shields.io/badge/Discord-Join%20Chat-5865F2?logo=discord&logoColor=white)](https://discord.com/invite/jsmC6pXGgX)


# DBOS Transact: Lightweight Durable Workflow Orchestration with Postgres

#### [Documentation](https://docs.dbos.dev/)      [Examples](https://docs.dbos.dev/examples)      [Github](https://github.com/dbos-inc)      [Discord](https://discord.com/invite/jsmC6pXGgX)
#### [Documentation](https://docs.dbos.dev/)      [Examples](https://docs.dbos.dev/examples)      [Github](https://github.com/dbos-inc)
</div>

#### This Golang version of DBOS Transact is in Alpha!
Expand Down
35 changes: 23 additions & 12 deletions dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,37 @@ import (
)

const (
healthCheckPath = "/dbos-healthz"
workflowRecoveryPath = "/dbos-workflow-recovery"
workflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
_HEALTHCHECK_PATH = "/dbos-healthz"
_WORKFLOW_RECOVERY_PATH = "/dbos-workflow-recovery"
_WORKFLOW_QUEUES_METADATA_PATH = "/dbos-workflow-queues-metadata"

_ADMIN_SERVER_READ_HEADER_TIMEOUT = 5 * time.Second
_ADMIN_SERVER_SHUTDOWN_TIMEOUT = 10 * time.Second
)

type adminServer struct {
server *http.Server
logger *slog.Logger
port int
}

func newAdminServer(ctx *dbosContext, port int) *adminServer {
mux := http.NewServeMux()

// Health endpoint
mux.HandleFunc(healthCheckPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(_HEALTHCHECK_PATH, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"healthy"}`))
_, err := w.Write([]byte(`{"status":"healthy"}`))
if err != nil {
ctx.logger.Error("Error writing health check response", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
})

// Recovery endpoint
mux.HandleFunc(workflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(_WORKFLOW_RECOVERY_PATH, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
Expand Down Expand Up @@ -67,7 +76,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
})

// Queue metadata endpoint
mux.HandleFunc(workflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(_WORKFLOW_QUEUES_METADATA_PATH, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
Expand All @@ -84,18 +93,20 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
})

server := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
ReadHeaderTimeout: _ADMIN_SERVER_READ_HEADER_TIMEOUT,
}

return &adminServer{
server: server,
logger: ctx.logger,
port: port,
}
}

func (as *adminServer) Start() error {
as.logger.Info("Starting admin server", "port", 3001)
as.logger.Info("Starting admin server", "port", as.port)

go func() {
if err := as.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
Expand All @@ -109,8 +120,8 @@ func (as *adminServer) Start() error {
func (as *adminServer) Shutdown(ctx context.Context) error {
as.logger.Info("Shutting down admin server")

// XXX consider moving the grace period to DBOSContext.Shutdown()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// Note: consider moving the grace period to DBOSContext.Shutdown()
ctx, cancel := context.WithTimeout(ctx, _ADMIN_SERVER_SHUTDOWN_TIMEOUT)
defer cancel()

if err := as.server.Shutdown(ctx); err != nil {
Expand Down
14 changes: 7 additions & 7 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestAdminServer(t *testing.T) {

// Verify admin server is not running
client := &http.Client{Timeout: 1 * time.Second}
_, err = client.Get("http://localhost:3001" + healthCheckPath)
_, err = client.Get("http://localhost:3001" + _HEALTHCHECK_PATH)
require.Error(t, err, "Expected request to fail when admin server is not started")

// Verify the DBOS executor doesn't have an admin server instance
Expand Down Expand Up @@ -89,13 +89,13 @@ func TestAdminServer(t *testing.T) {
{
name: "Health endpoint responds correctly",
method: "GET",
endpoint: "http://localhost:3001" + healthCheckPath,
endpoint: "http://localhost:3001" + _HEALTHCHECK_PATH,
expectedStatus: http.StatusOK,
},
{
name: "Recovery endpoint responds correctly with valid JSON",
method: "POST",
endpoint: "http://localhost:3001" + workflowRecoveryPath,
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
contentType: "application/json",
expectedStatus: http.StatusOK,
Expand All @@ -109,21 +109,21 @@ func TestAdminServer(t *testing.T) {
{
name: "Recovery endpoint rejects invalid methods",
method: "GET",
endpoint: "http://localhost:3001" + workflowRecoveryPath,
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
expectedStatus: http.StatusMethodNotAllowed,
},
{
name: "Recovery endpoint rejects invalid JSON",
method: "POST",
endpoint: "http://localhost:3001" + workflowRecoveryPath,
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
body: strings.NewReader(`{"invalid": json}`),
contentType: "application/json",
expectedStatus: http.StatusBadRequest,
},
{
name: "Queue metadata endpoint responds correctly",
method: "GET",
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
endpoint: "http://localhost:3001" + _WORKFLOW_QUEUES_METADATA_PATH,
expectedStatus: http.StatusOK,
validateResp: func(t *testing.T, resp *http.Response) {
var queueMetadata []WorkflowQueue
Expand All @@ -149,7 +149,7 @@ func TestAdminServer(t *testing.T) {
{
name: "Queue metadata endpoint rejects invalid methods",
method: "POST",
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
endpoint: "http://localhost:3001" + _WORKFLOW_QUEUES_METADATA_PATH,
expectedStatus: http.StatusMethodNotAllowed,
},
}
Expand Down
8 changes: 4 additions & 4 deletions dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,6 @@ func TestCancelResume(t *testing.T) {
// Verify the deadline was reset (should be different from original)
assert.False(t, resumeStatus.Deadline.Equal(originalDeadline), "expected deadline to be reset after resume, but it remained the same: %v", originalDeadline)

// The new deadline should be after resumeStart + workflowTimeout
expectedDeadline := resumeStart.Add(workflowTimeout - 100*time.Millisecond) // Allow some leeway for processing time
assert.True(t, resumeStatus.Deadline.After(expectedDeadline), "deadline %v is too early (expected around %v)", resumeStatus.Deadline, expectedDeadline)

// Wait for the workflow to complete
_, err = resumeHandle.GetResult()
require.Error(t, err, "expected timeout error, but got none")
Expand All @@ -491,6 +487,10 @@ func TestCancelResume(t *testing.T) {
finalStatus, err := resumeHandle.GetStatus()
require.NoError(t, err, "failed to get final workflow status")

// The new deadline should have been set after resumeStart + workflowTimeout
expectedDeadline := resumeStart.Add(workflowTimeout - 100*time.Millisecond) // Allow some leeway for processing time
assert.True(t, finalStatus.Deadline.After(expectedDeadline), "deadline %v is too early (expected around %v)", resumeStatus.Deadline, expectedDeadline)

assert.Equal(t, WorkflowStatusCancelled, finalStatus.Status, "expected final workflow status to be CANCELLED")

require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after cancel/resume timeout test")
Expand Down
87 changes: 50 additions & 37 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"io"
"log/slog"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -119,10 +120,9 @@ type dbosContext struct {
// Wait group for workflow goroutines
workflowsWg *sync.WaitGroup

// Workflow registry
workflowRegistry map[string]workflowRegistryEntry
workflowRegMutex *sync.RWMutex
workflowCustomNametoFQN sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
// Workflow registry - read-mostly sync.Map since registration happens only before launch
workflowRegistry *sync.Map // map[string]workflowRegistryEntry
workflowCustomNametoFQN *sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.

// Workflow scheduler
workflowScheduler *cron.Cron
Expand Down Expand Up @@ -158,15 +158,15 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
// Will do nothing if the concrete type is not dbosContext
if dbosCtx, ok := ctx.(*dbosContext); ok {
return &dbosContext{
ctx: context.WithValue(dbosCtx.ctx, key, val), // Spawn a new child context with the value set
logger: dbosCtx.logger,
systemDB: dbosCtx.systemDB,
workflowsWg: dbosCtx.workflowsWg,
workflowRegistry: dbosCtx.workflowRegistry,
workflowRegMutex: dbosCtx.workflowRegMutex,
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
ctx: context.WithValue(dbosCtx.ctx, key, val), // Spawn a new child context with the value set
logger: dbosCtx.logger,
systemDB: dbosCtx.systemDB,
workflowsWg: dbosCtx.workflowsWg,
workflowRegistry: dbosCtx.workflowRegistry,
workflowCustomNametoFQN: dbosCtx.workflowCustomNametoFQN,
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
}
}
return nil
Expand All @@ -181,15 +181,15 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
}
if dbosCtx, ok := ctx.(*dbosContext); ok {
return &dbosContext{
ctx: context.WithoutCancel(dbosCtx.ctx),
logger: dbosCtx.logger,
systemDB: dbosCtx.systemDB,
workflowsWg: dbosCtx.workflowsWg,
workflowRegistry: dbosCtx.workflowRegistry,
workflowRegMutex: dbosCtx.workflowRegMutex,
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
ctx: context.WithoutCancel(dbosCtx.ctx),
logger: dbosCtx.logger,
systemDB: dbosCtx.systemDB,
workflowsWg: dbosCtx.workflowsWg,
workflowRegistry: dbosCtx.workflowRegistry,
workflowCustomNametoFQN: dbosCtx.workflowCustomNametoFQN,
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
}
}
return nil
Expand All @@ -205,15 +205,15 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
if dbosCtx, ok := ctx.(*dbosContext); ok {
newCtx, cancelFunc := context.WithTimeoutCause(dbosCtx.ctx, timeout, errors.New("DBOS context timeout"))
return &dbosContext{
ctx: newCtx,
logger: dbosCtx.logger,
systemDB: dbosCtx.systemDB,
workflowsWg: dbosCtx.workflowsWg,
workflowRegistry: dbosCtx.workflowRegistry,
workflowRegMutex: dbosCtx.workflowRegMutex,
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
ctx: newCtx,
logger: dbosCtx.logger,
systemDB: dbosCtx.systemDB,
workflowsWg: dbosCtx.workflowsWg,
workflowRegistry: dbosCtx.workflowRegistry,
workflowCustomNametoFQN: dbosCtx.workflowCustomNametoFQN,
applicationVersion: dbosCtx.applicationVersion,
executorID: dbosCtx.executorID,
applicationID: dbosCtx.applicationID,
}, cancelFunc
}
return nil, func() {}
Expand Down Expand Up @@ -261,11 +261,11 @@ func (c *dbosContext) GetApplicationID() string {
func NewDBOSContext(inputConfig Config) (DBOSContext, error) {
ctx, cancelFunc := context.WithCancelCause(context.Background())
initExecutor := &dbosContext{
workflowsWg: &sync.WaitGroup{},
ctx: ctx,
ctxCancelFunc: cancelFunc,
workflowRegistry: make(map[string]workflowRegistryEntry),
workflowRegMutex: &sync.RWMutex{},
workflowsWg: &sync.WaitGroup{},
ctx: ctx,
ctxCancelFunc: cancelFunc,
workflowRegistry: &sync.Map{},
workflowCustomNametoFQN: &sync.Map{},
}

// Load and process the configuration
Expand Down Expand Up @@ -438,7 +438,20 @@ func getBinaryHash() (string, error) {
return "", err
}

file, err := os.Open(execPath)
execPath, err = filepath.EvalSymlinks(execPath)
if err != nil {
return "", fmt.Errorf("resolve self path: %w", err)
}

fi, err := os.Lstat(execPath)
if err != nil {
return "", err
}
if !fi.Mode().IsRegular() {
return "", fmt.Errorf("executable is not a regular file")
}

file, err := os.Open(execPath) // #nosec G304 -- opening our own executable, not user-supplied
if err != nil {
return "", err
}
Expand Down
Loading
Loading