Commit a1f2f3d

refactor: use pglogrepl's message parser for WAL parsing

Replace custom BinaryParser with pglogrepl.Parse() for consistency with the upstream library and improved maintainability.

Changes:
- Replace BinaryParser with Parser using pglogrepl.Parse()
- Update WAL struct to use pglogrepl.LSN and uint32 for relationID
- Update CreateActionData to accept *pglogrepl.TupleData
- Remove custom message types from protocol.go (now uses pglogrepl types)
- Rewrite parser tests for new implementation
- Update wal tests for new types
- Fix main.go to use NewParser() instead of NewBinaryParser()
- Fix NatsCredPath validation (not required for all publisher types)
- Fix config test expectations for lowercase field names
- Fix nats_test.go subject name expectation
- Update copilot-instructions.md to document pglogrepl usage

Signed-off-by: Tamal Saha <[email protected]>
1 parent 709ba34 commit a1f2f3d
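
A note on the types named in the message: `pglogrepl.LSN` is an unsigned 64-bit WAL position whose text form is two uppercase hex halves separated by a slash. The sketch below uses a local stand-in (`LSN` and `ParseLSN` are defined here for illustration and are not imported from the library) to show that formatting and its inverse. It approximates the library's behaviour and is not code from this commit.

```go
package main

import "fmt"

// LSN is a local stand-in for pglogrepl.LSN: a 64-bit position in the WAL.
type LSN uint64

// String renders the position the way PostgreSQL prints it: the upper and
// lower 32 bits as uppercase hex, separated by a slash.
func (l LSN) String() string {
	return fmt.Sprintf("%X/%X", uint32(l>>32), uint32(l))
}

// ParseLSN is the inverse of String. It is a sketch, not the library function.
func ParseLSN(s string) (LSN, error) {
	var hi, lo uint32
	if _, err := fmt.Sscanf(s, "%X/%X", &hi, &lo); err != nil {
		return 0, fmt.Errorf("parse LSN %q: %w", s, err)
	}
	return LSN(hi)<<32 | LSN(lo), nil
}

func main() {
	lsn := LSN(0x16B374D848)
	fmt.Println(lsn) // uses String()

	back, err := ParseLSN(lsn.String())
	if err != nil {
		panic(err)
	}
	fmt.Println(back == lsn)
}
```

Relation IDs, by contrast, are plain `uint32` values (PostgreSQL OIDs), so they need no special formatting.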

10 files changed: +409 -1357 lines changed

.github/copilot-instructions.md

Lines changed: 35 additions & 18 deletions
@@ -2,18 +2,20 @@

 ## Architecture & Data Flow

-**pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS, Kafka, RabbitMQ, Google Pub/Sub).
+**pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS JetStream, Kafka, RabbitMQ, Google Pub/Sub).

 ```
 PostgreSQL WAL → pgx replication → Parser.ParseWalMessage() → WAL object → filter → Event → Publisher.Publish() → broker
 ```

 ### Key Components
-- [internal/listener/listener.go](../internal/listener/listener.go) - Replication orchestration, slot/publication lifecycle
-- [internal/listener/transaction/parser.go](../internal/listener/transaction/parser.go) - WAL binary protocol decoding
+- [internal/listener/listener.go](../internal/listener/listener.go) - Replication orchestration, slot/publication lifecycle, health probes
+- [internal/listener/transaction/parser.go](../internal/listener/transaction/parser.go) - WAL message parsing using `pglogrepl.Parse()` (`pgoutput` plugin)
+- [internal/listener/transaction/wal.go](../internal/listener/transaction/wal.go) - Transaction state, event filtering via `CreateEventsWithFilter()`
 - [internal/publisher/](../internal/publisher/) - Broker adapters implementing `eventPublisher` interface
 - [apis/config.go](../apis/config.go) - Config types with Viper loading (env prefix: `WAL_`)
 - [apis/event.go](../apis/event.go) - Event struct with `SubjectName()` for topic construction
+- [cmd/pgoutbox/init.go](../cmd/pgoutbox/init.go) - Publisher factory, connection setup with retry

 ## Developer Workflows

@@ -25,48 +27,63 @@ make ci # verify + lint + build + unit-tests
 make fmt # Format source code
 ```

-All commands run in Docker (`ghcr.io/appscode/golang-dev:1.24`). Output: `bin/pgoutbox-{OS}-{ARCH}`.
+All commands run in Docker (`ghcr.io/appscode/golang-dev:1.25`). Output: `bin/pgoutbox-{OS}-{ARCH}`.

 **Local PostgreSQL**: `docker/docker-compose.yml` + `docker/scripts/` for test DB setup.

 ## Critical Patterns

-### Interface-Based Mocking
-Listener depends on interfaces (`eventPublisher`, `parser`, `replication`, `repository`). Mock implementations in `*_mock_test.go` files:
+### Interface-Based Testing
+Listener depends on four interfaces defined in [listener.go](../internal/listener/listener.go):
 ```go
-// internal/listener/listener.go
-type eventPublisher interface {
-	Publish(context.Context, string, *apis.Event) error
-}
+type eventPublisher interface { Publish(context.Context, string, *apis.Event) error }
+type parser interface { ParseWalMessage([]byte, *tx.WAL) error }
+type replication interface { CreateReplicationSlot(...); StartReplication(...); ReceiveMessage(...) }
+type repository interface { CreatePublication(...); GetSlotLSN(...); IsReplicationActive(...) }
 ```
+Mock implementations in `*_mock_test.go` files use `github.com/stretchr/testify/mock`.
+
+### Adding a New Publisher
+1. Create `internal/publisher/{broker}.go` implementing `Publish(ctx, subject, *Event) error` and `Close() error`
+2. Add type constant in [apis/config.go](../apis/config.go): `PublisherType{Name} PublisherType = "{name}"`
+3. Add case in `factoryPublisher()` in [cmd/pgoutbox/init.go](../cmd/pgoutbox/init.go)

 ### Configuration
-- YAML config + environment overrides (prefix `WAL_`, e.g., `WAL_DATABASE_HOST`)
-- Struct tags: both `mapstructure` and `json` required
+- YAML config + environment overrides (prefix `WAL_`, e.g., `WAL_DATABASE_HOST`, `WAL_LISTENER_SLOTNAME`)
+- Struct tags: both `mapstructure` and `json` required on all config fields
 - Publisher types: `nats`, `kafka`, `rabbitmq`, `google_pubsub`

 ### Topic Naming
 Format: `{publisher.topic}.{publisher.topicPrefix}schemas.{schema}.tables.{table}`
-Override via `listener.topicsMap` in config.
+Override specific topics via `listener.topicsMap` in config.
+
+### WAL Message Parsing
+Uses `github.com/jackc/pglogrepl` for WAL message parsing. The `Parser.ParseWalMessage()` method uses `pglogrepl.Parse()` to decode messages:
+- Message types: `BeginMessage`, `CommitMessage`, `RelationMessage`, `InsertMessage`, `UpdateMessage`, `DeleteMessage`
+- Types: `pglogrepl.LSN` for LSN values, `uint32` for relation IDs, `*pglogrepl.TupleData` for row data
+- Column flags: `Flags == 1` indicates the column is part of the primary key

 ### Logging
 Use `log/slog` with injected `*slog.Logger`:
 ```go
 l.log.Debug("message", slog.String("key", value))
+l.log.Info("event was sent", slog.String("subject", name), slog.String("table", table))
 ```

 ## Common Tasks

 | Task | Files to Modify |
 |------|-----------------|
-| Add publisher | `internal/publisher/{broker}.go` (implement `eventPublisher`), `cmd/pgoutbox/init.go` |
-| Add filter logic | `apis/config.go` (`FilterStruct`), `internal/listener/transaction/parser.go` |
-| Extend WAL parsing | `internal/listener/transaction/parser.go`, `transaction/data.go` |
-| Add metrics | `apis/metrics.go` (Prometheus counters) |
+| Add publisher | `internal/publisher/{broker}.go`, `apis/config.go` (type const), `cmd/pgoutbox/init.go` (factory) |
+| Add filter logic | `apis/config.go` (`FilterStruct`), `internal/listener/transaction/wal.go` (`CreateEventsWithFilter`) |
+| Extend WAL parsing | `internal/listener/transaction/parser.go`, `transaction/data.go` (uses `pglogrepl.Parse()`) |
+| Add metrics | `apis/metrics.go` (Prometheus counters), update `monitor` interface |
+| Add config field | `apis/config.go` (add both `mapstructure` and `json` tags) |

 ## Standards

 - **License**: Apache 2.0 headers required (see `hack/license/`)
 - **Errors**: Wrap with `fmt.Errorf("context: %w", err)`
-- **Publication name**: Hardcoded as `_pgoutbox_`
+- **Publication name**: Hardcoded as `pgoutbox` (auto-created for all tables)
 - **Replication slot**: Configured via `listener.slotName`, auto-created on startup
+- **Tests**: Table-driven tests with `testify/assert` and `testify/mock`

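The topic format documented in the diff above can be read as a plain string template. Here is a minimal sketch, assuming all four parts are ordinary strings. The `subjectName` helper is hypothetical: it is not the project's `SubjectName()` method, and it leaves out `listener.topicsMap` overrides.

```go
package main

import "fmt"

// subjectName is a hypothetical helper that fills in the documented template
//
//	{publisher.topic}.{publisher.topicPrefix}schemas.{schema}.tables.{table}
//
// The prefix is concatenated as-is; the template adds no separator after it.
func subjectName(topic, topicPrefix, schema, table string) string {
	return fmt.Sprintf("%s.%sschemas.%s.tables.%s", topic, topicPrefix, schema, table)
}

func main() {
	// With a prefix that carries its own trailing dot.
	fmt.Println(subjectName("stream", "prefix.", "public", "users"))
	// With an empty prefix.
	fmt.Println(subjectName("stream", "", "public", "users"))
}
```

Because the template puts nothing between the prefix and `schemas`, a prefix meant to act as its own path segment has to include its trailing dot.
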
.github/workflows/ci.yml

Lines changed: 22 additions & 14 deletions
@@ -1,25 +1,33 @@
-name: ci
+name: CI

 on:
-  push:
-    branches: [ "master" ]
   pull_request:
-    branches: [ "master" ]
+    branches:
+      - '*'
+  push:
+    branches:
+      - master
+  workflow_dispatch:

-jobs:
+concurrency:
+  group: ${{ github.workflow }}-${{ github.head_ref || github.ref }}
+  cancel-in-progress: true

+jobs:
   build:
+    name: Build
     runs-on: ubuntu-24.04
     steps:
-    - uses: actions/checkout@v4
+      - name: Set up Go 1.25
+        uses: actions/setup-go@v5
+        with:
+          go-version: '1.25'
+        id: go

-    - name: Set up Go
-      uses: actions/setup-go@v5
-      with:
-        go-version: '1.25'
+      - uses: actions/checkout@v4

-    - name: Build
-      run: go build -v ./...
+      - name: Build
+        run: go build -v ./...

-    - name: Test
-      run: go test -v ./...
+      - name: Test
+        run: go test -v ./...

apis/config.go

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ type ListenerCfg struct {
 type PublisherCfg struct {
 	Type PublisherType `valid:"required" json:"type" mapstructure:"type"`
 	Address string `valid:"required" json:"address" mapstructure:"address"`
-	NatsCredPath string `valid:"required" json:"natsCredPath" mapstructure:"natsCredPath"`
+	NatsCredPath string `json:"natsCredPath" mapstructure:"natsCredPath"`
 	Topic string `valid:"required" json:"topic" mapstructure:"topic"`
 	TopicPrefix string `json:"topicPrefix" mapstructure:"topicPrefix"`
 	EnableTLS bool `json:"enableTLS" mapstructure:"enableTlS"`

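To see why the `valid:"required"` tag on `NatsCredPath` was a problem for non-NATS publishers, it helps to look at how a tag-driven "required" check behaves. The sketch below is a simplified stand-in written with `reflect`. It is not the validation library the project uses, and `oldPublisherCfg`, `newPublisherCfg`, and `missingRequired` are hypothetical names with the structs trimmed to three string fields.

```go
package main

import (
	"fmt"
	"reflect"
)

// oldPublisherCfg mirrors the relevant fields before the commit.
type oldPublisherCfg struct {
	Type         string `valid:"required" json:"type"`
	Address      string `valid:"required" json:"address"`
	NatsCredPath string `valid:"required" json:"natsCredPath"`
}

// newPublisherCfg mirrors the same fields after the commit.
type newPublisherCfg struct {
	Type         string `valid:"required" json:"type"`
	Address      string `valid:"required" json:"address"`
	NatsCredPath string `json:"natsCredPath"`
}

// missingRequired lists the json names of zero-valued fields that carry
// valid:"required". It expects a struct value, not a pointer.
func missingRequired(cfg any) []string {
	v := reflect.ValueOf(cfg)
	t := v.Type()
	var missing []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if f.Tag.Get("valid") == "required" && v.Field(i).IsZero() {
			missing = append(missing, f.Tag.Get("json"))
		}
	}
	return missing
}

func main() {
	// A Kafka config has no NATS credentials file to point at.
	fmt.Println(missingRequired(oldPublisherCfg{Type: "kafka", Address: "addr"}))
	fmt.Println(missingRequired(newPublisherCfg{Type: "kafka", Address: "addr"}))
}
```

Under this stand-in, the old struct reports `natsCredPath` as missing for a Kafka configuration, while the new struct reports nothing. This is consistent with the test fixtures in `apis/config_test.go` no longer setting that field.
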
apis/config_test.go

Lines changed: 14 additions & 18 deletions
@@ -39,11 +39,10 @@ func TestConfig_Validate(t *testing.T) {
 					Password: "pass",
 				},
 				Publisher: &PublisherCfg{
-					Type:         "kafka",
-					Address:      "addr",
-					Topic:        "stream",
-					TopicPrefix:  "prefix",
-					NatsCredPath: "/etc/nats/creds/admin.creds",
+					Type:        "kafka",
+					Address:     "addr",
+					Topic:       "stream",
+					TopicPrefix: "prefix",
 				},
 			},
 			wantErr: nil,
@@ -66,11 +65,10 @@ func TestConfig_Validate(t *testing.T) {
 					Password: "pass",
 				},
 				Publisher: &PublisherCfg{
-					Type:         "kafka",
-					Address:      "addr",
-					Topic:        "stream",
-					TopicPrefix:  "prefix",
-					NatsCredPath: "/etc/nats/creds/admin.creds",
+					Type:        "kafka",
+					Address:     "addr",
+					Topic:       "stream",
+					TopicPrefix: "prefix",
 				},
 			},
 			wantErr: errors.New("Listener.refreshConnection: non zero value required;Listener.slotName: non zero value required"),
@@ -93,11 +91,10 @@ func TestConfig_Validate(t *testing.T) {
 					Password: "pass",
 				},
 				Publisher: &PublisherCfg{
-					Type:         "kafka",
-					Address:      "addr",
-					Topic:        "stream",
-					TopicPrefix:  "prefix",
-					NatsCredPath: "/etc/nats/creds/admin.creds",
+					Type:        "kafka",
+					Address:     "addr",
+					Topic:       "stream",
+					TopicPrefix: "prefix",
 				},
 			},
 			wantErr: errors.New("Database.host: non zero value required;Database.port: non zero value required"),
@@ -122,9 +119,8 @@ func TestConfig_Validate(t *testing.T) {
 					Password: "pass",
 				},
 				Publisher: &PublisherCfg{
-					Topic:        "stream",
-					TopicPrefix:  "prefix",
-					NatsCredPath: "/etc/nats/creds/admin.creds",
+					Topic:       "stream",
+					TopicPrefix: "prefix",
 				},
 			},
 			wantErr: errors.New("Publisher.address: non zero value required;Publisher.type: non zero value required"),

cmd/pgoutbox/main.go

Lines changed: 1 addition & 2 deletions
@@ -17,7 +17,6 @@ limitations under the License.
 package main

 import (
-	"encoding/binary"
 	"fmt"
 	"log/slog"
 	"os"
@@ -110,7 +109,7 @@ func main() {
 		listener.NewRepository(pgxConn),
 		newReplicationConn(pgConn),
 		pub,
-		transaction.NewBinaryParser(logger, binary.BigEndian),
+		transaction.NewParser(logger),
 		apis.NewMetrics(),
 	)

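The removed `binary.BigEndian` argument is a reminder that `pgoutput` messages are big-endian binary records, which a hand-written parser has to decode field by field. As a rough illustration of the work `pglogrepl.Parse()` now takes over, here is a hand-written decoder for a single Begin message. The `beginMessage` type and `decodeBegin` function are hypothetical names for this sketch and do not come from the repository. The layout (type byte `B`, 8-byte final LSN, 8-byte commit timestamp, 4-byte transaction ID) follows the PostgreSQL logical replication message format.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// beginMessage holds the fields of a pgoutput Begin message.
type beginMessage struct {
	FinalLSN   uint64 // LSN of the transaction's commit record
	CommitTime int64  // microseconds since 2000-01-01 (PostgreSQL epoch)
	Xid        uint32 // transaction ID
}

// decodeBegin parses one Begin message by hand. Every multi-byte integer
// in the protocol is big-endian.
func decodeBegin(data []byte) (beginMessage, error) {
	const size = 1 + 8 + 8 + 4 // type byte + LSN + timestamp + xid
	if len(data) < size {
		return beginMessage{}, errors.New("begin message too short")
	}
	if data[0] != 'B' {
		return beginMessage{}, fmt.Errorf("unexpected message type %q", data[0])
	}
	return beginMessage{
		FinalLSN:   binary.BigEndian.Uint64(data[1:9]),
		CommitTime: int64(binary.BigEndian.Uint64(data[9:17])),
		Xid:        binary.BigEndian.Uint32(data[17:21]),
	}, nil
}

func main() {
	// Build a sample payload with the same layout the decoder expects.
	buf := []byte{'B'}
	buf = binary.BigEndian.AppendUint64(buf, 0x16B374D848)
	buf = binary.BigEndian.AppendUint64(buf, 1_000_000)
	buf = binary.BigEndian.AppendUint32(buf, 742)

	msg, err := decodeBegin(buf)
	if err != nil {
		panic(err)
	}
	fmt.Printf("lsn=%X commitMicros=%d xid=%d\n", msg.FinalLSN, msg.CommitTime, msg.Xid)
}
```

Each of the other message types (Relation, Insert, Update, Delete, Commit) needs similar offset bookkeeping, plus variable-length strings and tuples. That bookkeeping is the maintenance burden the commit message cites as the reason for moving to the library parser.
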