forked from jackc/pgxlisten
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpgxlisten.go
More file actions
198 lines (167 loc) · 6.85 KB
/
pgxlisten.go
File metadata and controls
198 lines (167 loc) · 6.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Package pgxlisten provides higher level PostgreSQL LISTEN / NOTIFY tooling built on pgx.
package pgxlisten
import (
"context"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)
const (
defaultKeepaliveTimeout = 30 * time.Second
)
// Listener connects to a PostgreSQL server, listens for notifications, and dispatches them to handlers based on
// channel.
type Listener struct {
// Connect establishes or otherwise gets a connection for the exclusive use of the Listener. Listener takes
// responsibility for closing any connection it receives. Connect is required.
Connect func(ctx context.Context) (*pgx.Conn, error)
// LogError is called by Listen when a non-fatal error occurs. Most errors are non-fatal. For example, a database
// connection failure is considered non-fatal as it may be due to a temporary outage and the connection should be
// attempted again later. LogError is optional.
LogError func(context.Context, error)
LogDebug func(context.Context, string)
// ReconnectDelay configures the amount of time to wait before reconnecting in case the connection to the database
// is lost. If set to 0, the default of 1 minute is used. A negative value disables the timeout entirely.
ReconnectDelay time.Duration
handlers map[string]Handler
KeepaliveTimeout time.Duration
}
func (l *Listener) keepaliveTime() time.Duration {
if l.KeepaliveTimeout == 0 {
return defaultKeepaliveTimeout
}
return l.KeepaliveTimeout
}
// Handle sets the handler for notifications sent to channel.
func (l *Listener) Handle(channel string, handler Handler) {
if l.handlers == nil {
l.handlers = make(map[string]Handler)
}
l.handlers[channel] = handler
}
// Listen listens for and handles notifications. It will only return when ctx is cancelled or a fatal error occurs.
// Because Listen is intended to continue running even when there is a network or database outage most errors are not
// considered fatal. For example, if connecting to the database fails it will wait a while and try to reconnect.
func (l *Listener) Listen(ctx context.Context) error {
if l.Connect == nil {
return errors.New("Listen: Connect is nil")
}
if l.handlers == nil {
return errors.New("Listen: No handlers")
}
reconnectDelay := time.Minute
if l.ReconnectDelay != 0 {
reconnectDelay = l.ReconnectDelay
}
for {
err := l.listen(ctx)
if err != nil {
l.logError(ctx, err)
}
if reconnectDelay < 0 {
if err := ctx.Err(); err != nil {
return err
}
continue
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(reconnectDelay):
// If listenAndSendOneConn returned and ctx has not been cancelled that means there was a fatal database error.
// Wait a while to avoid busy-looping while the database is unreachable.
}
}
}
func (l *Listener) listen(ctx context.Context) error {
conn, err := l.Connect(ctx)
if err != nil {
return fmt.Errorf("connect: %w", err)
}
defer func() {
if err := conn.Close(ctx); err != nil {
l.logError(ctx, err)
}
}()
for channel, handler := range l.handlers {
_, err := conn.Exec(ctx, "listen "+pgx.Identifier{channel}.Sanitize())
if err != nil {
return fmt.Errorf("listen %q: %w", channel, err)
}
if backlogHandler, ok := handler.(BacklogHandler); ok {
err := backlogHandler.HandleBacklog(ctx, channel, conn)
if err != nil {
l.logError(ctx, fmt.Errorf("handle backlog %q: %w", channel, err))
}
}
}
for {
if err := l.waitOnce(ctx, conn); err != nil {
return err
}
}
}
// waitOnce waits for a notification or a keepalive timeout, whichever comes
// first. Note that ONLY the WaitForNotification call takes place with a
// timeout, and all other calls use the parent context. Only the Wait call
// needs a timeout here, and the rest use the parent context.
func (l *Listener) waitOnce(parentCtx context.Context, conn *pgx.Conn) error {
timedCtx, cancel := context.WithTimeout(parentCtx, l.keepaliveTime())
defer cancel()
notification, err := conn.WaitForNotification(timedCtx)
if errors.Is(err, context.DeadlineExceeded) {
if keepaliveErr := conn.Ping(parentCtx); keepaliveErr != nil {
return fmt.Errorf("keepalive failed after timeout (%w): %w", err, keepaliveErr)
}
if l.LogDebug != nil {
l.LogDebug(timedCtx, "keepalive timed out")
}
return nil
} else if err != nil {
return fmt.Errorf("waiting for notification: %w", err)
}
if handler, ok := l.handlers[notification.Channel]; ok {
err := handler.HandleNotification(parentCtx, notification, conn)
if err != nil {
l.logError(parentCtx, fmt.Errorf("handle %s notification: %w", notification.Channel, err))
}
} else {
l.logError(parentCtx, fmt.Errorf("missing handler: %s", notification.Channel))
}
return nil
}
func (l *Listener) logError(ctx context.Context, err error) {
if l.LogError != nil {
l.LogError(ctx, err)
}
}
// Handler is the interface by which notifications are handled.
type Handler interface {
// HandleNotification is synchronously called by Listener to handle a notification. If processing the notification can
// take any significant amount of time this method should process it asynchronously (e.g. via goroutine with a
// different database connection). If an error is returned it will be logged with the Listener.LogError function.
HandleNotification(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error
}
// HandlerFunc is an adapter to allow use of a function as a Handler.
type HandlerFunc func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error
// HandleNotification calls f(ctx, notificaton, conn).
func (f HandlerFunc) HandleNotification(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error {
return f(ctx, notification, conn)
}
// BacklogHandler is an optional interface that can be implemented by a Handler to process unhandled events that
// occurred before the Listener started. For example, a simple pattern is to insert jobs into a table and to send a
// notification of the new work. When jobs are enqueued but the Listener is not running then HandleBacklog can read from
// that table and handle all jobs.
//
// To ensure that no notifications are lost the Listener starts listening before handling any backlog. This means it is
// possible for HandleBacklog to handle a notification and for HandleNotification still to be called. A Handler must be
// prepared for this situation when it is also a BacklogHandler.
type BacklogHandler interface {
// HandleBacklog is synchronously called by Listener at the beginning of Listen at process any previously queued
// messages or jobs. If processing can take any significant amount of time this method should process it
// asynchronously (e.g. via goroutine with a different database connection). If an error is returned it will be logged
// with the Listener.LogError function.
HandleBacklog(ctx context.Context, channel string, conn *pgx.Conn) error
}