Skip to content
This repository was archived by the owner on Dec 29, 2025. It is now read-only.

Commit 18ab1f2

Browse files
authored
Revert "Improve shutdown handling from different replicator types (#106)" (#108)
This reverts commit 864b9d1.
1 parent 864b9d1 commit 18ab1f2

File tree

14 files changed

+299
-1150
lines changed

14 files changed

+299
-1150
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ on:
88
jobs:
99
lint:
1010
runs-on: ubuntu-latest
11-
timeout-minutes: 10
1211
steps:
1312
- uses: actions/checkout@v4
1413

@@ -21,12 +20,10 @@ jobs:
2120
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v2.4.0
2221

2322
- name: Lint
24-
timeout-minutes: 5
2523
run: make lint
2624

2725
test:
2826
runs-on: ubuntu-latest
29-
timeout-minutes: 15
3027
steps:
3128
- uses: actions/checkout@v4
3229

@@ -35,13 +32,11 @@ jobs:
3532
with:
3633
go-version: "1.24"
3734

38-
- name: Test with race detection
39-
timeout-minutes: 10
35+
- name: Test
4036
run: make test
4137
build:
4238
needs: [lint, test]
4339
runs-on: ubuntu-latest
44-
timeout-minutes: 10
4540
steps:
4641
- uses: actions/checkout@v4
4742

@@ -51,8 +46,7 @@ jobs:
5146
go-version: "1.24"
5247

5348
- name: Build
54-
timeout-minutes: 5
55-
run: make build-ci
49+
run: make build
5650

5751
- name: Set up QEMU
5852
uses: docker/setup-qemu-action@v3

.github/workflows/integration.yml

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,15 @@ permissions:
1212
jobs:
1313
build:
1414
runs-on: ubuntu-latest
15-
timeout-minutes: 10
1615
steps:
1716
- uses: actions/checkout@v4
1817

1918
- name: Set up Go
2019
uses: actions/setup-go@v4
2120
with:
22-
go-version: "1.24"
21+
go-version: "1.21"
2322

2423
- name: Build
25-
timeout-minutes: 5
2624
run: make build
2725

2826
- name: Upload binary
@@ -34,7 +32,6 @@ jobs:
3432
tests:
3533
needs: build
3634
runs-on: ubuntu-latest
37-
timeout-minutes: 30
3835
strategy:
3936
fail-fast: false
4037
matrix:
@@ -72,7 +69,6 @@ jobs:
7269
sudo curl -L "https://github.com/docker/compose/releases/download/v2.17.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
7370
sudo chmod +x /usr/local/bin/docker-compose
7471
- name: Run test
75-
timeout-minutes: 25
7672
env:
7773
PG_HOST: localhost
7874
PG_PORT: 5433
@@ -88,8 +84,8 @@ jobs:
8884
docker-compose -f internal/docker-compose.yml up -d
8985
sleep 10
9086
if [[ "${{ matrix.test }}" == "order" || "${{ matrix.test }}" == "resume" || "${{ matrix.test }}" == "postgres_uniqueness" ]]; then
91-
timeout 20m ruby ./internal/scripts/e2e_${{ matrix.test }}_test.rb
87+
ruby ./internal/scripts/e2e_${{ matrix.test }}_test.rb
9288
else
93-
timeout 20m ./internal/scripts/e2e_${{ matrix.test }}.sh
89+
./internal/scripts/e2e_${{ matrix.test }}.sh
9490
fi
9591
docker-compose -f internal/docker-compose.yml down -v

Makefile

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
1-
.PHONY: test lint build build-ci build-prod clean check test-short
1+
.PHONY: test lint build clean
22

3-
.DEFAULT_GOAL := build-prod
3+
# Define the default goal
4+
.DEFAULT_GOAL := build
45

5-
build: build-prod
6-
7-
build-ci:
8-
go build -race -o bin/pg_flo
9-
10-
build-prod:
6+
# Build the application
7+
build:
118
go build -o bin/pg_flo
129

10+
# Run tests with race detection and coverage
1311
test:
1412
go test -v -race -coverprofile=coverage.txt -covermode=atomic ./...
1513

16-
test-short:
17-
go test -v -race -short -timeout=5m ./...
18-
14+
# Run linter
1915
lint:
2016
golangci-lint run --timeout=5m
2117

18+
# Clean build artifacts
2219
clean:
2320
rm -rf bin/ coverage.txt
2421

22+
# Run all checks (lint and test)
2523
check: lint test

cmd/root.go

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -428,43 +428,18 @@ func runWorker(cmd *cobra.Command, _ []string) {
428428
sigCh := make(chan os.Signal, 1)
429429
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
430430

431-
errCh := make(chan error, 1)
432431
go func() {
433-
errCh <- w.Start(ctx)
432+
<-sigCh
433+
cancel()
434434
}()
435435

436436
log.Info().Msg("Starting worker...")
437-
438-
select {
439-
case sig := <-sigCh:
440-
log.Info().Str("signal", sig.String()).Msg("Received shutdown signal")
441-
442-
// Create a new context with timeout for graceful shutdown
443-
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
444-
defer shutdownCancel()
445-
446-
// Cancel the main context first to trigger graceful shutdown
447-
cancel()
448-
449-
// Wait for worker to finish gracefully or timeout
450-
select {
451-
case err := <-errCh:
452-
if err == context.Canceled {
453-
log.Info().Msg("Worker shut down gracefully")
454-
} else {
455-
log.Error().Err(err).Msg("Worker encountered an error during shutdown")
456-
}
457-
case <-shutdownCtx.Done():
458-
log.Error().Msg("Worker shutdown timeout exceeded")
459-
os.Exit(1)
460-
}
461-
462-
case err := <-errCh:
463-
if err != nil {
464-
log.Error().Err(err).Msg("Worker error occurred")
465-
os.Exit(1)
437+
if err := w.Start(ctx); err != nil {
438+
if err == context.Canceled {
439+
log.Info().Msg("Worker shut down gracefully")
440+
} else {
441+
log.Error().Err(err).Msg("Worker encountered an error during shutdown")
466442
}
467-
log.Info().Msg("Worker completed successfully")
468443
}
469444
}
470445

internal/scripts/e2e_postgres_data_type.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ set -euo pipefail
44
source "$(dirname "$0")/e2e_common.sh"
55

66
create_test_table() {
7-
log "Creating test table with various data types including array testing..."
7+
log "Creating test table with various data types including comprehensive array testing..."
88

99
# Create required extensions
1010
run_sql "CREATE EXTENSION IF NOT EXISTS hstore;"

internal/scripts/e2e_test_local.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ cleanup() {
2929

3030
trap cleanup EXIT
3131

32-
make build-ci
32+
make build
3333

3434
setup_docker
3535

pkg/pgflonats/pgflonats.go

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,13 @@ func NewNATSClient(url, stream, group string) (*NATSClient, error) {
4949
nats.MaxReconnects(-1),
5050
nats.ReconnectWait(time.Second),
5151
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
52-
if err != nil {
53-
fmt.Printf("Disconnected due to: %v, will attempt reconnects\n", err)
54-
} else {
55-
fmt.Printf("Disconnected, will attempt reconnects\n")
56-
}
52+
fmt.Printf("Disconnected due to: %s, will attempt reconnects\n", err)
5753
}),
5854
nats.ReconnectHandler(func(nc *nats.Conn) {
5955
fmt.Printf("Reconnected [%s]\n", nc.ConnectedUrl())
6056
}),
6157
nats.ClosedHandler(func(nc *nats.Conn) {
62-
if lastErr := nc.LastError(); lastErr != nil {
63-
fmt.Printf("Exiting: %v\n", lastErr)
64-
} else {
65-
fmt.Printf("Exiting: connection closed\n")
66-
}
58+
fmt.Printf("Exiting: %v\n", nc.LastError())
6759
}),
6860
)
6961
if err != nil {
@@ -121,22 +113,9 @@ func (nc *NATSClient) PublishMessage(subject string, data []byte) error {
121113
return nil
122114
}
123115

124-
// Close gracefully closes the NATS connection by draining pending messages.
116+
// Close closes the NATS connection.
125117
func (nc *NATSClient) Close() error {
126-
if nc.conn == nil {
127-
return nil
128-
}
129-
130-
if nc.conn.IsClosed() {
131-
return nil
132-
}
133-
134-
err := nc.conn.Drain()
135-
if err != nil {
136-
nc.conn.Close()
137-
return fmt.Errorf("failed to drain NATS connection: %w", err)
138-
}
139-
118+
nc.conn.Close()
140119
return nil
141120
}
142121

pkg/pgflonats/pgflonats_test.go

Lines changed: 0 additions & 150 deletions
This file was deleted.

0 commit comments

Comments
 (0)