Skip to content

Conversation

@maxdml
Copy link
Collaborator

@maxdml maxdml commented Jul 16, 2025

First implementation of send/recv. Few things might move around in the future (e.g., were we create the notifier connection). Also this PR does not handle breaks on the connection checking for NOTIFY.

How it works:

  • Send/Recv functions are special durable steps
  • Recv, before checking in the DB if the message exists, will wait for PG notifications for a set timeout.
  • A notification listener loop is created with the system database, and "launched" with it.

Some details:

  • Use the builtin PGX WaitForNotification and OnNotification mechanisms.
  • A recv call will create a new channel and share it with the notification listener. Upon receiving a notification, the notification listener will check if a channel was shared for a given payload, and write a value on the channel to signal the receiver.

About what can be sent: to send non built-in types, users must register them with encoding/gob because that's what we use to serialize the message. Usual constraints apply (e.g., no exported fields <-> not serializable). Also note that encoding/gob cannot encode raw nil values.

err = tx.QueryRow(ctx, query, destinationID, topic).Scan(&messageString)
if err != nil {
if err == pgx.ErrNoRows {
// No message found, record nil result
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should never happen for reasons explained below

@maxdml maxdml marked this pull request as ready for review July 16, 2025 18:47
t.Fatalf("failed to start receive workflow: %v", err)
}
result, err := receiveHandle.GetResult(context.Background())
if result != "--" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is --? Shouldn't it be nil?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's just the artifact of receiveWorkflow. It'll concatenate - with empty messages...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return a timeout error -- in the case of go, where nil are not going to be valid messages, this is easier than in other languages where undefined or None seem to be valid messages.

_, loaded := s.notificationsMap.LoadOrStore(payload, c)
if loaded {
close(c)
fmt.Println("Receive already called for workflow ", destinationID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this check is better happening before checking the notifications table (see Python https://github.com/dbos-inc/dbos-transact-py/blob/main/dbos/_sys_db.py#L1307)

The reason is to fail faster to avoid more load on the database.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@maxdml maxdml merged commit d3658fc into main Jul 17, 2025
1 check passed
@maxdml maxdml deleted the send-recv branch July 17, 2025 18:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants