Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
619f468
accept dedup ID and priority
maxdml Aug 13, 2025
192045b
fix
maxdml Aug 13, 2025
8141109
tests
maxdml Aug 13, 2025
aad0db7
style nits
maxdml Aug 13, 2025
fefeaab
nits
maxdml Aug 14, 2025
e040dad
increase test timeout
maxdml Aug 14, 2025
7ed3dc1
add a gha for security scans. Run go vet in test gha
maxdml Aug 14, 2025
f9f1538
handle errors where missing + fix potential overflows
maxdml Aug 14, 2025
a4bffde
set toolchain to 1.25
maxdml Aug 14, 2025
535131e
add read header timeout to mitigate slowloris
maxdml Aug 14, 2025
a8ed0ca
check the binary before opening, but disable G304
maxdml Aug 14, 2025
46cb68a
constants
maxdml Aug 14, 2025
c88b2e5
remove unused error
maxdml Aug 14, 2025
38cf444
simplify handler implementation
maxdml Aug 14, 2025
e2c38a9
discord badge
maxdml Aug 14, 2025
991f060
fix timeouts: set timeout on dequeue + handle sub millisecond timouts…
maxdml Aug 14, 2025
ab763ff
use struct{}{} for notification chans, because they use zero space
maxdml Aug 14, 2025
c3ded62
dequeueWorkflow accepts an input struct
maxdml Aug 14, 2025
048964d
print step name in DBOSContext RunAsStep errors
maxdml Aug 14, 2025
8f37ec3
complete DLQ test
maxdml Aug 14, 2025
038174e
record runtime type errors in the typed erased wrapped workflow function
maxdml Aug 14, 2025
dc036c9
update notes
maxdml Aug 14, 2025
74e381d
use a sync.Map for workflow registry
maxdml Aug 14, 2025
32009f0
prevent the spawning of child workflows within steps
maxdml Aug 14, 2025
db2cb71
fix
maxdml Aug 14, 2025
ff00332
test GHA on 1.25
maxdml Aug 14, 2025
f7df447
comment
maxdml Aug 14, 2025
91d8e90
comment
maxdml Aug 14, 2025
68f07ca
store pointers to sync.Map and also pass workflowCustomNametoFQN to n…
maxdml Aug 14, 2025
3466a54
nit
maxdml Aug 14, 2025
063ee3a
Merge branch 'main' into bag-o-stuff
maxdml Aug 14, 2025
b8255d6
error in GetResult if recording the step fails
maxdml Aug 14, 2025
9d87c70
merge conflict
maxdml Aug 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: Security Checks

on:
push:
branches:
- main
pull_request:
branches:
types:
- ready_for_review
- opened
- reopened
- synchronize
workflow_dispatch:

jobs:
security:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '1.25.x'

- name: Cache Go modules
uses: actions/cache@v4
with:
path: |
~/go/pkg/mod
~/.cache/go-build
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-

- name: Download dependencies
run: go mod download

- name: Install security tools
run: |
go install golang.org/x/vuln/cmd/govulncheck@latest
go install github.com/securego/gosec/v2/cmd/gosec@latest

- name: Run govulncheck
run: govulncheck ./...

- name: Run gosec
run: gosec ./...
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: '1.23.x'
go-version: '1.25.x'

- name: Cache Go modules
uses: actions/cache@v4
Expand All @@ -61,7 +61,7 @@ jobs:
run: go install gotest.tools/gotestsum@latest

