Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions chaos_tests/chaos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func setupDBOS(t *testing.T) dbos.DBOSContext {
// Register cleanup to run after test completes
t.Cleanup(func() {
if dbosCtx != nil {
dbosCtx.Shutdown(30 * time.Second)
dbos.Shutdown(dbosCtx, 30*time.Second)
}
})

Expand Down Expand Up @@ -246,7 +246,7 @@ func TestChaosWorkflow(t *testing.T) {
// Register scheduled workflow to run every second for chaos testing
dbos.RegisterWorkflow(dbosCtx, scheduledWorkflow, dbos.WithSchedule("* * * * * *"), dbos.WithWorkflowName("ScheduledChaosTest"))

err := dbosCtx.Launch()
err := dbos.Launch(dbosCtx)
require.NoError(t, err)

// Run multiple workflows
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestChaosRecv(t *testing.T) {
// Register the workflow
dbos.RegisterWorkflow(dbosCtx, recvWorkflow)

err := dbosCtx.Launch()
err := dbos.Launch(dbosCtx)
require.NoError(t, err)

// Run multiple workflows with send/recv
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestChaosEvents(t *testing.T) {
// Register the workflow
dbos.RegisterWorkflow(dbosCtx, eventWorkflow)

err := dbosCtx.Launch()
err := dbos.Launch(dbosCtx)
require.NoError(t, err)

// Run multiple workflows with events
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestChaosQueues(t *testing.T) {
dbos.RegisterWorkflow(dbosCtx, stepTwo)
dbos.RegisterWorkflow(dbosCtx, workflow)

err := dbosCtx.Launch()
err := dbos.Launch(dbosCtx)
require.NoError(t, err)

// Run multiple workflows
Expand Down
24 changes: 12 additions & 12 deletions dbos/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func TestAdminServer(t *testing.T) {
})
require.NoError(t, err)

err = ctx.Launch()
err = Launch(ctx)
require.NoError(t, err)
// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand Down Expand Up @@ -65,13 +65,13 @@ func TestAdminServer(t *testing.T) {
})
require.NoError(t, err)

err = ctx.Launch()
err = Launch(ctx)
require.NoError(t, err)

// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand Down Expand Up @@ -253,13 +253,13 @@ func TestAdminServer(t *testing.T) {
}
RegisterWorkflow(ctx, structWorkflow)

err = ctx.Launch()
err = Launch(ctx)
require.NoError(t, err)

// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand Down Expand Up @@ -381,13 +381,13 @@ func TestAdminServer(t *testing.T) {
}
RegisterWorkflow(ctx, testWorkflow)

err = ctx.Launch()
err = Launch(ctx)
require.NoError(t, err)

// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand Down Expand Up @@ -569,14 +569,14 @@ func TestAdminServer(t *testing.T) {
}
RegisterWorkflow(ctx, regularWorkflow)

err = ctx.Launch()
err = Launch(ctx)
require.NoError(t, err)

// Ensure cleanup
defer func() {
close(blockingChan) // Unblock any blocked workflows
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand Down Expand Up @@ -745,15 +745,15 @@ func TestAdminServer(t *testing.T) {
return fmt.Sprintf("executed at %v", scheduledTime), nil
}, WithSchedule("* * * * * *")) // Every second

err = ctx.Launch()
err = Launch(ctx)
require.NoError(t, err)

client := &http.Client{Timeout: 5 * time.Second}

