Skip to content

Commit b66f134

Browse files
committed
Add utility package for graceful shutdown context
Signed-off-by: sami <sami@appscode.com>
1 parent d8292ec commit b66f134

File tree

4 files changed

+77
-41
lines changed

4 files changed

+77
-41
lines changed

cmd/pgoutbox/main.go

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@ limitations under the License.
1717
package main
1818

1919
import (
20-
"context"
2120
"encoding/binary"
2221
"fmt"
2322
"log/slog"
2423
"os"
25-
"os/signal"
2624
"runtime/debug"
27-
"syscall"
2825
"time"
2926

3027
"kubeops.dev/pgoutbox/apis"
@@ -33,6 +30,7 @@ import (
3330
"kubeops.dev/pgoutbox/internal/telemetry"
3431

3532
"github.com/urfave/cli/v2"
33+
"kubeops.dev/pgoutbox/internal/util"
3634
)
3735

3836
// GetVersion returns latest git hash of commit.
@@ -73,7 +71,7 @@ func main() {
7371
},
7472
},
7573
Action: func(c *cli.Context) error {
76-
ctx, cancel := signal.NotifyContext(c.Context, syscall.SIGINT, syscall.SIGTERM)
74+
ctx, cancel := util.SetupSignalContext()
7775
defer cancel()
7876

7977
cfg, err := apis.InitConfig(c.String("config"))
@@ -91,24 +89,6 @@ func main() {
9189
if err = telemetry.InitMetrics(ctx, version); err != nil {
9290
return fmt.Errorf("initialize telemetry: %w", err)
9391
}
94-
defer func() {
95-
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
96-
defer cancel()
97-
98-
// ref: https://github.com/appscode-cloud/cloud-be/blob/79fd250eeb79d47abdd2d38525a0b9b21e1920cf/common/util/signal.go
99-
shutdownHandler := make(chan os.Signal, 2)
100-
signal.Notify(shutdownHandler, os.Interrupt, syscall.SIGTERM)
101-
go func() {
102-
<-shutdownHandler
103-
cancel()
104-
<-shutdownHandler
105-
os.Exit(1) // force exit upon receiving second signal
106-
}()
107-
108-
if err := telemetry.Shutdown(shutdownCtx); err != nil {
109-
slog.Error("telemetry shutdown failed", "err", err)
110-
}
111-
}()
11292
}
11393

11494
pgxConn, pgConn, err := initPgxConnections(cfg.Database, logger, time.Minute*10)

internal/listener/listener.go

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"strconv"
1212
"sync"
1313
"sync/atomic"
14-
"syscall"
1514
"time"
1615

1716
"kubeops.dev/pgoutbox/apis"
@@ -20,6 +19,7 @@ import (
2019
"github.com/jackc/pglogrepl"
2120
"github.com/prometheus/client_golang/prometheus/promhttp"
2221
"golang.org/x/sync/errgroup"
22+
"kubeops.dev/pgoutbox/internal/telemetry"
2323
)
2424

2525
// Logical decoding plugin.
@@ -136,22 +136,16 @@ func (l *Listener) InitHandlers(ctx context.Context) {
136136

137137
<-ctx.Done()
138138

139-
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
139+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
140140
defer cancel()
141141

142-
// ref: https://github.com/appscode-cloud/cloud-be/blob/79fd250eeb79d47abdd2d38525a0b9b21e1920cf/common/util/signal.go
143-
shutdownHandler := make(chan os.Signal, 2)
144-
signal.Notify(shutdownHandler, os.Interrupt, syscall.SIGTERM)
145-
go func() {
146-
<-shutdownHandler
147-
cancel()
148-
<-shutdownHandler
149-
os.Exit(1) // force exit upon receiving second signal
150-
}()
151-
152142
if err := srv.Shutdown(shutdownCtx); err != nil {
153143
l.log.Error("http server shutdown error", "err", err)
154144
}
145+
146+
if err := telemetry.Shutdown(shutdownCtx); err != nil {
147+
l.log.Error("telemetry shutdown error", "err", err)
148+
}
155149
}
156150

157151
const contentTypeTextPlain = "text/plain"
@@ -406,7 +400,6 @@ func (l *Listener) processRawMessage(ctx context.Context, rawMsg []byte, txWAL *
406400
return fmt.Errorf("publish: %w", err)
407401
}
408402
l.monitor.RecordPublishDuration(ctx, time.Since(publishStart).Seconds(), subjectName)
409-
410403
l.monitor.IncPublishedEvents(ctx, subjectName, event.Table)
411404

412405
l.log.Info(
@@ -539,7 +532,7 @@ func (l *Listener) readLSN() pglogrepl.LSN {
539532
func (l *Listener) setLSN(ctx context.Context, lsn pglogrepl.LSN) {
540533
l.mu.Lock()
541534
defer l.mu.Unlock()
542-
defer l.monitor.RecordLSN(ctx, int64(l.lsn))
543535

544536
l.lsn = lsn
537+
l.monitor.RecordLSN(ctx, int64(l.lsn))
545538
}

internal/listener/listener_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ func TestListener_Stream(t *testing.T) {
539539
tt.setup()
540540

541541
ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout)
542-
defer cancel()
542+
_ = cancel
543543

544544
w := &Listener{
545545
log: logger,
@@ -643,7 +643,7 @@ func TestListener_Process(t *testing.T) {
643643
setup: func() {
644644
var cancel context.CancelFunc
645645
ctx, cancel = context.WithTimeout(ctx, time.Millisecond*200)
646-
defer cancel()
646+
_ = cancel
647647

648648
setIsReplicationActive("slot1", false, nil)
649649

@@ -681,7 +681,8 @@ func TestListener_Process(t *testing.T) {
681681
setup: func() {
682682
var cancel context.CancelFunc
683683
ctx, cancel = context.WithTimeout(ctx, time.Millisecond*20)
684-
defer cancel()
684+
_ = cancel
685+
685686
setCreatePublication("pgoutbox", errors.New("some err"))
686687
setGetSlotLSN("slot1", "100/200", nil)
687688
setStartReplication(
@@ -716,7 +717,8 @@ func TestListener_Process(t *testing.T) {
716717
setup: func() {
717718
var cancel context.CancelFunc
718719
ctx, cancel = context.WithTimeout(ctx, time.Millisecond*20)
719-
defer cancel()
720+
_ = cancel
721+
720722
setCreatePublication("pgoutbox", nil)
721723
setGetSlotLSN("slot1", "100/200", errors.New("some err"))
722724
},
@@ -739,7 +741,8 @@ func TestListener_Process(t *testing.T) {
739741
setup: func() {
740742
var cancel context.CancelFunc
741743
ctx, cancel = context.WithTimeout(ctx, time.Millisecond*20)
742-
defer cancel()
744+
_ = cancel
745+
743746
setCreatePublication("pgoutbox", nil)
744747
setGetSlotLSN("slot1", "", nil)
745748
setCreateReplicationSlot(

internal/util/signal.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// ref: https://github.com/appscode-cloud/cloud-be/blob/79fd250eeb79d47abdd2d38525a0b9b21e1920cf/common/util/signal.go
2+
3+
package util
4+
5+
import (
6+
"context"
7+
"os"
8+
"os/signal"
9+
"syscall"
10+
)
11+
12+
var (
13+
onlyOneSignalHandler = make(chan struct{})
14+
shutdownHandler chan os.Signal
15+
shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
16+
)
17+
18+
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
19+
// which is closed on one of these signals. If a second signal is caught, the program
20+
// is terminated with exit code 1.
21+
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
22+
// be called once.
23+
func SetupSignalHandler() <-chan struct{} {
24+
c, _ := SetupSignalContext()
25+
return c.Done()
26+
}
27+
28+
// SetupSignalContext is the same as SetupSignalHandler, but a context.Context is returned.
29+
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
30+
// be called once.
31+
func SetupSignalContext() (context.Context, context.CancelFunc) {
32+
close(onlyOneSignalHandler) // panics when called twice
33+
34+
shutdownHandler = make(chan os.Signal, 2)
35+
36+
ctx, cancel := context.WithCancel(context.Background())
37+
signal.Notify(shutdownHandler, shutdownSignals...)
38+
go func() {
39+
<-shutdownHandler
40+
cancel()
41+
<-shutdownHandler
42+
os.Exit(1) // second signal. Exit directly.
43+
}()
44+
45+
return ctx, cancel
46+
}
47+
48+
// RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)
49+
// This returns whether a handler was notified
50+
func RequestShutdown() bool {
51+
if shutdownHandler != nil {
52+
select {
53+
case shutdownHandler <- shutdownSignals[0]:
54+
return true
55+
default:
56+
}
57+
}
58+
59+
return false
60+
}

0 commit comments

Comments
 (0)