Skip to content

Commit 1660da1

Browse files
committed
add messaging tests with custom schema
1 parent 2c12e86 commit 1660da1

File tree

1 file changed

+113
-32
lines changed

1 file changed

+113
-32
lines changed

dbos/dbos_test.go

Lines changed: 113 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/google/uuid"
1112
"github.com/jackc/pgx/v5/pgxpool"
1213
"github.com/stretchr/testify/assert"
1314
"github.com/stretchr/testify/require"
@@ -326,27 +327,31 @@ func TestConfig(t *testing.T) {
326327

327328
require.NotNil(t, ctx2)
328329
})
330+
}
329331

330-
t.Run("SystemDBMigrationWithCustomSchema", func(t *testing.T) {
331-
t.Setenv("DBOS__APPVERSION", "v1.0.0")
332-
t.Setenv("DBOS__APPID", "test-custom-schema")
333-
t.Setenv("DBOS__VMID", "test-executor-id")
332+
func TestCustomSystemDBSchema(t *testing.T) {
333+
t.Setenv("DBOS__APPVERSION", "v1.0.0")
334+
t.Setenv("DBOS__APPID", "test-custom-schema")
335+
t.Setenv("DBOS__VMID", "test-executor-id")
334336

335-
customSchema := "dbos_custom_test"
336-
ctx, err := NewDBOSContext(context.Background(), Config{
337-
DatabaseURL: databaseURL,
338-
AppName: "test-custom-schema-migration",
339-
DatabaseSchema: customSchema,
340-
})
341-
require.NoError(t, err)
342-
defer func() {
343-
if ctx != nil {
344-
ctx.Shutdown(1 * time.Minute)
345-
}
346-
}()
337+
databaseURL := getDatabaseURL()
338+
customSchema := "dbos_custom_test"
347339

348-
require.NotNil(t, ctx)
340+
ctx, err := NewDBOSContext(context.Background(), Config{
341+
DatabaseURL: databaseURL,
342+
AppName: "test-custom-schema-migration",
343+
DatabaseSchema: customSchema,
344+
})
345+
require.NoError(t, err)
346+
defer func() {
347+
if ctx != nil {
348+
ctx.Shutdown(1 * time.Minute)
349+
}
350+
}()
349351

352+
require.NotNil(t, ctx)
353+
354+
t.Run("CustomSchemaSetup", func(t *testing.T) {
350355
// Get the internal systemDB instance to check tables directly
351356
dbosCtx, ok := ctx.(*dbosContext)
352357
require.True(t, ok, "expected dbosContext")
@@ -414,23 +419,99 @@ func TestConfig(t *testing.T) {
414419
err = sysDB.pool.QueryRow(dbCtx, fmt.Sprintf("SELECT version FROM %s.dbos_migrations", customSchema)).Scan(&version)
415420
require.NoError(t, err)
416421
assert.Equal(t, int64(1), version, "migration version should be 1 (after initial migration)")
422+
})
417423

418-
// Test manual shutdown and recreate with same custom schema
419-
ctx.Shutdown(1 * time.Minute)
424+
// Test workflows for exercising Send/Recv and SetEvent/GetEvent
425+
type testWorkflowInput struct {
426+
PartnerWorkflowID string
427+
Message string
428+
}
429+
430+
// Workflow A: Uses Send() and GetEvent() - waits for workflow B
431+
sendGetEventWorkflow := func(ctx DBOSContext, input testWorkflowInput) (string, error) {
432+
// Send a message to the partner workflow
433+
err := Send(ctx, input.PartnerWorkflowID, input.Message, "test-topic")
434+
if err != nil {
435+
return "", err
436+
}
420437

421-
// Recreate context with same custom schema - should have no error since DB is already migrated
422-
ctx2, err := NewDBOSContext(context.Background(), Config{
423-
DatabaseURL: databaseURL,
424-
AppName: "test-custom-schema-recreate",
425-
DatabaseSchema: customSchema,
426-
})
427-
require.NoError(t, err)
428-
defer func() {
429-
if ctx2 != nil {
430-
ctx2.Shutdown(1 * time.Minute)
431-
}
432-
}()
438+
// Wait for an event from the partner workflow
439+
result, err := GetEvent[string](ctx, input.PartnerWorkflowID, "response-key", 5*time.Second)
440+
if err != nil {
441+
return "", err
442+
}
433443

434-
require.NotNil(t, ctx2)
444+
return result, nil
445+
}
446+
447+
// Workflow B: Uses Recv() and SetEvent() - waits for workflow A
448+
recvSetEventWorkflow := func(ctx DBOSContext, input testWorkflowInput) (string, error) {
449+
// Receive a message from the partner workflow
450+
receivedMsg, err := Recv[string](ctx, "test-topic", 5*time.Second)
451+
if err != nil {
452+
return "", err
453+
}
454+
455+
// Set an event for the partner workflow
456+
err = SetEvent(ctx, "response-key", "response-from-workflow-b")
457+
if err != nil {
458+
return "", err
459+
}
460+
461+
return receivedMsg, nil
462+
}
463+
464+
t.Run("CustomSchemaUsage", func(t *testing.T) {
465+
// Register the test workflows
466+
RegisterWorkflow(ctx, sendGetEventWorkflow)
467+
RegisterWorkflow(ctx, recvSetEventWorkflow)
468+
469+
// Launch the DBOS context
470+
ctx.Launch()
471+
472+
// Test RunWorkflow - start both workflows that will communicate with each other
473+
workflowAID := uuid.NewString()
474+
workflowBID := uuid.NewString()
475+
476+
// Start workflow B first (receiver)
477+
handleB, err := RunWorkflow(ctx, recvSetEventWorkflow, testWorkflowInput{
478+
PartnerWorkflowID: workflowAID,
479+
Message: "test-message-from-b",
480+
}, WithWorkflowID(workflowBID))
481+
require.NoError(t, err, "failed to start recvSetEventWorkflow")
482+
483+
// Small delay to ensure workflow B is ready to receive
484+
time.Sleep(100 * time.Millisecond)
485+
486+
// Start workflow A (sender)
487+
handleA, err := RunWorkflow(ctx, sendGetEventWorkflow, testWorkflowInput{
488+
PartnerWorkflowID: workflowBID,
489+
Message: "test-message-from-a",
490+
}, WithWorkflowID(workflowAID))
491+
require.NoError(t, err, "failed to start sendGetEventWorkflow")
492+
493+
// Wait for both workflows to complete
494+
resultA, err := handleA.GetResult()
495+
require.NoError(t, err, "failed to get result from workflow A")
496+
assert.Equal(t, "response-from-workflow-b", resultA, "workflow A should receive response from workflow B")
497+
498+
resultB, err := handleB.GetResult()
499+
require.NoError(t, err, "failed to get result from workflow B")
500+
assert.Equal(t, "test-message-from-a", resultB, "workflow B should receive message from workflow A")
501+
502+
// Test GetWorkflowSteps
503+
stepsA, err := GetWorkflowSteps(ctx, workflowAID)
504+
require.NoError(t, err, "failed to get workflow A steps")
505+
require.Len(t, stepsA, 3, "workflow A should have 3 steps (Send + GetEvent + Sleep)")
506+
assert.Equal(t, "DBOS.send", stepsA[0].StepName, "first step should be Send")
507+
assert.Equal(t, "DBOS.getEvent", stepsA[1].StepName, "second step should be GetEvent")
508+
assert.Equal(t, "DBOS.sleep", stepsA[2].StepName, "third step should be Sleep")
509+
510+
stepsB, err := GetWorkflowSteps(ctx, workflowBID)
511+
require.NoError(t, err, "failed to get workflow B steps")
512+
require.Len(t, stepsB, 3, "workflow B should have 3 steps (Recv + Sleep + SetEvent)")
513+
assert.Equal(t, "DBOS.recv", stepsB[0].StepName, "first step should be Recv")
514+
assert.Equal(t, "DBOS.sleep", stepsB[1].StepName, "second step should be Sleep")
515+
assert.Equal(t, "DBOS.setEvent", stepsB[2].StepName, "third step should be SetEvent")
435516
})
436517
}

0 commit comments

Comments
 (0)