Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

## Architecture & Data Flow

**pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS, Kafka, RabbitMQ, Google Pub/Sub).
**pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS JetStream, Kafka, RabbitMQ, Google Pub/Sub).

```
PostgreSQL WAL → pgx replication → Parser.ParseWalMessage() → WAL object → filter → Event → Publisher.Publish() → broker
```

### Key Components
- [internal/listener/listener.go](../internal/listener/listener.go) - Replication orchestration, slot/publication lifecycle
- [internal/listener/transaction/parser.go](../internal/listener/transaction/parser.go) - WAL binary protocol decoding
- [internal/listener/listener.go](../internal/listener/listener.go) - Replication orchestration, slot/publication lifecycle, health probes
- [internal/listener/transaction/parser.go](../internal/listener/transaction/parser.go) - WAL message parsing using `pglogrepl.Parse()` (`pgoutput` plugin)
- [internal/listener/transaction/wal.go](../internal/listener/transaction/wal.go) - Transaction state, event filtering via `CreateEventsWithFilter()`
- [internal/publisher/](../internal/publisher/) - Broker adapters implementing `eventPublisher` interface
- [apis/config.go](../apis/config.go) - Config types with Viper loading (env prefix: `WAL_`)
- [apis/event.go](../apis/event.go) - Event struct with `SubjectName()` for topic construction
- [cmd/pgoutbox/init.go](../cmd/pgoutbox/init.go) - Publisher factory, connection setup with retry

## Developer Workflows

