@@ -7,47 +7,50 @@ import (
77 "github.com/hellofresh/goengine/driver/sql/postgres"
88)
99
10- const sqlFuncEventStreamNotify = `CREATE FUNCTION public.event_stream_notify ()
11- RETURNS TRIGGER
12- LANGUAGE plpgsql AS $$
13- DECLARE
14- channel text := TG_ARGV[0];
10+ const sqlFuncEventStreamNotify = `DO LANGUAGE plpgsql $EXIST$
1511BEGIN
16- PERFORM (
17- WITH payload AS
18- (
19- SELECT NEW.no, NEW.event_name, NEW.metadata -> '_aggregate_id' AS aggregate_id
20- )
21- SELECT pg_notify(channel, row_to_json(payload)::text) FROM payload
22- );
23- RETURN NULL;
12+ IF (SELECT to_regprocedure('event_stream_notify()') IS NULL) THEN
13+ CREATE FUNCTION event_stream_notify ()
14+ RETURNS TRIGGER
15+ LANGUAGE plpgsql AS $$
16+ DECLARE
17+ channel text := TG_ARGV[0];
18+ BEGIN
19+ PERFORM (
20+ WITH payload AS
21+ (
22+ SELECT NEW.no, NEW.event_name, NEW.metadata -> '_aggregate_id' AS aggregate_id
23+ )
24+ SELECT pg_notify(channel, row_to_json(payload)::text) FROM payload
25+ );
26+ RETURN NULL;
27+ END;
28+ $$;
29+ END IF;
2430END;
25- $$; `
31+ $EXIST$ `
2632
2733// sqlTriggerEventStreamNotify a helper to create the sql on a event store table
2834func sqlTriggerEventStreamNotifyTemplate (eventStreamName goengine.StreamName , eventStreamTable string ) string {
2935 triggerName := fmt .Sprintf ("%s_notify" , eventStreamTable )
3036 /* #nosec */
3137 return fmt .Sprintf (
32- `DO LANGUAGE plpgsql $$
38+ `DO LANGUAGE plpgsql $EXIST $
3339 BEGIN
3440 IF NOT EXISTS(
35- SELECT * FROM information_schema.triggers
36- WHERE
37- event_object_schema = 'public' AND
38- event_object_table = %s AND
39- trigger_schema = 'public' AND
40- trigger_name = %s
41+ SELECT TRUE FROM pg_trigger WHERE
42+ tgrelid = %[1]s::regclass AND
43+ tgname = %[2]s
4144 )
4245 THEN
43- CREATE TRIGGER %s
46+ CREATE TRIGGER %[3] s
4447 AFTER INSERT
45- ON %s
48+ ON %[4] s
4649 FOR EACH ROW
47- EXECUTE PROCEDURE event_stream_notify(%s);
50+ EXECUTE PROCEDURE event_stream_notify(%[5] s);
4851 END IF;
4952 END;
50- $$` ,
53+ $EXIST $` ,
5154 postgres .QuoteString (eventStreamTable ),
5255 postgres .QuoteString (triggerName ),
5356 postgres .QuoteIdentifier (triggerName ),
0 commit comments