- name: Run tests
run: gotestsum --format github-action -- -race ./...
run: go vet ./... && gotestsum --format github-action -- -race ./...
working-directory: ./dbos
env:
PGPASSWORD: a!b@c$d()e*_,/:;=?@ff[]22
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
[![Go Reference](https://pkg.go.dev/badge/github.com/dbos-inc/dbos-transact-go.svg)](https://pkg.go.dev/github.com/dbos-inc/dbos-transact-go)
[![Go Report Card](https://goreportcard.com/badge/github.com/dbos-inc/dbos-transact-go)](https://goreportcard.com/report/github.com/dbos-inc/dbos-transact-go)
[![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/dbos-inc/dbos-transact-go?sort=semver)](https://github.com/dbos-inc/dbos-transact-go/releases)
[![Join Discord](https://img.shields.io/badge/Discord-Join%20Chat-5865F2?logo=discord&logoColor=white)](https://discord.com/invite/jsmC6pXGgX)


# DBOS Transact: Lightweight Durable Workflow Orchestration with Postgres

#### [Documentation](https://docs.dbos.dev/)   •   [Examples](https://docs.dbos.dev/examples)   •   [Github](https://github.com/dbos-inc)   •   [Discord](https://discord.com/invite/jsmC6pXGgX)
#### [Documentation](https://docs.dbos.dev/)   •   [Examples](https://docs.dbos.dev/examples)   •   [Github](https://github.com/dbos-inc)
</div>

#### This Golang version of DBOS Transact is in Alpha!
Expand Down
35 changes: 23 additions & 12 deletions dbos/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,37 @@ import (
)

const (
healthCheckPath = "/dbos-healthz"
workflowRecoveryPath = "/dbos-workflow-recovery"
workflowQueuesMetadataPath = "/dbos-workflow-queues-metadata"
_HEALTHCHECK_PATH = "/dbos-healthz"
_WORKFLOW_RECOVERY_PATH = "/dbos-workflow-recovery"
_WORKFLOW_QUEUES_METADATA_PATH = "/dbos-workflow-queues-metadata"

_ADMIN_SERVER_READ_HEADER_TIMEOUT = 5 * time.Second
_ADMIN_SERVER_SHUTDOWN_TIMEOUT = 10 * time.Second
)

type adminServer struct {
server *http.Server
logger *slog.Logger
port int
}

func newAdminServer(ctx *dbosContext, port int) *adminServer {
mux := http.NewServeMux()

// Health endpoint
mux.HandleFunc(healthCheckPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(_HEALTHCHECK_PATH, func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"healthy"}`))
_, err := w.Write([]byte(`{"status":"healthy"}`))
if err != nil {
ctx.logger.Error("Error writing health check response", "error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
})

// Recovery endpoint
mux.HandleFunc(workflowRecoveryPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(_WORKFLOW_RECOVERY_PATH, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
Expand Down Expand Up @@ -67,7 +76,7 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
})

// Queue metadata endpoint
mux.HandleFunc(workflowQueuesMetadataPath, func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc(_WORKFLOW_QUEUES_METADATA_PATH, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
Expand All @@ -84,18 +93,20 @@ func newAdminServer(ctx *dbosContext, port int) *adminServer {
})

server := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
ReadHeaderTimeout: _ADMIN_SERVER_READ_HEADER_TIMEOUT,
}

return &adminServer{
server: server,
logger: ctx.logger,
port: port,
}
}

func (as *adminServer) Start() error {
as.logger.Info("Starting admin server", "port", 3001)
as.logger.Info("Starting admin server", "port", as.port)

go func() {
if err := as.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
Expand All @@ -109,8 +120,8 @@ func (as *adminServer) Start() error {
func (as *adminServer) Shutdown(ctx context.Context) error {
as.logger.Info("Shutting down admin server")

// XXX consider moving the grace period to DBOSContext.Shutdown()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// Note: consider moving the grace period to DBOSContext.Shutdown()
ctx, cancel := context.WithTimeout(ctx, _ADMIN_SERVER_SHUTDOWN_TIMEOUT)
defer cancel()

if err := as.server.Shutdown(ctx); err != nil {
Expand Down
14 changes: 7 additions & 7 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestAdminServer(t *testing.T) {

// Verify admin server is not running
client := &http.Client{Timeout: 1 * time.Second}
_, err = client.Get("http://localhost:3001" + healthCheckPath)
_, err = client.Get("http://localhost:3001" + _HEALTHCHECK_PATH)
require.Error(t, err, "Expected request to fail when admin server is not started")

// Verify the DBOS executor doesn't have an admin server instance
Expand Down Expand Up @@ -89,13 +89,13 @@ func TestAdminServer(t *testing.T) {
{
name: "Health endpoint responds correctly",
method: "GET",
endpoint: "http://localhost:3001" + healthCheckPath,
endpoint: "http://localhost:3001" + _HEALTHCHECK_PATH,
expectedStatus: http.StatusOK,
},
{
name: "Recovery endpoint responds correctly with valid JSON",
method: "POST",
endpoint: "http://localhost:3001" + workflowRecoveryPath,
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
body: bytes.NewBuffer(mustMarshal([]string{"executor1", "executor2"})),
contentType: "application/json",
expectedStatus: http.StatusOK,
Expand All @@ -109,21 +109,21 @@ func TestAdminServer(t *testing.T) {
{
name: "Recovery endpoint rejects invalid methods",
method: "GET",
endpoint: "http://localhost:3001" + workflowRecoveryPath,
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
expectedStatus: http.StatusMethodNotAllowed,
},
{
name: "Recovery endpoint rejects invalid JSON",
method: "POST",
endpoint: "http://localhost:3001" + workflowRecoveryPath,
endpoint: "http://localhost:3001" + _WORKFLOW_RECOVERY_PATH,
body: strings.NewReader(`{"invalid": json}`),
contentType: "application/json",
expectedStatus: http.StatusBadRequest,
},
{
name: "Queue metadata endpoint responds correctly",
method: "GET",
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
endpoint: "http://localhost:3001" + _WORKFLOW_QUEUES_METADATA_PATH,
expectedStatus: http.StatusOK,
validateResp: func(t *testing.T, resp *http.Response) {
var queueMetadata []WorkflowQueue
Expand All @@ -149,7 +149,7 @@ func TestAdminServer(t *testing.T) {
{
name: "Queue metadata endpoint rejects invalid methods",
method: "POST",
endpoint: "http://localhost:3001" + workflowQueuesMetadataPath,
endpoint: "http://localhost:3001" + _WORKFLOW_QUEUES_METADATA_PATH,
expectedStatus: http.StatusMethodNotAllowed,
},
}
Expand Down
Loading
Loading