Commit e2e87a0

tamalsaha and Imtiaz246 authored
refactor: use pglogrepl's message parser for WAL parsing (#15)
* refactor: use pglogrepl's message parser for WAL parsing

  Replace custom BinaryParser with pglogrepl.Parse() for consistency with the upstream library and improved maintainability.

  Changes:
  - Replace BinaryParser with Parser using pglogrepl.Parse()
  - Update WAL struct to use pglogrepl.LSN and uint32 for relationID
  - Update CreateActionData to accept *pglogrepl.TupleData
  - Remove custom message types from protocol.go (now uses pglogrepl types)
  - Rewrite parser tests for new implementation
  - Update wal tests for new types
  - Fix main.go to use NewParser() instead of NewBinaryParser()
  - Fix NatsCredPath validation (not required for all publisher types)
  - Fix config test expectations for lowercase field names
  - Fix nats_test.go subject name expectation
  - Update copilot-instructions.md to document pglogrepl usage

  Signed-off-by: Tamal Saha <tamal@appscode.com>

* feat: migrate to go-playground/validator/v10 & refactor tests

  Signed-off-by: Imtiaz Uddin <imtiaz@appscode.com>

---------

Signed-off-by: Tamal Saha <tamal@appscode.com>
Signed-off-by: Imtiaz Uddin <imtiaz@appscode.com>
Co-authored-by: Imtiaz Uddin <imtiaz@appscode.com>
1 parent 3e93bd9 commit e2e87a0

117 files changed: +24017 -5280 lines changed


.github/copilot-instructions.md

Lines changed: 35 additions & 18 deletions
@@ -2,18 +2,20 @@

 ## Architecture & Data Flow

-**pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS, Kafka, RabbitMQ, Google Pub/Sub).
+**pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS JetStream, Kafka, RabbitMQ, Google Pub/Sub).

 ```
 PostgreSQL WAL → pgx replication → Parser.ParseWalMessage() → WAL object → filter → Event → Publisher.Publish() → broker
 ```

 ### Key Components
-- [internal/listener/listener.go](../internal/listener/listener.go) - Replication orchestration, slot/publication lifecycle
-- [internal/listener/transaction/parser.go](../internal/listener/transaction/parser.go) - WAL binary protocol decoding
+- [internal/listener/listener.go](../internal/listener/listener.go) - Replication orchestration, slot/publication lifecycle, health probes
+- [internal/listener/transaction/parser.go](../internal/listener/transaction/parser.go) - WAL message parsing using `pglogrepl.Parse()` (`pgoutput` plugin)
+- [internal/listener/transaction/wal.go](../internal/listener/transaction/wal.go) - Transaction state, event filtering via `CreateEventsWithFilter()`
 - [internal/publisher/](../internal/publisher/) - Broker adapters implementing `eventPublisher` interface
 - [apis/config.go](../apis/config.go) - Config types with Viper loading (env prefix: `WAL_`)
 - [apis/event.go](../apis/event.go) - Event struct with `SubjectName()` for topic construction
+- [cmd/pgoutbox/init.go](../cmd/pgoutbox/init.go) - Publisher factory, connection setup with retry

 ## Developer Workflows
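The filter → Event → `Publish()` tail of the data flow documented in this hunk can be sketched with an in-memory publisher. This is a minimal illustration, not the project's code: `Event`, `memoryPublisher`, `allowed`, `process`, and `runDemo` are hypothetical names, the filter semantics (table name mapped to permitted actions) are an assumption based on the `Tables map[string][]string` field, and the subject string is simplified.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a hypothetical stand-in for apis.Event.
type Event struct {
	Schema, Table, Action string
}

// eventPublisher mirrors the shape of the listener's publisher interface.
type eventPublisher interface {
	Publish(ctx context.Context, subject string, ev *Event) error
}

// memoryPublisher records subjects instead of contacting a broker.
type memoryPublisher struct{ subjects []string }

func (m *memoryPublisher) Publish(_ context.Context, subject string, _ *Event) error {
	m.subjects = append(m.subjects, subject)
	return nil
}

// allowed reports whether the event passes the filter.
// Assumption: the filter maps a table name to the actions that may be published.
func allowed(filter map[string][]string, ev Event) bool {
	for _, action := range filter[ev.Table] {
		if action == ev.Action {
			return true
		}
	}
	return false
}

// process publishes every event that passes the filter and wraps any
// publisher error with context, following the repository's error convention.
func process(ctx context.Context, pub eventPublisher, filter map[string][]string, events []Event) error {
	for i := range events {
		ev := events[i]
		if !allowed(filter, ev) {
			continue
		}
		subject := fmt.Sprintf("schemas.%s.tables.%s", ev.Schema, ev.Table) // simplified subject
		if err := pub.Publish(ctx, subject, &ev); err != nil {
			return fmt.Errorf("publish %s: %w", subject, err)
		}
	}
	return nil
}

// runDemo pushes three events through the pipeline; only one passes the filter.
func runDemo() ([]string, error) {
	pub := &memoryPublisher{}
	filter := map[string][]string{"users": {"insert", "update"}}
	events := []Event{
		{Schema: "public", Table: "users", Action: "insert"},
		{Schema: "public", Table: "users", Action: "delete"},
		{Schema: "public", Table: "orders", Action: "insert"},
	}
	err := process(context.Background(), pub, filter, events)
	return pub.subjects, err
}

func main() {
	subjects, err := runDemo()
	fmt.Println(subjects, err)
}
```

Because `process` depends only on the interface, the same function works with a real broker adapter or with a test double.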

@@ -25,48 +27,63 @@ make ci # verify + lint + build + unit-tests
 make fmt # Format source code
 ```

-All commands run in Docker (`ghcr.io/appscode/golang-dev:1.24`). Output: `bin/pgoutbox-{OS}-{ARCH}`.
+All commands run in Docker (`ghcr.io/appscode/golang-dev:1.25`). Output: `bin/pgoutbox-{OS}-{ARCH}`.

 **Local PostgreSQL**: `docker/docker-compose.yml` + `docker/scripts/` for test DB setup.

 ## Critical Patterns

-### Interface-Based Mocking
-Listener depends on interfaces (`eventPublisher`, `parser`, `replication`, `repository`). Mock implementations in `*_mock_test.go` files:
+### Interface-Based Testing
+Listener depends on four interfaces defined in [listener.go](../internal/listener/listener.go):
 ```go
-// internal/listener/listener.go
-type eventPublisher interface {
-	Publish(context.Context, string, *apis.Event) error
-}
+type eventPublisher interface { Publish(context.Context, string, *apis.Event) error }
+type parser interface { ParseWalMessage([]byte, *tx.WAL) error }
+type replication interface { CreateReplicationSlot(...); StartReplication(...); ReceiveMessage(...) }
+type repository interface { CreatePublication(...); GetSlotLSN(...); IsReplicationActive(...) }
 ```
+Mock implementations in `*_mock_test.go` files use `github.com/stretchr/testify/mock`.
+
+### Adding a New Publisher
+1. Create `internal/publisher/{broker}.go` implementing `Publish(ctx, subject, *Event) error` and `Close() error`
+2. Add type constant in [apis/config.go](../apis/config.go): `PublisherType{Name} PublisherType = "{name}"`
+3. Add case in `factoryPublisher()` in [cmd/pgoutbox/init.go](../cmd/pgoutbox/init.go)

 ### Configuration
-- YAML config + environment overrides (prefix `WAL_`, e.g., `WAL_DATABASE_HOST`)
-- Struct tags: both `mapstructure` and `json` required
+- YAML config + environment overrides (prefix `WAL_`, e.g., `WAL_DATABASE_HOST`, `WAL_LISTENER_SLOTNAME`)
+- Struct tags: both `mapstructure` and `json` required on all config fields
 - Publisher types: `nats`, `kafka`, `rabbitmq`, `google_pubsub`

 ### Topic Naming
 Format: `{publisher.topic}.{publisher.topicPrefix}schemas.{schema}.tables.{table}`
-Override via `listener.topicsMap` in config.
+Override specific topics via `listener.topicsMap` in config.
+
+### WAL Message Parsing
+Uses `github.com/jackc/pglogrepl` for WAL message parsing. The `Parser.ParseWalMessage()` method uses `pglogrepl.Parse()` to decode messages:
+- Message types: `BeginMessage`, `CommitMessage`, `RelationMessage`, `InsertMessage`, `UpdateMessage`, `DeleteMessage`
+- Types: `pglogrepl.LSN` for LSN values, `uint32` for relation IDs, `*pglogrepl.TupleData` for row data
+- Column flags: `Flags == 1` indicates the column is part of the primary key

 ### Logging
 Use `log/slog` with injected `*slog.Logger`:
 ```go
 l.log.Debug("message", slog.String("key", value))
+l.log.Info("event was sent", slog.String("subject", name), slog.String("table", table))
 ```

 ## Common Tasks

 | Task | Files to Modify |
 |------|-----------------|
-| Add publisher | `internal/publisher/{broker}.go` (implement `eventPublisher`), `cmd/pgoutbox/init.go` |
-| Add filter logic | `apis/config.go` (`FilterStruct`), `internal/listener/transaction/parser.go` |
-| Extend WAL parsing | `internal/listener/transaction/parser.go`, `transaction/data.go` |
-| Add metrics | `apis/metrics.go` (Prometheus counters) |
+| Add publisher | `internal/publisher/{broker}.go`, `apis/config.go` (type const), `cmd/pgoutbox/init.go` (factory) |
+| Add filter logic | `apis/config.go` (`FilterStruct`), `internal/listener/transaction/wal.go` (`CreateEventsWithFilter`) |
+| Extend WAL parsing | `internal/listener/transaction/parser.go`, `transaction/data.go` (uses `pglogrepl.Parse()`) |
+| Add metrics | `apis/metrics.go` (Prometheus counters), update `monitor` interface |
+| Add config field | `apis/config.go` (add both `mapstructure` and `json` tags) |

 ## Standards

 - **License**: Apache 2.0 headers required (see `hack/license/`)
 - **Errors**: Wrap with `fmt.Errorf("context: %w", err)`
-- **Publication name**: Hardcoded as `_pgoutbox_`
+- **Publication name**: Hardcoded as `pgoutbox` (auto-created for all tables)
 - **Replication slot**: Configured via `listener.slotName`, auto-created on startup
+- **Tests**: Table-driven tests with `testify/assert` and `testify/mock`
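
The topic format documented in the "Topic Naming" section can be illustrated with a small helper. This is a sketch only: `subjectName` is a hypothetical function, not the project's `SubjectName()` method, and it does not model the `listener.topicsMap` override because the key format of that map is not shown in this diff.

```go
package main

import "fmt"

// subjectName renders the documented format
// {publisher.topic}.{publisher.topicPrefix}schemas.{schema}.tables.{table}.
// The prefix is concatenated as-is, so a non-empty prefix must carry its own
// trailing separator (for example "dev.").
func subjectName(topic, topicPrefix, schema, table string) string {
	return fmt.Sprintf("%s.%sschemas.%s.tables.%s", topic, topicPrefix, schema, table)
}

func main() {
	fmt.Println(subjectName("pgoutbox", "", "public", "users"))     // pgoutbox.schemas.public.tables.users
	fmt.Println(subjectName("pgoutbox", "dev.", "public", "users")) // pgoutbox.dev.schemas.public.tables.users
}
```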

.github/workflows/ci.yml

Lines changed: 21 additions & 14 deletions
@@ -1,26 +1,33 @@
-name: ci
+name: CI

 on:
-  push:
-    branches: [ "master" ]
   pull_request:
-    branches: [ "master" ]
+    branches:
+      - '*'
+  push:
+    branches:
+      - master
+  workflow_dispatch:

-jobs:
+concurrency:
+  group: ${{ github.workflow }}-${{ github.head_ref || github.ref }}
+  cancel-in-progress: true

+jobs:
   build:
     name: Build
     runs-on: ubuntu-24.04
     steps:
-    - uses: actions/checkout@v4
+      - name: Set up Go 1.25
+        uses: actions/setup-go@v5
+        with:
+          go-version: '1.25'
+        id: go

-    - name: Set up Go
-      uses: actions/setup-go@v5
-      with:
-        go-version: '1.25'
+      - uses: actions/checkout@v4

-    - name: Build
-      run: go build -v ./...
+      - name: Build
+        run: go build -v ./...

-    - name: Test
-      run: go test -v ./...
+      - name: Test
+        run: go test -v ./...

Makefile

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ SRC_REG ?=
 git_branch := $(shell git rev-parse --abbrev-ref HEAD)
 git_tag := $(shell git describe --exact-match --abbrev=0 2>/dev/null || echo "")
 commit_hash := $(shell git rev-parse --verify HEAD)
-commit_timestamp := $(shell date --date="@$$(git show -s --format=%ct)" --utc +%FT%T)
+commit_timestamp := $(shell date -u -r $$(git show -s --format=%ct) +%FT%T 2>/dev/null || date --date="@$$(git show -s --format=%ct)" --utc +%FT%T 2>/dev/null || echo "")

 VERSION := $(shell git describe --tags --always --dirty)
 version_strategy := commit_hash
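
The new `commit_timestamp` line tries the BSD/macOS form (`date -u -r <seconds>`) first, then the GNU form (`date --date=@<seconds>`), and falls back to an empty string. Both forms render the commit's Unix time as UTC in `%FT%T` layout. The Go sketch below shows the equivalent formatting; `commitTimestamp` is a hypothetical helper for illustration and is not part of the build.

```go
package main

import (
	"fmt"
	"time"
)

// commitTimestamp formats Unix seconds as UTC in the layout that
// `date -u +%FT%T` prints (YYYY-MM-DDTHH:MM:SS).
func commitTimestamp(unixSeconds int64) string {
	return time.Unix(unixSeconds, 0).UTC().Format("2006-01-02T15:04:05")
}

func main() {
	fmt.Println(commitTimestamp(0)) // 1970-01-01T00:00:00
}
```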

apis/config.go

Lines changed: 24 additions & 23 deletions
@@ -5,7 +5,7 @@ import (
 	"strings"
 	"time"

-	"github.com/asaskevich/govalidator"
+	"github.com/go-playground/validator/v10"
 	"github.com/spf13/viper"
 )

@@ -20,10 +20,10 @@ const (

 // Config for pgoutbox.
 type Config struct {
-	Listener *ListenerCfg `valid:"required" json:"listener" mapstructure:"listener"`
-	Database *DatabaseCfg `valid:"required" json:"database" mapstructure:"database"`
-	Publisher *PublisherCfg `valid:"required" json:"publisher" mapstructure:"publisher"`
-	Logger *Logger `valid:"required" json:"logger" mapstructure:"logger"`
+	Listener *ListenerCfg `validate:"required" json:"listener" mapstructure:"listener"`
+	Database *DatabaseCfg `validate:"required" json:"database" mapstructure:"database"`
+	Publisher *PublisherCfg `validate:"required" json:"publisher" mapstructure:"publisher"`
+	Logger *Logger `validate:"required" json:"logger" mapstructure:"logger"`
 	Telemetry *TelemetryCfg `json:"telemetry" mapstructure:"telemetry"`
 }

@@ -36,36 +36,36 @@ type TelemetryCfg struct {

 // ListenerCfg path of the listener config.
 type ListenerCfg struct {
-	SlotName string `valid:"required" json:"slotName" mapstructure:"slotName"`
+	SlotName string `validate:"required" json:"slotName" mapstructure:"slotName"`
 	ServerPort int `json:"serverPort" mapstructure:"serverPort"`
 	AckTimeout time.Duration `json:"ackTimeout" mapstructure:"ackTimeout"`
-	RefreshConnection time.Duration `json:"refreshConnection" valid:"required" mapstructure:"refreshConnection"`
-	HeartbeatInterval time.Duration `json:"heartbeatInterval" valid:"required" mapstructure:"heartbeatInterval"`
+	RefreshConnection time.Duration `validate:"required" json:"refreshConnection" mapstructure:"refreshConnection"`
+	HeartbeatInterval time.Duration `validate:"required" json:"heartbeatInterval" mapstructure:"heartbeatInterval"`
 	Filter FilterStruct `json:"filter" mapstructure:"filter"`
 	TopicsMap map[string]string `json:"topicsMap" mapstructure:"topicsMap"`
 }

 // PublisherCfg represent configuration for any publisher types.
 type PublisherCfg struct {
-	Type PublisherType `valid:"required" json:"type" mapstructure:"type"`
-	Address string `valid:"required" json:"address" mapstructure:"address"`
-	NatsCredPath string `valid:"required" json:"natsCredPath" mapstructure:"natsCredPath"`
-	Topic string `valid:"required" json:"topic" mapstructure:"topic"`
+	Type PublisherType `validate:"required,oneof=nats kafka rabbitmq google_pubsub" json:"type" mapstructure:"type"`
+	Address string `validate:"required" json:"address" mapstructure:"address"`
+	NatsCredPath string `validate:"required_if=Type nats" json:"natsCredPath" mapstructure:"natsCredPath"`
+	Topic string `validate:"required" json:"topic" mapstructure:"topic"`
 	TopicPrefix string `json:"topicPrefix" mapstructure:"topicPrefix"`
 	EnableTLS bool `json:"enableTLS" mapstructure:"enableTlS"`
-	ClientCert string `json:"clientCert" mapstructure:"clientCert"`
-	ClientKey string `json:"clientKey" mapstructure:"clientKey"`
-	CACert string `json:"CACert" mapstructure:"caCert"`
-	PubSubProjectID string `json:"pubSubProjectID" mapstructure:"pubSubProductId"`
+	ClientCert string `validate:"required_if=EnableTLS true" json:"clientCert" mapstructure:"clientCert"`
+	ClientKey string `validate:"required_if=EnableTLS true" json:"clientKey" mapstructure:"clientKey"`
+	CACert string `validate:"required_if=EnableTLS true" json:"CACert" mapstructure:"caCert"`
+	PubSubProjectID string `validate:"required_if=Type google_pubsub" json:"pubSubProjectID" mapstructure:"pubSubProductId"`
 }

 // DatabaseCfg path of the PostgreSQL DB config.
 type DatabaseCfg struct {
-	Host string `valid:"required" json:"host" mapstructure:"host"`
-	Port uint16 `valid:"required" json:"port" mapstructure:"port"`
-	Name string `valid:"required" json:"name" mapstructure:"name"`
-	User string `valid:"required" json:"user" mapstructure:"user"`
-	Password string `valid:"required" json:"password" mapstructure:"password"`
+	Host string `validate:"required" json:"host" mapstructure:"host"`
+	Port uint16 `validate:"required" json:"port" mapstructure:"port"`
+	Name string `validate:"required" json:"name" mapstructure:"name"`
+	User string `validate:"required" json:"user" mapstructure:"user"`
+	Password string `validate:"required" json:"password" mapstructure:"password"`
 	Debug bool `json:"debug" mapstructure:"debug"`
 }
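
The conditional rules introduced by the new `PublisherCfg` tags (`oneof` on `Type`, `required_if` on the credential and TLS fields) can be read as the hand-written checks below. This is a plain-Go paraphrase of the tag semantics for illustration, not the validator library's implementation; `publisherCfg` and `checkPublisher` are hypothetical names, and the always-required `Address` and `Topic` fields are omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// publisherCfg is a trimmed, hypothetical copy of PublisherCfg.
type publisherCfg struct {
	Type            string
	NatsCredPath    string
	EnableTLS       bool
	ClientCert      string
	ClientKey       string
	CACert          string
	PubSubProjectID string
}

// checkPublisher spells out the conditional rules expressed by the struct tags.
func checkPublisher(c publisherCfg) error {
	switch c.Type { // required,oneof=nats kafka rabbitmq google_pubsub
	case "nats", "kafka", "rabbitmq", "google_pubsub":
	default:
		return fmt.Errorf("type: %q is not a supported publisher", c.Type)
	}
	if c.Type == "nats" && c.NatsCredPath == "" { // required_if=Type nats
		return errors.New("natsCredPath is required when type is nats")
	}
	if c.Type == "google_pubsub" && c.PubSubProjectID == "" { // required_if=Type google_pubsub
		return errors.New("pubSubProjectID is required when type is google_pubsub")
	}
	if c.EnableTLS && (c.ClientCert == "" || c.ClientKey == "" || c.CACert == "") { // required_if=EnableTLS true
		return errors.New("clientCert, clientKey and CACert are required when TLS is enabled")
	}
	return nil
}

func main() {
	fmt.Println(checkPublisher(publisherCfg{Type: "kafka"})) // <nil>
	fmt.Println(checkPublisher(publisherCfg{Type: "nats"}))
}
```

The Kafka case passing without `NatsCredPath` corresponds to the fix described in the commit message ("not required for all publisher types").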

@@ -74,10 +74,11 @@ type FilterStruct struct {
 	Tables map[string][]string `json:"tables" yaml:"tables" mapstructure:"tables"`
 }

+var validate = validator.New()
+
 // Validate config data.
 func (c Config) Validate() error {
-	_, err := govalidator.ValidateStruct(c)
-	return err
+	return validate.Struct(c)
 }

 // InitConfig load config from file.
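
The copilot instructions in this commit give `WAL_DATABASE_HOST` and `WAL_LISTENER_SLOTNAME` as examples of environment overrides. The sketch below shows one mapping from a dotted config key to such a name. It assumes dots are replaced with underscores and the result is upper-cased; the body of `InitConfig` is not shown in this diff, so this is inferred from the two examples only, and `envName` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// envName maps a dotted config key to the environment variable assumed to
// override it: prefix WAL_, dots replaced by underscores, upper-cased.
func envName(key string) string {
	return "WAL_" + strings.ToUpper(strings.ReplaceAll(key, ".", "_"))
}

func main() {
	fmt.Println(envName("database.host"))     // WAL_DATABASE_HOST
	fmt.Println(envName("listener.slotName")) // WAL_LISTENER_SLOTNAME
}
```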
