Skip to content

Commit b681350

Browse files
committed
implemented db listener
1 parent 3a6c23b commit b681350

File tree

1 file changed

+92
-0
lines changed

1 file changed

+92
-0
lines changed

server/database/listener.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package database
2+
3+
import (
4+
"bytes"
5+
"database/sql"
6+
"encoding/json"
7+
"fmt"
8+
"github.com/lib/pq"
9+
"log"
10+
"os"
11+
"time"
12+
)
13+
14+
const (
15+
dbhost = "localhost"
16+
dbport = 5432
17+
dbuser = "postgres"
18+
dbname = "notifications"
19+
)
20+
21+
type PostgresDbListener struct { }
22+
23+
func NewPostgresDbListener() *PostgresDbListener {
24+
return &PostgresDbListener{}
25+
}
26+
27+
func (l *PostgresDbListener) getDbListener() *pq.Listener {
28+
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
29+
dbhost, dbport, dbuser, os.Getenv("POSTGRES_PASSWORD"), dbname)
30+
31+
db, err := sql.Open("postgres", connStr)
32+
if err != nil {
33+
panic(err)
34+
}
35+
36+
err = db.Ping()
37+
if err != nil {
38+
panic(err)
39+
}
40+
41+
err = db.Close()
42+
if err != nil {
43+
panic(err)
44+
}
45+
46+
stateChange := func(ev pq.ListenerEventType, err error) {
47+
if err != nil {
48+
log.Printf("postgres database listener state change: %v", err.Error())
49+
}
50+
}
51+
52+
listener := pq.NewListener(connStr, 10 * time.Second, time.Minute, stateChange)
53+
err = listener.Listen("notifications_data_changed")
54+
if err != nil {
55+
panic(err)
56+
}
57+
58+
return listener
59+
}
60+
61+
func (l *PostgresDbListener) waitForNotification(dbl *pq.Listener) {
62+
for {
63+
select {
64+
case n := <- dbl.Notify:
65+
log.Println("DB listener received data from channel [", n.Channel, "] :")
66+
var prettyJSON bytes.Buffer
67+
err := json.Indent(&prettyJSON, []byte(n.Extra), "", "\t")
68+
if err != nil {
69+
log.Println("DB listener error processing JSON: ", err)
70+
}
71+
log.Println(string(prettyJSON.Bytes()))
72+
case <-time.After(90 * time.Second):
73+
log.Println("DB listener received no notification events for 90 seconds, checking connection")
74+
go func() {
75+
err := dbl.Ping()
76+
if err != nil {
77+
log.Println("listener ping error: ", err)
78+
}
79+
}()
80+
}
81+
}
82+
}
83+
84+
func (l *PostgresDbListener) Listen() {
85+
log.Println("Starting database notifications listener")
86+
dbl := l.getDbListener()
87+
go func() {
88+
for {
89+
l.waitForNotification(dbl)
90+
}
91+
}()
92+
}

0 commit comments

Comments
 (0)