Skip to content

Commit baaf9aa

Browse files
committed
socket foundation
1 parent a796ef1 commit baaf9aa

File tree

6 files changed

+195
-13
lines changed

6 files changed

+195
-13
lines changed

cmd/go-postgres-sockets/main.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,3 @@
1-
/*
2-
* User API
3-
*
4-
* A notifications proof of concept API
5-
*
6-
* API version: 0.1.0
7-
* Contact: [email protected]
8-
* Generated by: OpenAPI Generator (https://openapi-generator.tech)
9-
*/
10-
111
package main
122

133
import (

server/database/listener.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"database/sql"
66
"encoding/json"
77
"fmt"
8+
"github.com/bebo-dot-dev/go-postgres-sockets/server/socket"
89
"github.com/lib/pq"
910
"log"
1011
"os"
@@ -18,10 +19,14 @@ const (
1819
dbname = "notifications"
1920
)
2021

21-
type PostgresDbListener struct { }
22+
type PostgresDbListener struct {
23+
socketHub *socket.Hub
24+
}
2225

23-
func NewPostgresDbListener() *PostgresDbListener {
24-
return &PostgresDbListener{}
26+
func NewPostgresDbListener(hub *socket.Hub) *PostgresDbListener {
27+
return &PostgresDbListener{
28+
socketHub: hub,
29+
}
2530
}
2631

2732
func (l *PostgresDbListener) getDbListener() *pq.Listener {
@@ -69,6 +74,7 @@ func (l *PostgresDbListener) waitForNotification(dbl *pq.Listener) {
6974
log.Println("DB listener error processing JSON: ", err)
7075
}
7176
log.Println(string(prettyJSON.Bytes()))
77+
l.socketHub.Broadcast <- prettyJSON.Bytes()
7278
case <-time.After(90 * time.Second):
7379
log.Println("DB listener received no notification events for 90 seconds, checking connection")
7480
go func() {

server/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/go-playground/universal-translator v0.17.0 // indirect
77
github.com/go-playground/validator v9.31.0+incompatible
88
github.com/gorilla/mux v1.7.3
9+
github.com/gorilla/websocket v1.4.2
910
github.com/leodido/go-urn v1.2.1 // indirect
1011
github.com/lib/pq v1.10.2
1112
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect

server/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ github.com/go-playground/validator v9.31.0+incompatible h1:UA72EPEogEnq76ehGdEDp
88
github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig=
99
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
1010
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
11+
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
12+
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
1113
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
1214
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
1315
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=

server/socket/client.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package socket
2+
3+
import (
4+
"bytes"
5+
"log"
6+
"net/http"
7+
"time"
8+
9+
"github.com/gorilla/websocket"
10+
)
11+
12+
const (
13+
// Time allowed to write a message to the peer.
14+
writeWait = 10 * time.Second
15+
16+
// Time allowed to read the next pong message from the peer.
17+
pongWait = 60 * time.Second
18+
19+
// Send pings to peer with this period. Must be less than pongWait.
20+
pingPeriod = (pongWait * 9) / 10
21+
22+
// Maximum message size allowed from peer.
23+
maxMessageSize = 512
24+
)
25+
26+
var (
27+
newline = []byte{'\n'}
28+
space = []byte{' '}
29+
)
30+
31+
var upgrader = websocket.Upgrader{
32+
ReadBufferSize: 1024,
33+
WriteBufferSize: 1024,
34+
}
35+
36+
// Client is a middleman between the websocket connection and the hub.
37+
type Client struct {
38+
hub *Hub
39+
40+
// The websocket connection.
41+
conn *websocket.Conn
42+
43+
// Buffered channel of outbound messages.
44+
send chan []byte
45+
}
46+
47+
// readPump pumps messages from the websocket connection to the hub.
48+
//
49+
// The application runs readPump in a per-connection goroutine. The application
50+
// ensures that there is at most one reader on a connection by executing all
51+
// reads from this goroutine.
52+
func (c *Client) readPump() {
53+
defer func() {
54+
c.hub.unregister <- c
55+
c.conn.Close()
56+
}()
57+
c.conn.SetReadLimit(maxMessageSize)
58+
c.conn.SetReadDeadline(time.Now().Add(pongWait))
59+
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
60+
for {
61+
_, message, err := c.conn.ReadMessage()
62+
if err != nil {
63+
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
64+
log.Printf("error: %v", err)
65+
}
66+
break
67+
}
68+
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
69+
c.hub.Broadcast <- message
70+
}
71+
}
72+
73+
// writePump pumps messages from the hub to the websocket connection.
74+
//
75+
// A goroutine running writePump is started for each connection. The
76+
// application ensures that there is at most one writer to a connection by
77+
// executing all writes from this goroutine.
78+
func (c *Client) writePump() {
79+
ticker := time.NewTicker(pingPeriod)
80+
defer func() {
81+
ticker.Stop()
82+
c.conn.Close()
83+
}()
84+
for {
85+
select {
86+
case message, ok := <-c.send:
87+
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
88+
if !ok {
89+
// The hub closed the channel.
90+
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
91+
return
92+
}
93+
94+
w, err := c.conn.NextWriter(websocket.TextMessage)
95+
if err != nil {
96+
return
97+
}
98+
w.Write(message)
99+
100+
// Add queued chat messages to the current websocket message.
101+
n := len(c.send)
102+
for i := 0; i < n; i++ {
103+
w.Write(newline)
104+
w.Write(<-c.send)
105+
}
106+
107+
if err := w.Close(); err != nil {
108+
return
109+
}
110+
case <-ticker.C:
111+
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
112+
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
113+
return
114+
}
115+
}
116+
}
117+
}
118+
119+
// ServeSocket handles a websocket request from a client and registers the client with the socket hub
120+
func ServeSocket(hub *Hub, w http.ResponseWriter, r *http.Request) {
121+
conn, err := upgrader.Upgrade(w, r, nil)
122+
if err != nil {
123+
log.Println(err)
124+
return
125+
}
126+
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
127+
client.hub.register <- client
128+
129+
// Allow collection of memory referenced by the caller by doing all work in
130+
// new goroutines.
131+
go client.writePump()
132+
go client.readPump()
133+
}

server/socket/hub.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package socket
2+
3+
// Hub maintains the set of active clients and enables messages to be broadcast to clients
4+
type Hub struct {
5+
// Registered clients
6+
Clients map[*Client]bool
7+
8+
// a channel to broadcast messages to clients
9+
Broadcast chan []byte
10+
11+
// Register requests from clients
12+
register chan *Client
13+
14+
// Unregister requests from clients
15+
unregister chan *Client
16+
}
17+
18+
func NewSocketHub() *Hub {
19+
hub := &Hub{
20+
Clients: make(map[*Client]bool),
21+
Broadcast: make(chan []byte),
22+
register: make(chan *Client),
23+
unregister: make(chan *Client),
24+
}
25+
go hub.run()
26+
return hub
27+
}
28+
29+
func (h *Hub) run() {
30+
for {
31+
select {
32+
case client := <-h.register:
33+
h.Clients[client] = true
34+
case client := <-h.unregister:
35+
if _, ok := h.Clients[client]; ok {
36+
delete(h.Clients, client)
37+
close(client.send)
38+
}
39+
case message := <-h.Broadcast:
40+
for client := range h.Clients {
41+
select {
42+
case client.send <- message:
43+
default:
44+
close(client.send)
45+
delete(h.Clients, client)
46+
}
47+
}
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)