Skip to content
45 changes: 45 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Contributing to DBOS Transact Python

Thank you for considering contributing to DBOS Transact. We welcome contributions from everyone, including bug fixes, feature enhancements, documentation improvements, or any other form of contribution.

## How to Contribute

To get started with DBOS Transact, please read the [README](README.md).

You can contribute in many ways. Some simple ways are:
* Use the SDK and open issues to report any bugs, questions, concern with the SDK, samples or documentation.
* Respond to issues with advice or suggestions.
* Participate in discussions in our [Discord](https://discord.gg/fMwQjeW5zg) channel.
* Contribute fixes and improvement to code, samples or documentation.

### To contribute code, please follow these steps:

1. Fork this github repository to your own account.

2. Clone the forked repository to your local machine.

3. Create a branch.

4. Make the necessary change to code, samples or documentation.

5. Write tests.

6. Commit the changes to your forked repository.

7. Submit a pull request to this repository.
In the PR description please include:
* Description of the fix/feature.
* Brief description of implementation.
* Description of how you tested the fix.

## Requesting features

If you have a feature request or an idea for an enhancement, feel free to open an issue on GitHub. Describe the feature or enhancement you'd like to see and why it would be valuable. Discuss it with the community on the [Discord](https://discord.gg/fMwQjeW5zg) channel.

## Discuss with the community

If you are stuck, need help, or wondering if a certain contribution will be welcome, please ask! You can reach out to us on [Discord](https://discord.gg/fMwQjeW5zg) or Github discussions.

## Code of conduct

It is important to us that contributing to DBOS will be a pleasant experience, if necessary, please refer to our [code of conduct](CODE_OF_CONDUCT.md) for participation guidelines.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/dbos-inc/dbos-transact-go)](https://goreportcard.com/report/github.com/dbos-inc/dbos-transact-go)
[![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/dbos-inc/dbos-transact-go?sort=semver)](https://github.com/dbos-inc/dbos-transact-go/releases)
[![Join Discord](https://img.shields.io/badge/Discord-Join%20Chat-5865F2?logo=discord&logoColor=white)](https://discord.com/invite/jsmC6pXGgX)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE)


# DBOS Transact: Lightweight Durable Workflow Orchestration with Postgres
Expand Down
3 changes: 2 additions & 1 deletion dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func TestAdminServer(t *testing.T) {
// Verify the DBOS executor doesn't have an admin server instance
require.NotNil(t, ctx, "Expected DBOS instance to be created")

exec := ctx.(*dbosContext)
exec, ok := ctx.(*dbosContext)
require.True(t, ok, "Expected ctx to be of type *dbosContext")
require.Nil(t, exec.adminServer, "Expected admin server to be nil when not configured")
})

Expand Down
8 changes: 4 additions & 4 deletions dbos/dbos.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ type DBOSContext interface {
Cancel() // Gracefully shutdown the DBOS runtime, waiting for workflows to complete and cleaning up resources

// Workflow operations
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
RunAsWorkflow(_ DBOSContext, fn workflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
GetEvent(_ DBOSContext, targetWorkflowID string, key string, timeout time.Duration) (any, error) // Get a key-value event from a target workflow
Sleep(_ DBOSContext, duration time.Duration) (time.Duration, error) // Durable sleep that survives workflow recovery
GetWorkflowID() (string, error) // Get the current workflow ID (only available within workflows)
GetStepID() (int, error) // Get the current step ID (only available within workflows)
Expand Down
3 changes: 2 additions & 1 deletion dbos/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestLogger(t *testing.T) {
}
})

ctx := dbosCtx.(*dbosContext)
ctx, ok := dbosCtx.(*dbosContext)
require.True(t, ok, "Expected dbosCtx to be of type *dbosContext")
require.NotNil(t, ctx.logger)

// Test logger access
Expand Down
6 changes: 5 additions & 1 deletion dbos/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func queueEntriesAreCleanedUp(ctx DBOSContext) bool {
success := false
for range maxTries {
// Begin transaction
exec := ctx.(*dbosContext)
exec, ok := ctx.(*dbosContext)
if !ok {
fmt.Println("Expected ctx to be of type *dbosContext in queueEntriesAreCleanedUp")
return false
}
tx, err := exec.systemDB.(*sysDB).pool.Begin(ctx)
if err != nil {
return false
Expand Down
80 changes: 39 additions & 41 deletions dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,17 @@ func (h *workflowPollingHandle[R]) GetResult() (R, error) {
/**********************************/
/******* WORKFLOW REGISTRY *******/
/**********************************/
type GenericWrappedWorkflowFunc[P any, R any] func(ctx DBOSContext, input P, opts ...WorkflowOption) (WorkflowHandle[R], error)
type WrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error)
type WrappedWorkflowFunc[P any, R any] func(ctx DBOSContext, input P, opts ...WorkflowOption) (WorkflowHandle[R], error)
type wrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error)

type workflowRegistryEntry struct {
wrappedFunction WrappedWorkflowFunc
wrappedFunction wrappedWorkflowFunc
maxRetries int
name string
}

// Register adds a workflow function to the registry (thread-safe, only once per name)
func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFunc, maxRetries int, customName string) {
func registerWorkflow(ctx DBOSContext, workflowFQN string, fn wrappedWorkflowFunc, maxRetries int, customName string) {
// Skip if we don't have a concrete dbosContext
c, ok := ctx.(*dbosContext)
if !ok {
Expand Down Expand Up @@ -257,7 +257,7 @@ func registerWorkflow(ctx DBOSContext, workflowFQN string, fn WrappedWorkflowFun
}
}

func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn WorkflowFunc, cronSchedule string) {
func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn workflowFunc, cronSchedule string) {
// Skip if we don't have a concrete dbosContext
c, ok := ctx.(*dbosContext)
if !ok {
Expand Down Expand Up @@ -365,7 +365,7 @@ func WithWorkflowName(name string) workflowRegistrationOption {
// dbos.WithMaxRetries(5),
// dbos.WithSchedule("0 0 * * *")) // daily at midnight
// dbos.WithWorkflowName("MyCustomWorkflowName") // Custom name for the workflow
func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R], opts ...workflowRegistrationOption) {
func RegisterWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], opts ...workflowRegistrationOption) {
if ctx == nil {
panic("ctx cannot be nil")
}
Expand All @@ -391,7 +391,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R
gob.Register(r)

// Register a type-erased version of the durable workflow for recovery
typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) {
typedErasedWorkflow := workflowFunc(func(ctx DBOSContext, input any) (any, error) {
typedInput, ok := input.(P)
if !ok {
wfID, err := ctx.GetWorkflowID()
Expand All @@ -411,7 +411,7 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R
return fn(ctx, typedInput)
})

typeErasedWrapper := WrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
typeErasedWrapper := wrappedWorkflowFunc(func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
opts = append(opts, withWorkflowName(fqn)) // Append the name so ctx.RunAsWorkflow can look it up from the registry to apply registration-time options
handle, err := ctx.RunAsWorkflow(ctx, typedErasedWorkflow, input, opts...)
if err != nil {
Expand All @@ -438,13 +438,13 @@ type DBOSContextKey string

const workflowStateKey DBOSContextKey = "workflowState"

// GenericWorkflowFunc represents a type-safe workflow function with specific input and output types.
// WorkflowFunc represents a type-safe workflow function with specific input and output types.
// P is the input parameter type and R is the return type.
// All workflow functions must accept a DBOSContext as their first parameter.
type GenericWorkflowFunc[P any, R any] func(ctx DBOSContext, input P) (R, error)
type WorkflowFunc[P any, R any] func(ctx DBOSContext, input P) (R, error)

// WorkflowFunc represents a type-erased workflow function used internally.
type WorkflowFunc func(ctx DBOSContext, input any) (any, error)
// workflowFunc represents a type-erased workflow function used internally.
type workflowFunc func(ctx DBOSContext, input any) (any, error)

type workflowParams struct {
workflowName string
Expand Down Expand Up @@ -524,19 +524,19 @@ func withWorkflowName(name string) WorkflowOption {
// } else {
// log.Printf("Result: %v", result)
// }
func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error) {
func RunAsWorkflow[P any, R any](ctx DBOSContext, fn WorkflowFunc[P, R], input P, opts ...WorkflowOption) (WorkflowHandle[R], error) {
if ctx == nil {
return nil, fmt.Errorf("ctx cannot be nil")
}

// Add the fn name to the options so we can communicate it with DBOSContext.RunAsWorkflow
opts = append(opts, withWorkflowName(runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()))

typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) {
typedErasedWorkflow := workflowFunc(func(ctx DBOSContext, input any) (any, error) {
return fn(ctx, input.(P))
})

handle, err := ctx.(*dbosContext).RunAsWorkflow(ctx, typedErasedWorkflow, input, opts...)
handle, err := ctx.RunAsWorkflow(ctx, typedErasedWorkflow, input, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -581,7 +581,7 @@ func RunAsWorkflow[P any, R any](ctx DBOSContext, fn GenericWorkflowFunc[P, R],
return nil, fmt.Errorf("unexpected workflow handle type: %T", handle)
}

func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn workflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) {
// Apply options to build params
params := workflowParams{
applicationVersion: c.GetApplicationVersion(),
Expand Down Expand Up @@ -825,14 +825,14 @@ func (c *dbosContext) RunAsWorkflow(_ DBOSContext, fn WorkflowFunc, input any, o
/******* STEP FUNCTIONS *******/
/******************************/

// StepFunc represents a type-erased step function used internally.
type StepFunc func(ctx context.Context) (any, error)
// stepFunc represents a type-erased step function used internally.
type stepFunc func(ctx context.Context) (any, error)

// GenericStepFunc represents a type-safe step function with a specific output type R.
type GenericStepFunc[R any] func(ctx context.Context) (R, error)
// StepFunc represents a type-safe step function with a specific output type R.
type StepFunc[R any] func(ctx context.Context) (R, error)

// stepOptions holds the configuration for step execution using functional options pattern.
type stepOptions struct {
// StepOptions holds the configuration for step execution using functional options pattern.
type StepOptions struct {
MaxRetries int // Maximum number of retry attempts (0 = no retries)
BackoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0)
BaseInterval time.Duration // Initial delay between retries (default: 100ms)
Expand All @@ -841,7 +841,7 @@ type stepOptions struct {
}

// setDefaults applies default values to stepOptions
func (opts *stepOptions) setDefaults() {
func (opts *StepOptions) setDefaults() {
if opts.BackoffFactor == 0 {
opts.BackoffFactor = _DEFAULT_STEP_BACKOFF_FACTOR
}
Expand All @@ -854,13 +854,13 @@ func (opts *stepOptions) setDefaults() {
}

// StepOption is a functional option for configuring step execution parameters.
type StepOption func(*stepOptions)
type StepOption func(*StepOptions)

// WithStepName sets a custom name for the step. If the step name has already been set
// by a previous call to WithStepName, this option will be ignored to allow
// multiple WithStepName calls without overriding the first one.
func WithStepName(name string) StepOption {
return func(opts *stepOptions) {
return func(opts *StepOptions) {
if opts.StepName == "" {
opts.StepName = name
}
Expand All @@ -870,7 +870,7 @@ func WithStepName(name string) StepOption {
// WithStepMaxRetries sets the maximum number of retry attempts for the step.
// A value of 0 means no retries (default behavior).
func WithStepMaxRetries(maxRetries int) StepOption {
return func(opts *stepOptions) {
return func(opts *StepOptions) {
opts.MaxRetries = maxRetries
}
}
Expand All @@ -879,23 +879,23 @@ func WithStepMaxRetries(maxRetries int) StepOption {
// The delay between retries is calculated as: BaseInterval * (BackoffFactor^(retry-1))
// Default value is 2.0.
func WithBackoffFactor(factor float64) StepOption {
return func(opts *stepOptions) {
return func(opts *StepOptions) {
opts.BackoffFactor = factor
}
}

// WithBaseInterval sets the initial delay between retries.
// Default value is 100ms.
func WithBaseInterval(interval time.Duration) StepOption {
return func(opts *stepOptions) {
return func(opts *StepOptions) {
opts.BaseInterval = interval
}
}

// WithMaxInterval sets the maximum delay between retries.
// Default value is 5s.
func WithMaxInterval(interval time.Duration) StepOption {
return func(opts *stepOptions) {
return func(opts *StepOptions) {
opts.MaxInterval = interval
}
}
Expand Down Expand Up @@ -940,7 +940,7 @@ func WithMaxInterval(interval time.Duration) StepOption {
// Note that the function passed to RunAsStep must accept a context.Context as its first parameter
// and this context *must* be the one specified in the function's signature (not the context passed to RunAsStep).
// Under the hood, DBOS will augment the step's context and pass it to the function when executing it durably.
func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R], opts ...StepOption) (R, error) {
func RunAsStep[R any](ctx DBOSContext, fn StepFunc[R], opts ...StepOption) (R, error) {
if ctx == nil {
return *new(R), newStepExecutionError("", "", "ctx cannot be nil")
}
Expand All @@ -954,7 +954,7 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R], opts ...StepOption
opts = append(opts, WithStepName(stepName))

// Type-erase the function
typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })
typeErasedFn := stepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })

result, err := ctx.RunAsStep(ctx, typeErasedFn, opts...)
// Step function could return a nil result
Expand All @@ -969,9 +969,9 @@ func RunAsStep[R any](ctx DBOSContext, fn GenericStepFunc[R], opts ...StepOption
return typedResult, err
}

func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) {
func (c *dbosContext) RunAsStep(_ DBOSContext, fn stepFunc, opts ...StepOption) (any, error) {
// Process functional options
stepOpts := &stepOptions{}
stepOpts := &StepOptions{}
for _, opt := range opts {
opt(stepOpts)
}
Expand Down Expand Up @@ -1354,16 +1354,14 @@ func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (*workflowPolli
var r R
gob.Register(r)

workflowStatus, err := ctx.(*dbosContext).systemDB.listWorkflows(ctx, listWorkflowsDBInput{
workflowIDs: []string{workflowID},
})
// Call the interface method
handle, err := ctx.RetrieveWorkflow(ctx, workflowID)
if err != nil {
return nil, fmt.Errorf("failed to retrieve workflow status: %w", err)
}
if len(workflowStatus) == 0 {
return nil, newNonExistentWorkflowError(workflowID)
return nil, err
}
return newWorkflowPollingHandle[R](ctx, workflowID), nil

// Convert to typed polling handle
return newWorkflowPollingHandle[R](ctx, handle.GetWorkflowID()), nil
}

type EnqueueOptions struct {
Expand Down
Loading