// Ensure cleanup
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
if client.Transport != nil {
client.Transport.(*http.Transport).CloseIdleConnections()
Expand Down
10 changes: 5 additions & 5 deletions dbos/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestEnqueue(t *testing.T) {
RegisterWorkflow(serverCtx, priorityWorkflow, WithWorkflowName("PriorityWorkflow"))

// Launch the server context to start processing tasks
err := serverCtx.Launch()
err := Launch(serverCtx)
require.NoError(t, err)

// Setup client - this will enqueue tasks
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestCancelResume(t *testing.T) {
RegisterWorkflow(serverCtx, timeoutBlockingWorkflow, WithWorkflowName("TimeoutBlockingWorkflow"))

// Launch the server context to start processing tasks
err := serverCtx.Launch()
err := Launch(serverCtx)
require.NoError(t, err)

// Setup client - this will enqueue tasks
Expand Down Expand Up @@ -570,7 +570,7 @@ func TestForkWorkflow(t *testing.T) {
RegisterWorkflow(serverCtx, parentWorkflow, WithWorkflowName("ParentWorkflow"))

// Launch the server context to start processing tasks
err := serverCtx.Launch()
err := Launch(serverCtx)
require.NoError(t, err)

// Setup client
Expand Down Expand Up @@ -706,7 +706,7 @@ func TestListWorkflows(t *testing.T) {
// Register cleanup for server context
t.Cleanup(func() {
if serverCtx != nil {
serverCtx.Shutdown(30 * time.Second)
Shutdown(serverCtx, 30*time.Second)
}
})

Expand All @@ -728,7 +728,7 @@ func TestListWorkflows(t *testing.T) {
RegisterWorkflow(serverCtx, simpleWorkflow, WithWorkflowName("SimpleWorkflow"))

// Launch server
err = serverCtx.Launch()
err = Launch(serverCtx)
require.NoError(t, err)

// Setup client with same custom schema
Expand Down
30 changes: 15 additions & 15 deletions dbos/dbos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}() // Clean up executor

Expand Down Expand Up @@ -100,7 +100,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand All @@ -121,7 +121,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand All @@ -143,7 +143,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand All @@ -168,7 +168,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand All @@ -190,7 +190,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand Down Expand Up @@ -262,7 +262,7 @@ func TestConfig(t *testing.T) {
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")

// Test manual shutdown and recreate
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)

// Recreate context - should have no error since DB is already migrated
ctx2, err := NewDBOSContext(context.Background(), Config{
Expand All @@ -272,7 +272,7 @@ func TestConfig(t *testing.T) {
require.NoError(t, err)
defer func() {
if ctx2 != nil {
ctx2.Shutdown(1 * time.Minute)
Shutdown(ctx2, 1*time.Minute)
}
}()

Expand Down Expand Up @@ -301,7 +301,7 @@ func TestCustomSystemDBSchema(t *testing.T) {
require.NoError(t, err)
defer func() {
if ctx != nil {
ctx.Shutdown(1 * time.Minute)
Shutdown(ctx, 1*time.Minute)
}
}()

Expand Down Expand Up @@ -425,7 +425,7 @@ func TestCustomSystemDBSchema(t *testing.T) {
RegisterWorkflow(ctx, recvSetEventWorkflow)

// Launch the DBOS context
ctx.Launch()
Launch(ctx)

// Test RunWorkflow - start both workflows that will communicate with each other
workflowAID := uuid.NewString()
Expand Down Expand Up @@ -548,7 +548,7 @@ func TestCustomPool(t *testing.T) {
require.NotNil(t, customdbosContext)

dbosCtx, ok := customdbosContext.(*dbosContext)
defer dbosCtx.Shutdown(10 * time.Second)
defer Shutdown(dbosCtx, 10*time.Second)
require.True(t, ok)

sysDB, ok := dbosCtx.systemDB.(*sysDB)
Expand All @@ -570,9 +570,9 @@ func TestCustomPool(t *testing.T) {
RegisterWorkflow(customdbosContext, recvSetEventWorkflowCustom)

// Launch the DBOS context
err = customdbosContext.Launch()
err = Launch(customdbosContext)
require.NoError(t, err)
defer dbosCtx.Shutdown(1 * time.Minute)
defer Shutdown(dbosCtx, 1*time.Minute)

// Test RunWorkflow - start both workflows that will communicate with each other
workflowAID := uuid.NewString()
Expand Down Expand Up @@ -643,9 +643,9 @@ func TestCustomPool(t *testing.T) {
RegisterWorkflow(dbosCtx, wf)

// Launch the DBOS context
err = dbosCtx.Launch()
err = Launch(dbosCtx)
require.NoError(t, err)
defer dbosCtx.Shutdown(1 * time.Minute)
defer Shutdown(dbosCtx, 1*time.Minute)

// Run a workflow
_, err = RunWorkflow(dbosCtx, wf, "test-input")
Expand Down
8 changes: 4 additions & 4 deletions dbos/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ func TestLogger(t *testing.T) {
AppName: "test-app",
}) // Create executor with default logger
require.NoError(t, err)
err = dbosCtx.Launch()
err = Launch(dbosCtx)
require.NoError(t, err)
t.Cleanup(func() {
if dbosCtx != nil {
dbosCtx.Shutdown(10 * time.Second)
Shutdown(dbosCtx, 10*time.Second)
}
})

Expand Down Expand Up @@ -59,11 +59,11 @@ func TestLogger(t *testing.T) {
Logger: slogLogger,
})
require.NoError(t, err)
err = dbosCtx.Launch()
err = Launch(dbosCtx)
require.NoError(t, err)
t.Cleanup(func() {
if dbosCtx != nil {
dbosCtx.Shutdown(10 * time.Second)
Shutdown(dbosCtx, 10*time.Second)
}
})

Expand Down
Loading
Loading