Skip to content

Commit 7f41f20

Browse files
sqlutil: fix deadlock in NOTIFY listener (refactor) (#4299)
* refactor: simplify event listening setup and improve notification handling * fix: pass context to pgx.ConnectConfig in listener setup * fix: enhance error logging for context.Canceled and add resume channel in listener * limit connections during parallel tests
1 parent 856f769 commit 7f41f20

File tree

8 files changed

+96
-230
lines changed

8 files changed

+96
-230
lines changed

app/listenevents.go

Lines changed: 8 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,58 +3,17 @@ package app
33
import (
44
"context"
55

6-
"github.com/jackc/pgx/v5/pgconn"
7-
"github.com/pkg/errors"
86
"github.com/target/goalert/permission"
97
"github.com/target/goalert/util/log"
108
"github.com/target/goalert/util/sqlutil"
119
)
1210

13-
func (app *App) listenEvents(ctx context.Context) (<-chan struct{}, error) {
14-
l, err := sqlutil.NewListener(ctx, app.cfg.LegacyLogger, app.db, "/goalert/config-refresh")
15-
if err != nil {
16-
return nil, err
17-
}
18-
app.events = l
19-
go func() {
20-
for {
21-
select {
22-
case <-ctx.Done():
23-
return
24-
case err := <-l.Errors():
25-
log.Log(ctx, errors.Wrap(err, "listen events"))
26-
}
27-
}
28-
}()
29-
30-
doneCh := make(chan struct{})
31-
go func() {
32-
defer close(doneCh)
33-
for {
34-
var n *pgconn.Notification
35-
select {
36-
case n = <-l.Notifications():
37-
case <-ctx.Done():
38-
return
39-
}
40-
if n == nil {
41-
return
42-
}
43-
44-
log.Debugf(log.WithFields(ctx, log.Fields{
45-
"Channel": n.Channel,
46-
"PID": n.PID,
47-
"Payload": n.Payload,
48-
}), "NOTIFY")
49-
50-
switch n.Channel {
51-
case "/goalert/config-refresh":
52-
permission.SudoContext(ctx, func(ctx context.Context) {
53-
log.Log(ctx, app.ConfigStore.Reload(ctx))
54-
})
55-
}
56-
}
57-
}()
58-
59-
return doneCh, nil
11+
func (app *App) setupListenEvents() {
12+
app.events = sqlutil.NewListener(app.pgx)
13+
app.events.Handle("/goalert/config-refresh", func(ctx context.Context, payload string) error {
14+
permission.SudoContext(ctx, func(ctx context.Context) {
15+
log.Log(ctx, app.ConfigStore.Reload(ctx))
16+
})
17+
return nil
18+
})
6019
}

app/runapp.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,7 @@ func (app *App) _Run(ctx context.Context) error {
3131
}
3232
}()
3333

34-
eventCtx, cancel := context.WithCancel(ctx)
35-
defer cancel()
36-
eventDoneCh, err := app.listenEvents(eventCtx)
37-
if err != nil {
38-
return err
39-
}
34+
go app.events.Run(ctx)
4035

4136
if app.sysAPISrv != nil {
4237
app.Logger.InfoContext(ctx, "System API server started.",
@@ -63,17 +58,14 @@ func (app *App) _Run(ctx context.Context) error {
6358
slog.String("address", app.l.Addr().String()),
6459
slog.String("url", app.ConfigStore.Config().PublicURL()),
6560
)
66-
err = app.srv.Serve(app.l)
61+
err := app.srv.Serve(app.l)
6762
if err != nil && !errors.Is(err, http.ErrServerClosed) {
6863
return errors.Wrap(err, "serve HTTP")
6964
}
7065
if app.hSrv != nil {
7166
app.hSrv.Resume()
7267
}
7368

74-
select {
75-
case <-eventDoneCh:
76-
case <-ctx.Done():
77-
}
69+
<-ctx.Done()
7870
return nil
7971
}

app/startup.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,5 +117,7 @@ func (app *App) startup(ctx context.Context) error {
117117
app.Logger.InfoContext(ctx, "SWO Enabled.")
118118
}
119119

120+
app.setupListenEvents()
121+
120122
return nil
121123
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/hashicorp/yamux v0.1.2
2121
github.com/jackc/pgtype v1.14.4
2222
github.com/jackc/pgx/v5 v5.7.2
23+
github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c
2324
github.com/jmespath/go-jmespath v0.4.0
2425
github.com/joho/godotenv v1.5.1
2526
github.com/kffl/speedbump v1.1.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU=
206206
github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
207207
github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
208208
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
209+
github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c h1:bTgmg761ac9Ki27HoLx8IBvc+T+Qj6eptBpKahKIRT4=
210+
github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c/go.mod h1:N4E1APLOYrbM11HH5kdqAjDa8RJWVwD3JqWpvH22h64=
209211
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
210212
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
211213
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=

test/smoke/harness/harness.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (h *Harness) Start() {
306306
appCfg.JSON = true
307307
appCfg.DBURL = h.dbURL
308308
appCfg.TwilioBaseURL = h.twS.URL
309-
appCfg.DBMaxOpen = 5
309+
appCfg.DBMaxOpen = 3
310310
appCfg.SlackBaseURL = h.slackS.URL
311311
appCfg.SMTPListenAddr = "localhost:0"
312312
appCfg.EmailIntegrationDomain = "smoketest.example.com"
@@ -325,7 +325,7 @@ func (h *Harness) Start() {
325325
if err != nil {
326326
h.t.Fatalf("failed to parse db url: %v", err)
327327
}
328-
poolCfg.MaxConns = 5
328+
poolCfg.MaxConns = 3
329329

330330
h.appPool, err = pgxpool.NewWithConfig(ctx, poolCfg)
331331
require.NoError(h.t, err, "create pgx pool")

util/log/log.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,13 @@ func (l *Logger) Error(ctx context.Context, err error) {
144144
}
145145

146146
ctx = l.addSource(ctx, err)
147-
l.entry(ctx).WithError(err).Errorln()
147+
lg := l.entry(ctx).WithError(err)
148+
if errors.Is(err, context.Canceled) {
149+
lg.Debugln()
150+
return
151+
}
152+
153+
lg.Errorln()
148154
}
149155

150156
// Logf will log application information.

0 commit comments

Comments
 (0)