Skip to content

Commit de6488a

Browse files
committed
fix
1 parent 370d33e commit de6488a

File tree

1 file changed

+37
-37
lines changed

1 file changed

+37
-37
lines changed

dbos/workflow.go

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,10 +1350,16 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
13501350
/****************************************/
13511351

13521352
func (c *dbosContext) Send(_ DBOSContext, destinationID string, message any, topic string) error {
1353+
// Serialize the message before sending
1354+
serializer := newGobSerializer[any]()
1355+
encodedMessage, err := serializer.Encode(message)
1356+
if err != nil {
1357+
return fmt.Errorf("failed to serialize message: %w", err)
1358+
}
13531359
return retry(c, func() error {
13541360
return c.systemDB.send(c, WorkflowSendInput{
13551361
DestinationID: destinationID,
1356-
Message: message,
1362+
Message: &encodedMessage,
13571363
Topic: topic,
13581364
})
13591365
}, withRetrierLogger(c.logger))
@@ -1372,13 +1378,7 @@ func Send[P any](ctx DBOSContext, destinationID string, message P, topic string)
13721378
if ctx == nil {
13731379
return errors.New("ctx cannot be nil")
13741380
}
1375-
// Serialize the message before sending
1376-
serializer := newGobSerializer[P]()
1377-
encodedMessage, err := serializer.Encode(message)
1378-
if err != nil {
1379-
return fmt.Errorf("failed to serialize message: %w", err)
1380-
}
1381-
return ctx.Send(ctx, destinationID, &encodedMessage, topic)
1381+
return ctx.Send(ctx, destinationID, message, topic)
13821382
}
13831383