Expand All @@ -25,48 +27,63 @@ make ci # verify + lint + build + unit-tests
make fmt # Format source code
```

All commands run in Docker (`ghcr.io/appscode/golang-dev:1.24`). Output: `bin/pgoutbox-{OS}-{ARCH}`.
All commands run in Docker (`ghcr.io/appscode/golang-dev:1.25`). Output: `bin/pgoutbox-{OS}-{ARCH}`.

**Local PostgreSQL**: `docker/docker-compose.yml` + `docker/scripts/` for test DB setup.

## Critical Patterns

### Interface-Based Mocking
Listener depends on interfaces (`eventPublisher`, `parser`, `replication`, `repository`). Mock implementations in `*_mock_test.go` files:
### Interface-Based Testing
Listener depends on four interfaces defined in [listener.go](../internal/listener/listener.go):
```go
// internal/listener/listener.go
type eventPublisher interface {
Publish(context.Context, string, *apis.Event) error
}
type eventPublisher interface { Publish(context.Context, string, *apis.Event) error }
type parser interface { ParseWalMessage([]byte, *tx.WAL) error }
type replication interface { CreateReplicationSlot(...); StartReplication(...); ReceiveMessage(...) }
type repository interface { CreatePublication(...); GetSlotLSN(...); IsReplicationActive(...) }
```
Mock implementations in `*_mock_test.go` files use `github.com/stretchr/testify/mock`.

### Adding a New Publisher
1. Create `internal/publisher/{broker}.go` implementing `Publish(ctx, subject, *Event) error` and `Close() error`
2. Add type constant in [apis/config.go](../apis/config.go): `PublisherType{Name} PublisherType = "{name}"`
3. Add case in `factoryPublisher()` in [cmd/pgoutbox/init.go](../cmd/pgoutbox/init.go)

### Configuration
- YAML config + environment overrides (prefix `WAL_`, e.g., `WAL_DATABASE_HOST`)
- Struct tags: both `mapstructure` and `json` required
- YAML config + environment overrides (prefix `WAL_`, e.g., `WAL_DATABASE_HOST`, `WAL_LISTENER_SLOTNAME`)
- Struct tags: both `mapstructure` and `json` required on all config fields
- Publisher types: `nats`, `kafka`, `rabbitmq`, `google_pubsub`

### Topic Naming
Format: `{publisher.topic}.{publisher.topicPrefix}schemas.{schema}.tables.{table}`
Override via `listener.topicsMap` in config.
Override specific topics via `listener.topicsMap` in config.

### WAL Message Parsing
Uses `github.com/jackc/pglogrepl` for WAL message parsing. The `Parser.ParseWalMessage()` method uses `pglogrepl.Parse()` to decode messages:
- Message types: `BeginMessage`, `CommitMessage`, `RelationMessage`, `InsertMessage`, `UpdateMessage`, `DeleteMessage`
- Types: `pglogrepl.LSN` for LSN values, `uint32` for relation IDs, `*pglogrepl.TupleData` for row data
- Column flags: `Flags == 1` indicates the column is part of the primary key

### Logging
Use `log/slog` with injected `*slog.Logger`:
```go
l.log.Debug("message", slog.String("key", value))
l.log.Info("event was sent", slog.String("subject", name), slog.String("table", table))
```

## Common Tasks

| Task | Files to Modify |
|------|-----------------|
| Add publisher | `internal/publisher/{broker}.go` (implement `eventPublisher`), `cmd/pgoutbox/init.go` |
| Add filter logic | `apis/config.go` (`FilterStruct`), `internal/listener/transaction/parser.go` |
| Extend WAL parsing | `internal/listener/transaction/parser.go`, `transaction/data.go` |
| Add metrics | `apis/metrics.go` (Prometheus counters) |
| Add publisher | `internal/publisher/{broker}.go`, `apis/config.go` (type const), `cmd/pgoutbox/init.go` (factory) |
| Add filter logic | `apis/config.go` (`FilterStruct`), `internal/listener/transaction/wal.go` (`CreateEventsWithFilter`) |
| Extend WAL parsing | `internal/listener/transaction/parser.go`, `transaction/data.go` (uses `pglogrepl.Parse()`) |
| Add metrics | `apis/metrics.go` (Prometheus counters), update `monitor` interface |
| Add config field | `apis/config.go` (add both `mapstructure` and `json` tags) |

## Standards

- **License**: Apache 2.0 headers required (see `hack/license/`)
- **Errors**: Wrap with `fmt.Errorf("context: %w", err)`
- **Publication name**: Hardcoded as `_pgoutbox_`
- **Publication name**: Hardcoded as `pgoutbox` (auto-created for all tables)
- **Replication slot**: Configured via `listener.slotName`, auto-created on startup
- **Tests**: Table-driven tests with `testify/assert` and `testify/mock`
35 changes: 21 additions & 14 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
name: ci
name: CI

on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]
branches:
- '*'
push:
branches:
- master
workflow_dispatch:

jobs:
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.ref }}
cancel-in-progress: true

jobs:
build:
name: Build
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- name: Set up Go 1.25
uses: actions/setup-go@v5
with:
go-version: '1.25'
id: go

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.25'
- uses: actions/checkout@v4

- name: Build
run: go build -v ./...
- name: Build
run: go build -v ./...

- name: Test
run: go test -v ./...
- name: Test
run: go test -v ./...
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ SRC_REG ?=
git_branch := $(shell git rev-parse --abbrev-ref HEAD)
git_tag := $(shell git describe --exact-match --abbrev=0 2>/dev/null || echo "")
commit_hash := $(shell git rev-parse --verify HEAD)
commit_timestamp := $(shell date --date="@$$(git show -s --format=%ct)" --utc +%FT%T)
commit_timestamp := $(shell date -u -r $$(git show -s --format=%ct) +%FT%T 2>/dev/null || date --date="@$$(git show -s --format=%ct)" --utc +%FT%T 2>/dev/null || echo "")

VERSION := $(shell git describe --tags --always --dirty)
version_strategy := commit_hash
Expand Down
47 changes: 24 additions & 23 deletions apis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"
"time"

"github.com/asaskevich/govalidator"
"github.com/go-playground/validator/v10"
"github.com/spf13/viper"
)

Expand All @@ -20,10 +20,10 @@ const (

// Config for pgoutbox.
type Config struct {
Listener *ListenerCfg `valid:"required" json:"listener" mapstructure:"listener"`
Database *DatabaseCfg `valid:"required" json:"database" mapstructure:"database"`
Publisher *PublisherCfg `valid:"required" json:"publisher" mapstructure:"publisher"`
Logger *Logger `valid:"required" json:"logger" mapstructure:"logger"`
Listener *ListenerCfg `validate:"required" json:"listener" mapstructure:"listener"`
Database *DatabaseCfg `validate:"required" json:"database" mapstructure:"database"`
Publisher *PublisherCfg `validate:"required" json:"publisher" mapstructure:"publisher"`
Logger *Logger `validate:"required" json:"logger" mapstructure:"logger"`
Telemetry *TelemetryCfg `json:"telemetry" mapstructure:"telemetry"`
}

Expand All @@ -36,36 +36,36 @@ type TelemetryCfg struct {

// ListenerCfg path of the listener config.
type ListenerCfg struct {
SlotName string `valid:"required" json:"slotName" mapstructure:"slotName"`
SlotName string `validate:"required" json:"slotName" mapstructure:"slotName"`
ServerPort int `json:"serverPort" mapstructure:"serverPort"`
AckTimeout time.Duration `json:"ackTimeout" mapstructure:"ackTimeout"`
RefreshConnection time.Duration `json:"refreshConnection" valid:"required" mapstructure:"refreshConnection"`
HeartbeatInterval time.Duration `json:"heartbeatInterval" valid:"required" mapstructure:"heartbeatInterval"`
RefreshConnection time.Duration `validate:"required" json:"refreshConnection" mapstructure:"refreshConnection"`
HeartbeatInterval time.Duration `validate:"required" json:"heartbeatInterval" mapstructure:"heartbeatInterval"`
Filter FilterStruct `json:"filter" mapstructure:"filter"`
TopicsMap map[string]string `json:"topicsMap" mapstructure:"topicsMap"`
}

// PublisherCfg represent configuration for any publisher types.
type PublisherCfg struct {
Type PublisherType `valid:"required" json:"type" mapstructure:"type"`
Address string `valid:"required" json:"address" mapstructure:"address"`
NatsCredPath string `valid:"required" json:"natsCredPath" mapstructure:"natsCredPath"`
Topic string `valid:"required" json:"topic" mapstructure:"topic"`
Type PublisherType `validate:"required,oneof=nats kafka rabbitmq google_pubsub" json:"type" mapstructure:"type"`
Address string `validate:"required" json:"address" mapstructure:"address"`
NatsCredPath string `validate:"required_if=Type nats" json:"natsCredPath" mapstructure:"natsCredPath"`
Topic string `validate:"required" json:"topic" mapstructure:"topic"`
TopicPrefix string `json:"topicPrefix" mapstructure:"topicPrefix"`
EnableTLS bool `json:"enableTLS" mapstructure:"enableTlS"`
ClientCert string `json:"clientCert" mapstructure:"clientCert"`
ClientKey string `json:"clientKey" mapstructure:"clientKey"`
CACert string `json:"CACert" mapstructure:"caCert"`
PubSubProjectID string `json:"pubSubProjectID" mapstructure:"pubSubProductId"`
ClientCert string `validate:"required_if=EnableTLS true" json:"clientCert" mapstructure:"clientCert"`
ClientKey string `validate:"required_if=EnableTLS true" json:"clientKey" mapstructure:"clientKey"`
CACert string `validate:"required_if=EnableTLS true" json:"CACert" mapstructure:"caCert"`
PubSubProjectID string `validate:"required_if=Type google_pubsub" json:"pubSubProjectID" mapstructure:"pubSubProductId"`
}

// DatabaseCfg path of the PostgreSQL DB config.
type DatabaseCfg struct {
Host string `valid:"required" json:"host" mapstructure:"host"`
Port uint16 `valid:"required" json:"port" mapstructure:"port"`
Name string `valid:"required" json:"name" mapstructure:"name"`
User string `valid:"required" json:"user" mapstructure:"user"`
Password string `valid:"required" json:"password" mapstructure:"password"`
Host string `validate:"required" json:"host" mapstructure:"host"`
Port uint16 `validate:"required" json:"port" mapstructure:"port"`
Name string `validate:"required" json:"name" mapstructure:"name"`
User string `validate:"required" json:"user" mapstructure:"user"`
Password string `validate:"required" json:"password" mapstructure:"password"`
Debug bool `json:"debug" mapstructure:"debug"`
}

Expand All @@ -74,10 +74,11 @@ type FilterStruct struct {
Tables map[string][]string `json:"tables" yaml:"tables" mapstructure:"tables"`
}

var validate = validator.New()

// Validate config data.
func (c Config) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
return validate.Struct(c)
}

// InitConfig load config from file.
Expand Down
Loading
Loading