72 changes: 72 additions & 0 deletions .github/copilot-instructions.md
@@ -0,0 +1,72 @@
# pgoutbox - AI Coding Instructions

## Architecture & Data Flow

**pgoutbox** captures PostgreSQL changes via logical decoding and publishes to message brokers (NATS, Kafka, RabbitMQ, Google Pub/Sub).

```
PostgreSQL WAL → pgx replication → Parser.ParseWalMessage() → WAL object → filter → Event → Publisher.Publish() → broker
```
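The stages of this pipeline can be sketched as plain Go functions. All names below are illustrative stand-ins, not the project's actual internal API; the real types live in the pgoutbox packages:

```go
package main

import "fmt"

// WAL and Event stand in for the project's decoded-WAL and outgoing-event types.
type WAL struct{ Schema, Table, Action string }
type Event struct{ Subject string }

// parse stands in for Parser.ParseWalMessage(): it decodes a raw replication
// message into a WAL object (stubbed with a fixed value here).
func parse(raw []byte) WAL {
	return WAL{Schema: "public", Table: "users", Action: "INSERT"}
}

// filter drops changes for tables the listener is not configured to watch.
func filter(w WAL, allowed map[string]bool) bool {
	return allowed[w.Table]
}

// toEvent converts a WAL change into a broker event.
func toEvent(w WAL) Event {
	return Event{Subject: fmt.Sprintf("schemas.%s.tables.%s", w.Schema, w.Table)}
}

func main() {
	allowed := map[string]bool{"users": true}
	w := parse([]byte("raw wal bytes"))
	if filter(w, allowed) {
		e := toEvent(w)
		fmt.Println("publish:", e.Subject) // stand-in for Publisher.Publish()
	}
}
```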

### Key Components
- [internal/listener/listener.go](../internal/listener/listener.go) - Replication orchestration, slot/publication lifecycle
- [internal/listener/transaction/parser.go](../internal/listener/transaction/parser.go) - WAL binary protocol decoding
- [internal/publisher/](../internal/publisher/) - Broker adapters implementing `eventPublisher` interface
- [apis/config.go](../apis/config.go) - Config types with Viper loading (env prefix: `WAL_`)
- [apis/event.go](../apis/event.go) - Event struct with `SubjectName()` for topic construction

## Developer Workflows

```bash
make build # Build for current OS/ARCH (Docker containerized)
make test # Unit tests with race detector
make lint # golangci-lint with gofmt, goimports, unparam
make ci # verify + lint + build + unit-tests
make fmt # Format source code
```

All commands run in Docker (`ghcr.io/appscode/golang-dev:1.24`). Output: `bin/pgoutbox-{OS}-{ARCH}`.

**Local PostgreSQL**: `docker/docker-compose.yml` + `docker/scripts/` for test DB setup.

## Critical Patterns

### Interface-Based Mocking
The listener depends on interfaces (`eventPublisher`, `parser`, `replication`, `repository`). Mock implementations live in `*_mock_test.go` files:
```go
// internal/listener/listener.go
type eventPublisher interface {
	Publish(context.Context, string, *apis.Event) error
}
```

### Configuration
- YAML config + environment overrides (prefix `WAL_`, e.g., `WAL_DATABASE_HOST`)
- Struct tags: both `mapstructure` and `json` required
- Publisher types: `nats`, `kafka`, `rabbitmq`, `google_pubsub`

### Topic Naming
Format: `{publisher.topic}.{publisher.topicPrefix}schemas.{schema}.tables.{table}`
Override via `listener.topicsMap` in config.
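A literal reading of that format string can be sketched as below. `subjectName` is a hypothetical helper for illustration; the real construction lives in `apis/event.go`'s `SubjectName()`:

```go
package main

import "fmt"

// subjectName assembles a topic per the documented format. Note the
// prefix is concatenated directly onto "schemas", with no separating dot.
func subjectName(topic, prefix, schema, table string) string {
	return fmt.Sprintf("%s.%sschemas.%s.tables.%s", topic, prefix, schema, table)
}

func main() {
	fmt.Println(subjectName("stream", "prefix_", "public", "users"))
	// stream.prefix_schemas.public.tables.users
}
```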

### Logging
Use `log/slog` with injected `*slog.Logger`:
```go
l.log.Debug("message", slog.String("key", value))
```

## Common Tasks

| Task | Files to Modify |
|------|-----------------|
| Add publisher | `internal/publisher/{broker}.go` (implement `eventPublisher`), `cmd/pgoutbox/init.go` |
| Add filter logic | `apis/config.go` (`FilterStruct`), `internal/listener/transaction/parser.go` |
| Extend WAL parsing | `internal/listener/transaction/parser.go`, `transaction/data.go` |
| Add metrics | `apis/metrics.go` (Prometheus counters) |

## Standards

- **License**: Apache 2.0 headers required (see `hack/license/`)
- **Errors**: Wrap with `fmt.Errorf("context: %w", err)`
- **Publication name**: Hardcoded as `_pgoutbox_`
- **Replication slot**: Configured via `listener.slotName`, auto-created on startup
49 changes: 14 additions & 35 deletions .github/workflows/ci.yml
@@ -1,46 +1,25 @@
name: CI
name: ci

on:
pull_request:
branches:
- "*"
push:
branches:
- master
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.ref }}
cancel-in-progress: true
branches: [ "master" ]
pull_request:
branches: [ "master" ]

jobs:

build:
name: Build
runs-on: ubuntu-24.04
steps:
- name: Set up Go 1.24
uses: actions/setup-go@v5
with:
go-version: '1.24'
id: go
- uses: actions/checkout@v4

- name: Use Node.js 20.x
uses: actions/setup-node@v4
with:
node-version: '20'
check-latest: true
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.25'

- name: Check out code into the Go module directory
uses: actions/checkout@v4
- name: Build
run: go build -v ./...

- name: Prepare git
env:
GITHUB_USER: 1gtm
GITHUB_TOKEN: ${{ secrets.LGTM_GITHUB_TOKEN }}
run: |
set -x
git config --global user.name "${GITHUB_USER}"
git config --global user.email "${GITHUB_USER}@appscode.com"
git config --global \
url."https://${GITHUB_USER}:${GITHUB_TOKEN}@github.com".insteadOf \
"https://github.com"
- name: Test
run: go test -v ./...
38 changes: 21 additions & 17 deletions apis/config_test.go
@@ -39,10 +39,11 @@ func TestConfig_Validate(t *testing.T) {
Password: "pass",
},
Publisher: &PublisherCfg{
Type: "kafka",
Address: "addr",
Topic: "stream",
TopicPrefix: "prefix",
Type: "kafka",
Address: "addr",
Topic: "stream",
TopicPrefix: "prefix",
NatsCredPath: "/etc/nats/creds/admin.creds",
},
},
wantErr: nil,
@@ -65,13 +66,14 @@ func TestConfig_Validate(t *testing.T) {
Password: "pass",
},
Publisher: &PublisherCfg{
Type: "kafka",
Address: "addr",
Topic: "stream",
TopicPrefix: "prefix",
Type: "kafka",
Address: "addr",
Topic: "stream",
TopicPrefix: "prefix",
NatsCredPath: "/etc/nats/creds/admin.creds",
},
},
wantErr: errors.New("Listener.RefreshConnection: non zero value required;Listener.SlotName: non zero value required"),
wantErr: errors.New("Listener.refreshConnection: non zero value required;Listener.slotName: non zero value required"),
},
{
name: "bad db cfg",
@@ -91,13 +93,14 @@ func TestConfig_Validate(t *testing.T) {
Password: "pass",
},
Publisher: &PublisherCfg{
Type: "kafka",
Address: "addr",
Topic: "stream",
TopicPrefix: "prefix",
Type: "kafka",
Address: "addr",
Topic: "stream",
TopicPrefix: "prefix",
NatsCredPath: "/etc/nats/creds/admin.creds",
},
},
wantErr: errors.New("Database.Host: non zero value required;Database.Port: non zero value required"),
wantErr: errors.New("Database.host: non zero value required;Database.port: non zero value required"),
},
{
name: "empty publisher kind",
@@ -119,11 +122,12 @@ func TestConfig_Validate(t *testing.T) {
Password: "pass",
},
Publisher: &PublisherCfg{
Topic: "stream",
TopicPrefix: "prefix",
Topic: "stream",
TopicPrefix: "prefix",
NatsCredPath: "/etc/nats/creds/admin.creds",
},
},
wantErr: errors.New("Publisher.Type: non zero value required"),
wantErr: errors.New("Publisher.address: non zero value required;Publisher.type: non zero value required"),
},
}

93 changes: 66 additions & 27 deletions cmd/pgoutbox/init.go
@@ -25,35 +25,83 @@ import (
"kubeops.dev/pgoutbox/apis"
"kubeops.dev/pgoutbox/internal/publisher"

"github.com/jackc/pgx"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/nats-io/nats.go"
"k8s.io/apimachinery/pkg/util/wait"
)

// replicationConn wraps pgconn.PgConn to implement the replication interface expected by the listener.
type replicationConn struct {
	conn *pgconn.PgConn
}

func newReplicationConn(conn *pgconn.PgConn) *replicationConn {
	return &replicationConn{conn: conn}
}

func (r *replicationConn) CreateReplicationSlot(ctx context.Context, slotName, outputPlugin string) (pglogrepl.CreateReplicationSlotResult, error) {
	return pglogrepl.CreateReplicationSlot(ctx, r.conn, slotName, outputPlugin, pglogrepl.CreateReplicationSlotOptions{})
}

func (r *replicationConn) DropReplicationSlot(ctx context.Context, slotName string) error {
	return pglogrepl.DropReplicationSlot(ctx, r.conn, slotName, pglogrepl.DropReplicationSlotOptions{})
}

func (r *replicationConn) StartReplication(ctx context.Context, slotName string, startLsn pglogrepl.LSN, options pglogrepl.StartReplicationOptions) error {
	return pglogrepl.StartReplication(ctx, r.conn, slotName, startLsn, options)
}

func (r *replicationConn) ReceiveMessage(ctx context.Context) ([]byte, error) {
	rawMsg, err := r.conn.ReceiveMessage(ctx)
	if err != nil {
		return nil, err
	}

	if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
		return nil, fmt.Errorf("received error from postgres: %s", errMsg.Message)
	}

	msg, ok := rawMsg.(*pgproto3.CopyData)
	if !ok {
		return nil, nil
	}

	return msg.Data, nil
}

func (r *replicationConn) SendStandbyStatusUpdate(ctx context.Context, status pglogrepl.StandbyStatusUpdate) error {
	return pglogrepl.SendStandbyStatusUpdate(ctx, r.conn, status)
}

func (r *replicationConn) IsAlive() bool {
	return !r.conn.IsClosed()
}

func (r *replicationConn) Close() error {
	return r.conn.Close(context.Background())
}

// initPgxConnections initialise db and replication connections.
func initPgxConnections(cfg *apis.DatabaseCfg, logger *slog.Logger, timeout time.Duration) (*pgx.Conn, *pgx.ReplicationConn, error) {
func initPgxConnections(cfg *apis.DatabaseCfg, logger *slog.Logger, timeout time.Duration) (*pgx.Conn, *pgconn.PgConn, error) {
var pgConn *pgx.Conn
var pgReplicationConn *pgx.ReplicationConn

pgxConf := pgx.ConnConfig{
LogLevel: pgx.LogLevelInfo,
Logger: pgxLogger{logger},
Host: cfg.Host,
Port: cfg.Port,
Database: cfg.Name,
User: cfg.User,
Password: cfg.Password,
}
var pgReplConn *pgconn.PgConn

connString := fmt.Sprintf("host=%s port=%d dbname=%s user=%s password=%s",
cfg.Host, cfg.Port, cfg.Name, cfg.User, cfg.Password)
replConnString := connString + " replication=database"

err := wait.PollUntilContextTimeout(context.TODO(), 5*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
var err error
pgConn, err = pgx.Connect(pgxConf)
pgConn, err = pgx.Connect(ctx, connString)
if err != nil {
logger.Error("db connection:", slog.String("error", err.Error()))
return false, nil
}

pgReplicationConn, err = pgx.ReplicationConnect(pgxConf)
pgReplConn, err = pgconn.Connect(ctx, replConnString)
if err != nil {
logger.Error("db replication connection:", slog.String("error", err.Error()))
return false, nil
@@ -65,12 +113,12 @@ func initPgxConnections(cfg *apis.DatabaseCfg, logger *slog.Logger, timeout time
return nil, nil, fmt.Errorf("wait for db connection: %w", err)
}

return pgConn, pgReplicationConn, nil
return pgConn, pgReplConn, nil
}

func configureReplicaIdentityToFull(pgConn *pgx.Conn, filterTables apis.FilterStruct) error {
func configureReplicaIdentityToFull(ctx context.Context, pgConn *pgx.Conn, filterTables apis.FilterStruct) error {
for table := range filterTables.Tables {
_, err := pgConn.Exec(fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY FULL;", table))
_, err := pgConn.Exec(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY FULL;", table))
if err != nil {
return fmt.Errorf("change replica identity to FULL for table %s: %w", table, err)
}
@@ -79,15 +127,6 @@ func configureReplicaIdentityToFull(pgConn *pgx.Conn, filterTables apis.FilterSt
return nil
}

type pgxLogger struct {
logger *slog.Logger
}

// Log DB message.
func (l pgxLogger) Log(_ pgx.LogLevel, msg string, _ map[string]any) {
l.logger.Debug(msg)
}

type eventPublisher interface {
Publish(context.Context, string, *apis.Event) error
Close() error
10 changes: 5 additions & 5 deletions cmd/pgoutbox/main.go
@@ -35,7 +35,7 @@ import (

// GetVersion returns latest git hash of commit.
func GetVersion() string {
var version = "unknown"
version := "unknown"

info, ok := debug.ReadBuildInfo()
if ok {
@@ -85,12 +85,12 @@ func main() {

logger := apis.InitSlog(cfg.Logger, version, false)

conn, rConn, err := initPgxConnections(cfg.Database, logger, time.Minute*10)
pgxConn, pgConn, err := initPgxConnections(cfg.Database, logger, time.Minute*10)
if err != nil {
return fmt.Errorf("pgx connection: %w", err)
}

if err = configureReplicaIdentityToFull(conn, cfg.Listener.Filter); err != nil {
if err = configureReplicaIdentityToFull(ctx, pgxConn, cfg.Listener.Filter); err != nil {
return fmt.Errorf("configure replica identity: %w", err)
}
pub, err := factoryPublisher(ctx, cfg.Publisher, logger)
@@ -107,8 +107,8 @@ func main() {
svc := listener.NewWalListener(
cfg,
logger,
listener.NewRepository(conn),
rConn,
listener.NewRepository(pgxConn),
newReplicationConn(pgConn),
pub,
transaction.NewBinaryParser(logger, binary.BigEndian),
apis.NewMetrics(),