Skip to content

Commit 45a6dd1

Browse files
committed
Add utility package for graceful shutdown context
Signed-off-by: sami <sami@appscode.com>
1 parent d8292ec commit 45a6dd1

File tree

4 files changed

+40
-31
lines changed

4 files changed

+40
-31
lines changed

cmd/pgoutbox/main.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ limitations under the License.
1717
package main
1818

1919
import (
20-
"context"
2120
"encoding/binary"
2221
"fmt"
2322
"log/slog"
@@ -33,6 +32,7 @@ import (
3332
"kubeops.dev/pgoutbox/internal/telemetry"
3433

3534
"github.com/urfave/cli/v2"
35+
"kubeops.dev/pgoutbox/internal/utils"
3636
)
3737

3838
// GetVersion returns latest git hash of commit.
@@ -92,19 +92,8 @@ func main() {
9292
return fmt.Errorf("initialize telemetry: %w", err)
9393
}
9494
defer func() {
95-
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
95+
shutdownCtx, cancel := utils.ShutdownCtx(5 * time.Second)
9696
defer cancel()
97-
98-
// ref: https://github.com/appscode-cloud/cloud-be/blob/79fd250eeb79d47abdd2d38525a0b9b21e1920cf/common/util/signal.go
99-
shutdownHandler := make(chan os.Signal, 2)
100-
signal.Notify(shutdownHandler, os.Interrupt, syscall.SIGTERM)
101-
go func() {
102-
<-shutdownHandler
103-
cancel()
104-
<-shutdownHandler
105-
os.Exit(1) // force exit upon receiving second signal
106-
}()
107-
10897
if err := telemetry.Shutdown(shutdownCtx); err != nil {
10998
slog.Error("telemetry shutdown failed", "err", err)
11099
}

internal/listener/listener.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"strconv"
1212
"sync"
1313
"sync/atomic"
14-
"syscall"
1514
"time"
1615

1716
"kubeops.dev/pgoutbox/apis"
@@ -20,6 +19,7 @@ import (
2019
"github.com/jackc/pglogrepl"
2120
"github.com/prometheus/client_golang/prometheus/promhttp"
2221
"golang.org/x/sync/errgroup"
22+
"kubeops.dev/pgoutbox/internal/utils"
2323
)
2424

2525
// Logical decoding plugin.
@@ -136,19 +136,9 @@ func (l *Listener) InitHandlers(ctx context.Context) {
136136

137137
<-ctx.Done()
138138

139-
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
139+
shutdownCtx, cancel := utils.ShutdownCtx(5 * time.Second)
140140
defer cancel()
141141

142-
// ref: https://github.com/appscode-cloud/cloud-be/blob/79fd250eeb79d47abdd2d38525a0b9b21e1920cf/common/util/signal.go
143-
shutdownHandler := make(chan os.Signal, 2)
144-
signal.Notify(shutdownHandler, os.Interrupt, syscall.SIGTERM)
145-
go func() {
146-
<-shutdownHandler
147-
cancel()
148-
<-shutdownHandler
149-
os.Exit(1) // force exit upon receiving second signal
150-
}()
151-
152142
if err := srv.Shutdown(shutdownCtx); err != nil {
153143
l.log.Error("http server shutdown error", "err", err)
154144
}
@@ -539,7 +529,7 @@ func (l *Listener) readLSN() pglogrepl.LSN {
539529
func (l *Listener) setLSN(ctx context.Context, lsn pglogrepl.LSN) {
540530
l.mu.Lock()
541531
defer l.mu.Unlock()
542-
defer l.monitor.RecordLSN(ctx, int64(l.lsn))
543532

544533
l.lsn = lsn
534+
l.monitor.RecordLSN(ctx, int64(l.lsn))
545535
}

internal/listener/listener_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ func TestListener_Stream(t *testing.T) {
539539
tt.setup()
540540

541541
ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout)
542-
defer cancel()
542+
_ = cancel
543543

544544
w := &Listener{
545545
log: logger,
@@ -643,7 +643,7 @@ func TestListener_Process(t *testing.T) {
643643
setup: func() {
644644
var cancel context.CancelFunc
645645
ctx, cancel = context.WithTimeout(ctx, time.Millisecond*200)
646-
defer cancel()
646+
_ = cancel
647647

648648
setIsReplicationActive("slot1", false, nil)
649649

@@ -681,7 +681,8 @@ func TestListener_Process(t *testing.T) {
681681
setup: func() {
682682
var cancel context.CancelFunc
683683
ctx, cancel = context.WithTimeout(ctx, time.Millisecond*20)
684-
defer cancel()
684+
_ = cancel
685+
685686
setCreatePublication("pgoutbox", errors.New("some err"))
686687
setGetSlotLSN("slot1", "100/200", nil)
687688
setStartReplication(
@@ -716,7 +717,8 @@ func TestListener_Process(t *testing.T) {
716717
setup: func() {
717718
var cancel context.CancelFunc
718719
ctx, cancel = context.WithTimeout(ctx, time.Millisecond*20)
719-
defer cancel()
720+
_ = cancel
721+
720722
setCreatePublication("pgoutbox", nil)
721723
setGetSlotLSN("slot1", "100/200", errors.New("some err"))
722724
},
@@ -739,7 +741,8 @@ func TestListener_Process(t *testing.T) {
739741
setup: func() {
740742
var cancel context.CancelFunc
741743
ctx, cancel = context.WithTimeout(ctx, time.Millisecond*20)
742-
defer cancel()
744+
_ = cancel
745+
743746
setCreatePublication("pgoutbox", nil)
744747
setGetSlotLSN("slot1", "", nil)
745748
setCreateReplicationSlot(

internal/utils/shutdown_ctx.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"os"
6+
"os/signal"
7+
"syscall"
8+
"time"
9+
)
10+
11+
// ShutdownCtx returns a context with timeout that is cancelled on SIGINT or SIGTERM signals.
12+
// With a forced shutdown upon receiving a second signal.
13+
// ref: https://github.com/appscode-cloud/cloud-be/blob/79fd250eeb79d47abdd2d38525a0b9b21e1920cf/common/util/signal.go
14+
func ShutdownCtx(timeout time.Duration) (context.Context, context.CancelFunc) {
15+
shutdownCtx, cancel := context.WithTimeout(context.Background(), timeout)
16+
17+
shutdownHandler := make(chan os.Signal, 2)
18+
signal.Notify(shutdownHandler, os.Interrupt, syscall.SIGTERM)
19+
go func() {
20+
<-shutdownHandler
21+
cancel()
22+
<-shutdownHandler
23+
os.Exit(1) // force exit upon receiving second signal
24+
}()
25+
26+
return shutdownCtx, cancel
27+
}

0 commit comments

Comments
 (0)