13841384
type recvInput struct {
@@ -1410,52 +1410,59 @@ func (c *dbosContext) Recv(_ DBOSContext, topic string, timeout time.Duration) (
14101410
// return err
14111411
// }
14121412
// log.Printf("Received: %s", message)
1413-
func Recv[T any](ctx DBOSContext, topic string, timeout time.Duration) (T, error) {
1413+
func Recv[R any](ctx DBOSContext, topic string, timeout time.Duration) (R, error) {
14141414
if ctx == nil {
1415-
return *new(T), errors.New("ctx cannot be nil")
1415+
return *new(R), errors.New("ctx cannot be nil")
14161416
}
14171417
msg, err := ctx.Recv(ctx, topic, timeout)
14181418
if err != nil {
1419-
return *new(T), err
1419+
return *new(R), err
14201420
}
14211421

14221422
// Handle nil message
14231423
if msg == nil {
1424-
return *new(T), nil
1424+
return *new(R), nil
14251425
}
14261426

1427-
var typedMessage T
1427+
var typedMessage R
14281428
// Check if we're in a real DBOS context (not a mock)
14291429
if _, ok := ctx.(*dbosContext); ok {
14301430
encodedMsg, ok := msg.(*string)
14311431
if !ok {
14321432
workflowID, _ := GetWorkflowID(ctx) // Must be within a workflow so we can ignore the error
1433-
return *new(T), newWorkflowUnexpectedResultType(workflowID, "string (encoded)", fmt.Sprintf("%T", msg))
1433+
return *new(R), newWorkflowUnexpectedResultType(workflowID, "string (encoded)", fmt.Sprintf("%T", msg))
14341434
}
1435-
serializer := newGobSerializer[T]()
1435+
serializer := newGobSerializer[R]()
14361436
var decodeErr error
14371437
typedMessage, decodeErr = serializer.Decode(encodedMsg)
14381438
if decodeErr != nil {
1439-
return *new(T), fmt.Errorf("decoding received message to type %T: %w", *new(T), decodeErr)
1439+
return *new(R), fmt.Errorf("decoding received message to type %T: %w", *new(R), decodeErr)
14401440
}
14411441
return typedMessage, nil
14421442
} else {
14431443
// Fallback for testing/mocking scenarios where serializer is nil
14441444
var ok bool
1445-
typedMessage, ok = msg.(T)
1445+
typedMessage, ok = msg.(R)
14461446
if !ok {
14471447
workflowID, _ := GetWorkflowID(ctx) // Must be within a workflow so we can ignore the error
1448-
return *new(T), newWorkflowUnexpectedResultType(workflowID, fmt.Sprintf("%T", new(T)), fmt.Sprintf("%T", msg))
1448+
return *new(R), newWorkflowUnexpectedResultType(workflowID, fmt.Sprintf("%T", new(R)), fmt.Sprintf("%T", msg))
14491449
}
14501450
}
14511451
return typedMessage, nil
14521452
}
14531453

14541454
func (c *dbosContext) SetEvent(_ DBOSContext, key string, message any) error {
1455+
// Serialize the event value before storing
1456+
serializer := newGobSerializer[any]()
1457+
encodedMessage, err := serializer.Encode(message)
1458+
if err != nil {
1459+
return fmt.Errorf("failed to serialize event value: %w", err)
1460+
}
1461+
14551462
return retry(c, func() error {
14561463
return c.systemDB.setEvent(c, WorkflowSetEventInput{
14571464
Key: key,
1458-
Message: message,
1465+
Message: &encodedMessage,
14591466
})
14601467
}, withRetrierLogger(c.logger))
14611468
}
@@ -1474,14 +1481,7 @@ func SetEvent[P any](ctx DBOSContext, key string, message P) error {
14741481
if ctx == nil {
14751482
return errors.New("ctx cannot be nil")
14761483
}
1477-
// Serialize the event value before storing
1478-
serializer := newGobSerializer[P]()
1479-
encodedMessage, err := serializer.Encode(message)
1480-
if err != nil {
1481-
return fmt.Errorf("failed to serialize event value: %w", err)
1482-
}
1483-
1484-
return ctx.SetEvent(ctx, key, &encodedMessage)
1484+
return ctx.SetEvent(ctx, key, message)
14851485
}
14861486

14871487
type getEventInput struct {
@@ -1515,40 +1515,40 @@ func (c *dbosContext) GetEvent(_ DBOSContext, targetWorkflowID, key string, time
15151515
// return err
15161516
// }
15171517
// log.Printf("Status: %s", status)
1518-
func GetEvent[T any](ctx DBOSContext, targetWorkflowID, key string, timeout time.Duration) (T, error) {
1518+
func GetEvent[R any](ctx DBOSContext, targetWorkflowID, key string, timeout time.Duration) (R, error) {
15191519
if ctx == nil {
1520-
return *new(T), errors.New("ctx cannot be nil")
1520+
return *new(R), errors.New("ctx cannot be nil")
15211521
}
15221522
value, err := ctx.GetEvent(ctx, targetWorkflowID, key, timeout)
15231523
if err != nil {
1524-
return *new(T), err
1524+
return *new(R), err
15251525
}
15261526
if value == nil {
1527-
return *new(T), nil
1527+
return *new(R), nil
15281528
}
15291529

1530-
var typedValue T
1530+
var typedValue R
15311531
// Check if we're in a real DBOS context (not a mock)
15321532
if _, ok := ctx.(*dbosContext); ok {
15331533
encodedValue, ok := value.(*string)
15341534
if !ok {
15351535
workflowID, _ := GetWorkflowID(ctx) // Must be within a workflow so we can ignore the error
1536-
return *new(T), newWorkflowUnexpectedResultType(workflowID, "string (encoded)", fmt.Sprintf("%T", value))
1536+
return *new(R), newWorkflowUnexpectedResultType(workflowID, "string (encoded)", fmt.Sprintf("%T", value))
15371537
}
15381538

1539-
serializer := newGobSerializer[T]()
1539+
serializer := newGobSerializer[R]()
15401540
var decodeErr error
15411541
typedValue, decodeErr = serializer.Decode(encodedValue)
15421542
if decodeErr != nil {
1543-
return *new(T), fmt.Errorf("decoding event value to type %T: %w", *new(T), decodeErr)
1543+
return *new(R), fmt.Errorf("decoding event value to type %T: %w", *new(R), decodeErr)
15441544
}
15451545
return typedValue, nil
15461546
} else {
15471547
var ok bool
1548-
typedValue, ok = value.(T)
1548+
typedValue, ok = value.(R)
15491549
if !ok {
15501550
workflowID, _ := GetWorkflowID(ctx) // Must be within a workflow so we can ignore the error
1551-
return *new(T), newWorkflowUnexpectedResultType(workflowID, fmt.Sprintf("%T", new(T)), fmt.Sprintf("%T", value))
1551+
return *new(R), newWorkflowUnexpectedResultType(workflowID, fmt.Sprintf("%T", new(R)), fmt.Sprintf("%T", value))
15521552
}
15531553
}
15541554
return typedValue, nil

0 commit comments

Comments
 (0)