Skip to content

Commit d3658fc

Browse files
authored
Send recv (#18)
First implementation of send/recv. Few things might move around in the future (e.g., were we create the notifier connection). Also this PR does not handle breaks on the connection checking for NOTIFY. How it works: - Send/Recv functions are special durable steps - Recv, before checking in the DB if the message exists, will wait for PG notifications for a set timeout. - A notification listener loop is created with the system database, and "launched" with it. Some details: - Use the builtin PGX `WaitForNotification` and `OnNotification` mechanisms. - A recv call will create a new channel and share it with the notification listener. Upon receiving a notification, the notification listener will check if a channel was shared for a given payload, and write a value on the channel to signal the receiver. About what can be sent: to send non built-in types, users must register them with `encoding/gob` because that's what we use to serialize the message. Usual constraints apply (e.g., no exported fields <-> not serializable). Also note that `encoding/gob` cannot encode raw `nil` values.
1 parent 6fafdb5 commit d3658fc

File tree

7 files changed

+680
-17
lines changed

7 files changed

+680
-17
lines changed

dbos/dbos.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ func Launch() error {
103103
}
104104
fmt.Println("DBOS: System database initialized")
105105

106+
systemDB.Launch(context.Background())
107+
106108
// Create context with cancel function for queue runner
107109
ctx, cancel := context.WithCancel(context.Background())
108110

dbos/migrations/000001_initial_dbos_schema.down.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
-- 001_initial_dbos_schema.down.sql
22

3+
-- Drop trigger first
4+
DROP TRIGGER IF EXISTS dbos_notifications_trigger ON dbos.notifications;
5+
6+
-- Drop function
7+
DROP FUNCTION IF EXISTS dbos.notifications_function();
8+
39
-- Drop tables in reverse order to respect foreign key constraints
410
DROP TABLE IF EXISTS dbos.workflow_events;
511
DROP TABLE IF EXISTS dbos.notifications;

dbos/migrations/000001_initial_dbos_schema.up.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,21 @@ CREATE TABLE dbos.notifications (
6868
-- Create index for notifications
6969
CREATE INDEX idx_workflow_topic ON dbos.notifications (destination_uuid, topic);
7070

71+
-- Create notification function
72+
CREATE OR REPLACE FUNCTION dbos.notifications_function() RETURNS TRIGGER AS $$
73+
DECLARE
74+
payload text := NEW.destination_uuid || '::' || NEW.topic;
75+
BEGIN
76+
PERFORM pg_notify('dbos_notifications_channel', payload);
77+
RETURN NEW;
78+
END;
79+
$$ LANGUAGE plpgsql;
80+
81+
-- Create notification trigger
82+
CREATE TRIGGER dbos_notifications_trigger
83+
AFTER INSERT ON dbos.notifications
84+
FOR EACH ROW EXECUTE FUNCTION dbos.notifications_function();
85+
7186
-- Create workflow_events table
7287
CREATE TABLE dbos.workflow_events (
7388
workflow_uuid TEXT NOT NULL,

0 commit comments

Comments
 (0)