Commit 2cc3f59

tamalsaha and Imtiaz246 authored
Migrate from github.com/jackc/pgx to github.com/jackc/pglogrepl (#12)
* Migrate from github.com/jackc/pgx to github.com/jackc/pglogrepl

This commit migrates the PostgreSQL logical replication implementation from the legacy pgx v3 API to the modern pglogrepl library with pgx/v5.

## Changes

### Dependencies (go.mod)
- Replaced github.com/jackc/pgx v3.6.2+incompatible with:
  - github.com/jackc/pglogrepl v0.0.0-20251213150135-2e8d0df862c1
  - github.com/jackc/pgx/v5 v5.7.4
- Removed obsolete indirect dependencies (cockroachdb/apd, jackc/fake, etc.)

### Main Application (cmd/pgoutbox/init.go)
- Created replicationConn wrapper struct that adapts *pgconn.PgConn to the replication interface expected by the listener
- Wrapper implements: CreateReplicationSlot, DropReplicationSlot, StartReplication, ReceiveMessage, SendStandbyStatusUpdate, IsAlive, Close
- Updated imports to use pgx/v5 and pglogrepl packages

### Listener (internal/listener/listener.go)
- Changed lsn field type from uint64 to pglogrepl.LSN
- Updated replication interface to use pglogrepl types:
  - CreateReplicationSlot returns pglogrepl.CreateReplicationSlotResult
  - StartReplication uses pglogrepl.LSN and pglogrepl.StartReplicationOptions
  - SendStandbyStatusUpdate uses pglogrepl.StandbyStatusUpdate
  - ReceiveMessage returns raw []byte instead of structured messages
- Rewrote processRawMessage to handle raw byte messages using pglogrepl.ParseXLogData and pglogrepl.ParsePrimaryKeepaliveMessage
- Updated SendStandbyStatus and AckWalMessage to take context parameter

### Repository (internal/listener/repository.go)
- Updated to pgx/v5 API (QueryRow instead of QueryRowEx)
- Close now takes context parameter
- IsAlive uses IsClosed() method

### Tests
- Updated all mock implementations for new interface signatures
- Rewrote test helper functions to use pglogrepl types
- Simplified test cases by removing obsolete NewStandbyStatus calls

All listener tests pass.
---------

Signed-off-by: Tamal Saha <tamal@appscode.com>
Signed-off-by: Imtiaz Uddin <imtiaz@appscode.com>
Co-authored-by: Imtiaz Uddin <imtiaz@appscode.com>
1 parent 2b67fe1 commit 2cc3f59

324 files changed, +35158 -26758 lines changed


.github/copilot-instructions.md

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
# pgoutbox - AI Coding Instructions

## Architecture & Data Flow

**pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS, Kafka, RabbitMQ, Google Pub/Sub).

```
PostgreSQL WAL → pgx replication → Parser.ParseWalMessage() → WAL object → filter → Event → Publisher.Publish() → broker
```

### Key Components
- [internal/listener/listener.go](../internal/listener/listener.go) - Replication orchestration, slot/publication lifecycle
- [internal/listener/transaction/parser.go](../internal/listener/transaction/parser.go) - WAL binary protocol decoding
- [internal/publisher/](../internal/publisher/) - Broker adapters implementing `eventPublisher` interface
- [apis/config.go](../apis/config.go) - Config types with Viper loading (env prefix: `WAL_`)
- [apis/event.go](../apis/event.go) - Event struct with `SubjectName()` for topic construction

## Developer Workflows

```bash
make build    # Build for current OS/ARCH (Docker containerized)
make test     # Unit tests with race detector
make lint     # golangci-lint with gofmt, goimports, unparam
make ci       # verify + lint + build + unit-tests
make fmt      # Format source code
```

All commands run in Docker (`ghcr.io/appscode/golang-dev:1.24`). Output: `bin/pgoutbox-{OS}-{ARCH}`.

**Local PostgreSQL**: `docker/docker-compose.yml` + `docker/scripts/` for test DB setup.

## Critical Patterns

### Interface-Based Mocking
Listener depends on interfaces (`eventPublisher`, `parser`, `replication`, `repository`). Mock implementations in `*_mock_test.go` files:
```go
// internal/listener/listener.go
type eventPublisher interface {
	Publish(context.Context, string, *apis.Event) error
}
```

### Configuration
- YAML config + environment overrides (prefix `WAL_`, e.g., `WAL_DATABASE_HOST`)
- Struct tags: both `mapstructure` and `json` required
- Publisher types: `nats`, `kafka`, `rabbitmq`, `google_pubsub`

### Topic Naming
Format: `{publisher.topic}.{publisher.topicPrefix}schemas.{schema}.tables.{table}`
Override via `listener.topicsMap` in config.
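
As a rough illustration of the format string above, the sketch below assembles a topic name from its parts and then applies an optional override map. The function name `topicName` is hypothetical, and keying the override map by the generated name is an assumption about how `listener.topicsMap` behaves; check `SubjectName()` in `apis/event.go` for the actual logic.

```go
package main

import "fmt"

// topicName builds "{topic}.{prefix}schemas.{schema}.tables.{table}" and then
// applies an override if the generated name appears in topicsMap.
// Assumption: the override map is keyed by the generated name.
func topicName(topic, prefix, schema, table string, topicsMap map[string]string) string {
	name := fmt.Sprintf("%s.%sschemas.%s.tables.%s", topic, prefix, schema, table)
	if override, ok := topicsMap[name]; ok {
		return override
	}
	return name
}

func main() {
	overrides := map[string]string{
		"stream.prefix_schemas.public.tables.orders": "orders-events",
	}
	fmt.Println(topicName("stream", "prefix_", "public", "users", overrides))
	fmt.Println(topicName("stream", "prefix_", "public", "orders", overrides))
}
```

Note that the format places no separator between the prefix and `schemas`, so any delimiter has to be part of the configured prefix itself.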

### Logging
Use `log/slog` with injected `*slog.Logger`:
```go
l.log.Debug("message", slog.String("key", value))
```

## Common Tasks

| Task | Files to Modify |
|------|-----------------|
| Add publisher | `internal/publisher/{broker}.go` (implement `eventPublisher`), `cmd/pgoutbox/init.go` |
| Add filter logic | `apis/config.go` (`FilterStruct`), `internal/listener/transaction/parser.go` |
| Extend WAL parsing | `internal/listener/transaction/parser.go`, `transaction/data.go` |
| Add metrics | `apis/metrics.go` (Prometheus counters) |

## Standards

- **License**: Apache 2.0 headers required (see `hack/license/`)
- **Errors**: Wrap with `fmt.Errorf("context: %w", err)`
- **Publication name**: Hardcoded as `_pgoutbox_`
- **Replication slot**: Configured via `listener.slotName`, auto-created on startup

.github/workflows/ci.yml

Lines changed: 14 additions & 35 deletions
@@ -1,46 +1,25 @@
-name: CI
+name: ci
 
 on:
-  pull_request:
-    branches:
-      - "*"
   push:
-    branches:
-      - master
-  workflow_dispatch:
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.head_ref || github.ref }}
-  cancel-in-progress: true
+    branches: [ "master" ]
+  pull_request:
+    branches: [ "master" ]
 
 jobs:
+
   build:
-    name: Build
     runs-on: ubuntu-24.04
     steps:
-      - name: Set up Go 1.24
-        uses: actions/setup-go@v5
-        with:
-          go-version: '1.24'
-        id: go
+    - uses: actions/checkout@v4
 
-      - name: Use Node.js 20.x
-        uses: actions/setup-node@v4
-        with:
-          node-version: '20'
-          check-latest: true
+    - name: Set up Go
+      uses: actions/setup-go@v5
+      with:
+        go-version: '1.25'
 
-      - name: Check out code into the Go module directory
-        uses: actions/checkout@v4
+    - name: Build
+      run: go build -v ./...
 
-      - name: Prepare git
-        env:
-          GITHUB_USER: 1gtm
-          GITHUB_TOKEN: ${{ secrets.LGTM_GITHUB_TOKEN }}
-        run: |
-          set -x
-          git config --global user.name "${GITHUB_USER}"
-          git config --global user.email "${GITHUB_USER}@appscode.com"
-          git config --global \
-            url."https://${GITHUB_USER}:${GITHUB_TOKEN}@github.com".insteadOf \
-            "https://github.com"
+    - name: Test
+      run: go test -v ./...

apis/config_test.go

Lines changed: 21 additions & 17 deletions
@@ -39,10 +39,11 @@ func TestConfig_Validate(t *testing.T) {
 					Password: "pass",
 				},
 				Publisher: &PublisherCfg{
-					Type:        "kafka",
-					Address:     "addr",
-					Topic:       "stream",
-					TopicPrefix: "prefix",
+					Type:         "kafka",
+					Address:      "addr",
+					Topic:        "stream",
+					TopicPrefix:  "prefix",
+					NatsCredPath: "/etc/nats/creds/admin.creds",
 				},
 			},
 			wantErr: nil,
@@ -65,13 +66,14 @@ func TestConfig_Validate(t *testing.T) {
 					Password: "pass",
 				},
 				Publisher: &PublisherCfg{
-					Type:        "kafka",
-					Address:     "addr",
-					Topic:       "stream",
-					TopicPrefix: "prefix",
+					Type:         "kafka",
+					Address:      "addr",
+					Topic:        "stream",
+					TopicPrefix:  "prefix",
+					NatsCredPath: "/etc/nats/creds/admin.creds",
 				},
 			},
-			wantErr: errors.New("Listener.RefreshConnection: non zero value required;Listener.SlotName: non zero value required"),
+			wantErr: errors.New("Listener.refreshConnection: non zero value required;Listener.slotName: non zero value required"),
 		},
 		{
 			name: "bad db cfg",
@@ -91,13 +93,14 @@ func TestConfig_Validate(t *testing.T) {
 					Password: "pass",
 				},
 				Publisher: &PublisherCfg{
-					Type:        "kafka",
-					Address:     "addr",
-					Topic:       "stream",
-					TopicPrefix: "prefix",
+					Type:         "kafka",
+					Address:      "addr",
+					Topic:        "stream",
+					TopicPrefix:  "prefix",
+					NatsCredPath: "/etc/nats/creds/admin.creds",
 				},
 			},
-			wantErr: errors.New("Database.Host: non zero value required;Database.Port: non zero value required"),
+			wantErr: errors.New("Database.host: non zero value required;Database.port: non zero value required"),
 		},
 		{
 			name: "empty publisher kind",
@@ -119,11 +122,12 @@ func TestConfig_Validate(t *testing.T) {
 					Password: "pass",
 				},
 				Publisher: &PublisherCfg{
-					Topic:       "stream",
-					TopicPrefix: "prefix",
+					Topic:        "stream",
+					TopicPrefix:  "prefix",
+					NatsCredPath: "/etc/nats/creds/admin.creds",
 				},
 			},
-			wantErr: errors.New("Publisher.Type: non zero value required"),
+			wantErr: errors.New("Publisher.address: non zero value required;Publisher.type: non zero value required"),
 		},
 	}
 

cmd/pgoutbox/init.go

Lines changed: 66 additions & 27 deletions
@@ -25,35 +25,83 @@ import (
 	"kubeops.dev/pgoutbox/apis"
 	"kubeops.dev/pgoutbox/internal/publisher"
 
-	"github.com/jackc/pgx"
+	"github.com/jackc/pglogrepl"
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgconn"
+	"github.com/jackc/pgx/v5/pgproto3"
 	"github.com/nats-io/nats.go"
 	"k8s.io/apimachinery/pkg/util/wait"
 )
 
+// replicationConn wraps pgconn.PgConn to implement the replication interface expected by the listener.
+type replicationConn struct {
+	conn *pgconn.PgConn
+}
+
+func newReplicationConn(conn *pgconn.PgConn) *replicationConn {
+	return &replicationConn{conn: conn}
+}
+
+func (r *replicationConn) CreateReplicationSlot(ctx context.Context, slotName, outputPlugin string) (pglogrepl.CreateReplicationSlotResult, error) {
+	return pglogrepl.CreateReplicationSlot(ctx, r.conn, slotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{})
+}
+
+func (r *replicationConn) DropReplicationSlot(ctx context.Context, slotName string) error {
+	return pglogrepl.DropReplicationSlot(ctx, r.conn, slotName, pglogrepl.DropReplicationSlotOptions{})
+}
+
+func (r *replicationConn) StartReplication(ctx context.Context, slotName string, startLsn pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error {
+	return pglogrepl.StartReplication(ctx, r.conn, slotName, startLsn, options)
+}
+
+func (r *replicationConn) ReceiveMessage(ctx context.Context) ([]byte, error) {
+	rawMsg, err := r.conn.ReceiveMessage(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
+		return nil, fmt.Errorf("received error from postgres: %s", errMsg.Message)
+	}
+
+	msg, ok := rawMsg.(*pgproto3.CopyData)
+	if !ok {
+		return nil, nil
+	}
+
+	return msg.Data, nil
+}
+
+func (r *replicationConn) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error {
+	return pglogrepl.SendStandbyStatusUpdate(ctx, r.conn, status)
+}
+
+func (r *replicationConn) IsAlive() bool {
+	return !r.conn.IsClosed()
+}
+
+func (r *replicationConn) Close() error {
+	return r.conn.Close(context.Background())
+}
+
 // initPgxConnections initialise db and replication connections.
-func initPgxConnections(cfg *apis.DatabaseCfg, logger *slog.Logger, timeout time.Duration) (*pgx.Conn, *pgx.ReplicationConn, error) {
+func initPgxConnections(cfg *apis.DatabaseCfg, logger *slog.Logger, timeout time.Duration) (*pgx.Conn, *pgconn.PgConn, error) {
 	var pgConn *pgx.Conn
-	var pgReplicationConn *pgx.ReplicationConn
-
-	pgxConf := pgx.ConnConfig{
-		LogLevel: pgx.LogLevelInfo,
-		Logger:   pgxLogger{logger},
-		Host:     cfg.Host,
-		Port:     cfg.Port,
-		Database: cfg.Name,
-		User:     cfg.User,
-		Password: cfg.Password,
-	}
+	var pgReplConn *pgconn.PgConn
+
+	connString := fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s",
+		cfg.Host, cfg.Port, cfg.Name, cfg.User, cfg.Password)
+	replConnString := connString + " replication=database"
 
 	err := wait.PollUntilContextTimeout(context.TODO(), 5*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
 		var err error
-		pgConn, err = pgx.Connect(pgxConf)
+		pgConn, err = pgx.Connect(ctx, connString)
 		if err != nil {
 			logger.Error("db connection:", slog.String("error", err.Error()))
 			return false, nil
 		}
 
-		pgReplicationConn, err = pgx.ReplicationConnect(pgxConf)
+		pgReplConn, err = pgconn.Connect(ctx, replConnString)
 		if err != nil {
 			logger.Error("db replication connection:", slog.String("error", err.Error()))
 			return false, nil
@@ -65,12 +113,12 @@ func initPgxConnections(cfg *apis.DatabaseCfg, logger *slog.Logger, timeout time
 		return nil, nil, fmt.Errorf("wait for db connection: %w", err)
 	}
 
-	return pgConn, pgReplicationConn, nil
+	return pgConn, pgReplConn, nil
 }
 
-func configureReplicaIdentityToFull(pgConn *pgx.Conn, filterTables apis.FilterStruct) error {
+func configureReplicaIdentityToFull(ctx context.Context, pgConn *pgx.Conn, filterTables apis.FilterStruct) error {
 	for table := range filterTables.Tables {
-		_, err := pgConn.Exec(fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY FULL;", table))
+		_, err := pgConn.Exec(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY FULL;", table))
 		if err != nil {
 			return fmt.Errorf("change replica identity to FULL for table %s: %w", table, err)
 		}
@@ -79,15 +127,6 @@ func configureReplicaIdentityToFull(pgConn *pgx.Conn, filterTables apis.FilterSt
 	return nil
 }
 
-type pgxLogger struct {
-	logger *slog.Logger
-}
-
-// Log DB message.
-func (l pgxLogger) Log(_ pgx.LogLevel, msg string, _ map[string]any) {
-	l.logger.Debug(msg)
-}
-
 type eventPublisher interface {
 	Publish(context.Context, string, *apis.Event) error
 	Close() error
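
Review note on this file: both the connection string and the `ALTER TABLE` statement are assembled with `fmt.Sprintf` from configuration values. In a libpq-style keyword/value string, a value containing spaces has to be wrapped in single quotes, with embedded single quotes and backslashes escaped by a backslash, so a password with a space would break the string as written. The sketch below shows one possible way to quote such values using only the standard library. The helper names `quoteConnValue` and `quoteIdent` are hypothetical and are not part of this commit.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteConnValue wraps a value in single quotes for a keyword/value connection
// string, escaping backslashes and single quotes with a backslash.
func quoteConnValue(v string) string {
	r := strings.NewReplacer(`\`, `\\`, `'`, `\'`)
	return "'" + r.Replace(v) + "'"
}

// quoteIdent double-quotes a single SQL identifier and doubles any embedded
// double quote. A schema-qualified name must be quoted part by part.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

func main() {
	// Example values only; they do not come from the repository's config.
	conn := fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s",
		quoteConnValue("localhost"), 5432, quoteConnValue("app"),
		quoteConnValue("repl"), quoteConnValue(`p@ss word's`))
	fmt.Println(conn)

	stmt := fmt.Sprintf("ALTER TABLE %s.%s REPLICA IDENTITY FULL;",
		quoteIdent("public"), quoteIdent("users"))
	fmt.Println(stmt)
}
```

pgx also ships an `Identifier` type with a `Sanitize` method that covers the identifier case, which may be preferable to a hand-written helper.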

cmd/pgoutbox/main.go

Lines changed: 5 additions & 5 deletions
@@ -35,7 +35,7 @@ import (
 
 // GetVersion returns latest git hash of commit.
 func GetVersion() string {
-	var version = "unknown"
+	version := "unknown"
 
 	info, ok := debug.ReadBuildInfo()
 	if ok {
@@ -85,12 +85,12 @@ func main() {
 
 		logger := apis.InitSlog(cfg.Logger, version, false)
 
-		conn, rConn, err := initPgxConnections(cfg.Database, logger, time.Minute*10)
+		pgxConn, pgConn, err := initPgxConnections(cfg.Database, logger, time.Minute*10)
 		if err != nil {
 			return fmt.Errorf("pgx connection: %w", err)
 		}
 
-		if err = configureReplicaIdentityToFull(conn, cfg.Listener.Filter); err != nil {
+		if err = configureReplicaIdentityToFull(ctx, pgxConn, cfg.Listener.Filter); err != nil {
 			return fmt.Errorf("configure replica identity: %w", err)
 		}
 		pub, err := factoryPublisher(ctx, cfg.Publisher, logger)
@@ -107,8 +107,8 @@ func main() {
 		svc := listener.NewWalListener(
 			cfg,
 			logger,
-			listener.NewRepository(conn),
-			rConn,
+			listener.NewRepository(pgxConn),
+			newReplicationConn(pgConn),
 			pub,
 			transaction.NewBinaryParser(logger, binary.BigEndian),
 			apis.NewMetrics(),
