22
33## Architecture & Data Flow
44
5- ** pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS, Kafka, RabbitMQ, Google Pub/Sub).
5+ ** pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS JetStream , Kafka, RabbitMQ, Google Pub/Sub).
66
77```
88PostgreSQL WAL → pgx replication → Parser.ParseWalMessage() → WAL object → filter → Event → Publisher.Publish() → broker
99```
1010
1111### Key Components
12- - [ internal/listener/listener.go] ( ../internal/listener/listener.go ) - Replication orchestration, slot/publication lifecycle
13- - [ internal/listener/transaction/parser.go] ( ../internal/listener/transaction/parser.go ) - WAL binary protocol decoding
12+ - [ internal/listener/listener.go] ( ../internal/listener/listener.go ) - Replication orchestration, slot/publication lifecycle, health probes
13+ - [ internal/listener/transaction/parser.go] ( ../internal/listener/transaction/parser.go ) - WAL message parsing using ` pglogrepl.Parse() ` (` pgoutput ` plugin)
14+ - [ internal/listener/transaction/wal.go] ( ../internal/listener/transaction/wal.go ) - Transaction state, event filtering via ` CreateEventsWithFilter() `
1415- [ internal/publisher/] ( ../internal/publisher/ ) - Broker adapters implementing ` eventPublisher ` interface
1516- [ apis/config.go] ( ../apis/config.go ) - Config types with Viper loading (env prefix: ` WAL_ ` )
1617- [ apis/event.go] ( ../apis/event.go ) - Event struct with ` SubjectName() ` for topic construction
18+ - [ cmd/pgoutbox/init.go] ( ../cmd/pgoutbox/init.go ) - Publisher factory, connection setup with retry
1719
1820## Developer Workflows
1921
@@ -25,48 +27,63 @@ make ci # verify + lint + build + unit-tests
2527make fmt # Format source code
2628```
2729
28- All commands run in Docker (` ghcr.io/appscode/golang-dev:1.24 ` ). Output: ` bin/pgoutbox-{OS}-{ARCH} ` .
30+ All commands run in Docker (` ghcr.io/appscode/golang-dev:1.25 ` ). Output: ` bin/pgoutbox-{OS}-{ARCH} ` .
2931
3032** Local PostgreSQL** : ` docker/docker-compose.yml ` + ` docker/scripts/ ` for test DB setup.
3133
3234## Critical Patterns
3335
34- ### Interface-Based Mocking
35- Listener depends on interfaces ( ` eventPublisher ` , ` parser ` , ` replication ` , ` repository ` ). Mock implementations in ` *_mock_test .go` files :
36+ ### Interface-Based Testing
37+ Listener depends on four interfaces defined in [ listener .go] ( ../internal/listener/listener.go ) :
3638``` go
37- // internal/listener/listener.go
38- type eventPublisher interface {
39- Publish (context. Context , string , *apis. Event ) error
40- }
39+ type eventPublisher interface { Publish (context. Context , string , *apis. Event ) error }
40+ type parser interface { ParseWalMessage ([] byte , *tx. WAL ) error }
41+ type replication interface { CreateReplicationSlot (...); StartReplication (...); ReceiveMessage (...) }
42+ type repository interface { CreatePublication (...); GetSlotLSN (...); IsReplicationActive (...) }
4143```
44+ Mock implementations in ` *_mock_test.go ` files use ` github.com/stretchr/testify/mock ` .
45+
46+ ### Adding a New Publisher
47+ 1 . Create ` internal/publisher/{broker}.go ` implementing ` Publish(ctx, subject, *Event) error ` and ` Close() error `
48+ 2 . Add type constant in [ apis/config.go] ( ../apis/config.go ) : ` PublisherType{Name} PublisherType = "{name}" `
49+ 3 . Add case in ` factoryPublisher() ` in [ cmd/pgoutbox/init.go] ( ../cmd/pgoutbox/init.go )
4250
4351### Configuration
44- - YAML config + environment overrides (prefix ` WAL_ ` , e.g., ` WAL_DATABASE_HOST ` )
45- - Struct tags: both ` mapstructure ` and ` json ` required
52+ - YAML config + environment overrides (prefix ` WAL_ ` , e.g., ` WAL_DATABASE_HOST ` , ` WAL_LISTENER_SLOTNAME ` )
53+ - Struct tags: both ` mapstructure ` and ` json ` required on all config fields
4654- Publisher types: ` nats ` , ` kafka ` , ` rabbitmq ` , ` google_pubsub `
4755
4856### Topic Naming
4957Format: ` {publisher.topic}.{publisher.topicPrefix}schemas.{schema}.tables.{table} `
50- Override via ` listener.topicsMap ` in config.
58+ Override specific topics via ` listener.topicsMap ` in config.
59+
60+ ### WAL Message Parsing
61+ Uses ` github.com/jackc/pglogrepl ` for WAL message parsing. The ` Parser.ParseWalMessage() ` method uses ` pglogrepl.Parse() ` to decode messages:
62+ - Message types: ` BeginMessage ` , ` CommitMessage ` , ` RelationMessage ` , ` InsertMessage ` , ` UpdateMessage ` , ` DeleteMessage `
63+ - Types: ` pglogrepl.LSN ` for LSN values, ` uint32 ` for relation IDs, ` *pglogrepl.TupleData ` for row data
64+ - Column flags: ` Flags == 1 ` indicates the column is part of the primary key
5165
5266### Logging
5367Use ` log/slog ` with injected ` *slog.Logger ` :
5468``` go
5569l.log .Debug (" message" , slog.String (" key" , value))
70+ l.log .Info (" event was sent" , slog.String (" subject" , name), slog.String (" table" , table))
5671```
5772
5873## Common Tasks
5974
6075| Task | Files to Modify |
6176| ------| -----------------|
62- | Add publisher | ` internal/publisher/{broker}.go ` (implement ` eventPublisher ` ), ` cmd/pgoutbox/init.go ` |
63- | Add filter logic | ` apis/config.go ` (` FilterStruct ` ), ` internal/listener/transaction/parser.go ` |
64- | Extend WAL parsing | ` internal/listener/transaction/parser.go ` , ` transaction/data.go ` |
65- | Add metrics | ` apis/metrics.go ` (Prometheus counters) |
77+ | Add publisher | ` internal/publisher/{broker}.go ` , ` apis/config.go ` (type const), ` cmd/pgoutbox/init.go ` (factory) |
78+ | Add filter logic | ` apis/config.go ` (` FilterStruct ` ), ` internal/listener/transaction/wal.go ` (` CreateEventsWithFilter ` ) |
79+ | Extend WAL parsing | ` internal/listener/transaction/parser.go ` , ` transaction/data.go ` (uses ` pglogrepl.Parse() ` ) |
80+ | Add metrics | ` apis/metrics.go ` (Prometheus counters), update ` monitor ` interface |
81+ | Add config field | ` apis/config.go ` (add both ` mapstructure ` and ` json ` tags) |
6682
6783## Standards
6884
6985- ** License** : Apache 2.0 headers required (see ` hack/license/ ` )
7086- ** Errors** : Wrap with ` fmt.Errorf("context: %w", err) `
71- - ** Publication name** : Hardcoded as ` _pgoutbox_ `
87+ - ** Publication name** : Hardcoded as ` pgoutbox ` (auto-created for all tables)
7288- ** Replication slot** : Configured via ` listener.slotName ` , auto-created on startup
89+ - ** Tests** : Table-driven tests with ` testify/assert ` and ` testify/mock `
0 commit comments