diff --git a/config/config.go b/config/config.go index d9c1023..0dbb3e3 100644 --- a/config/config.go +++ b/config/config.go @@ -89,6 +89,10 @@ func (c *Config) SetDefault() { c.Slot.SlotActivityCheckerInterval = 1000 } + if c.Slot.ProtoVersion == 0 { + c.Slot.ProtoVersion = 2 + } + if c.Logger.Logger == nil { c.Logger.Logger = logger.NewSlog(c.Logger.LogLevel) } diff --git a/example/streaming-transactions/main.go b/example/streaming-transactions/main.go index 7339e2a..498ae2b 100644 --- a/example/streaming-transactions/main.go +++ b/example/streaming-transactions/main.go @@ -50,6 +50,7 @@ func main() { CreateIfNotExists: true, Name: "cdc_slot_streaming", SlotActivityCheckerInterval: 3000, + ProtoVersion: 2, }, Metric: config.MetricConfig{ Port: 8081, @@ -131,11 +132,6 @@ func main() { } } - if err = tx.Commit(ctx); err != nil { - slog.Error("commit hatası", "error", err) - os.Exit(1) - } - slog.Info("Transaction COMMIT edildi, mesajlar bekleniyor...") // --- Wait for the Results -------------------------------------------------- diff --git a/integration_test/basic_functionality_test.go b/integration_test/basic_functionality_test.go index 9d3148c..a2776fc 100644 --- a/integration_test/basic_functionality_test.go +++ b/integration_test/basic_functionality_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" cdc "github.com/Trendyol/go-pq-cdc" + "github.com/Trendyol/go-pq-cdc/config" "github.com/Trendyol/go-pq-cdc/pq/message/format" "github.com/Trendyol/go-pq-cdc/pq/replication" "github.com/stretchr/testify/assert" @@ -22,89 +23,91 @@ func TestBasicFunctionality(t *testing.T) { cdcCfg := Config cdcCfg.Slot.Name = "slot_test_basic_functionality" - postgresConn, err := newPostgresConn() - if !assert.NoError(t, err) { - t.FailNow() - } - - if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { - t.FailNow() - } - - messageCh := make(chan any, 500) - handlerFunc := func(ctx *replication.ListenerContext) { - switch msg := ctx.Message.(type) { - case *format.Insert, *format.Delete, *format.Update: - messageCh <- msg - } - _ = ctx.Ack() - } - - connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) - if !assert.NoError(t, err) { - t.FailNow() - } - - t.Cleanup(func() { - connector.Close() - assert.NoError(t, RestoreDB(ctx)) - assert.NoError(t, postgresConn.Close(ctx)) - }) - - go connector.Start(ctx) - - waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { - t.FailNow() - } - cancel() - - t.Run("Insert 10 book to table. Then check messages and metric", func(t *testing.T) { - books := CreateBooks(10) - for _, b := range books { - err = pgExec(ctx, postgresConn, fmt.Sprintf("INSERT INTO books(id, name) VALUES(%d, '%s')", b.ID, b.Name)) - assert.NoError(t, err) + forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) { + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() } - for i := range 10 { - m := <-messageCh - assert.Equal(t, books[i].Map(), m.(*format.Insert).Decoded) + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() } - metric, _ := fetchInsertOpMetric() - assert.True(t, metric == 10) - }) - - t.Run("Update 5 book on table. Then check messages and metric", func(t *testing.T) { - books := CreateBooks(5) - for i, b := range books { - b.ID = i + 1 - books[i] = b - err = pgExec(ctx, postgresConn, fmt.Sprintf("UPDATE books SET name = '%s' WHERE id = %d", b.Name, b.ID)) - assert.NoError(t, err) + messageCh := make(chan any, 500) + handlerFunc := func(ctx *replication.ListenerContext) { + switch msg := ctx.Message.(type) { + case *format.Insert, *format.Delete, *format.Update: + messageCh <- msg + } + _ = ctx.Ack() } - for i := range 5 { - m := <-messageCh - assert.Equal(t, books[i].Map(), m.(*format.Update).NewDecoded) + connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) + if !assert.NoError(t, err) { + t.FailNow() } - metric, _ := fetchUpdateOpMetric() - assert.True(t, metric == 5) - }) + t.Cleanup(func() { + connector.Close() + assert.NoError(t, RestoreDB(ctx)) + assert.NoError(t, postgresConn.Close(ctx)) + }) - t.Run("Delete 5 book from table. Then check messages and metric", func(t *testing.T) { - for i := range 5 { - err = pgExec(ctx, postgresConn, fmt.Sprintf("DELETE FROM books WHERE id = %d", i+1)) - assert.NoError(t, err) - } + go connector.Start(ctx) - for i := range 5 { - m := <-messageCh - assert.Equal(t, int32(i+1), m.(*format.Delete).OldDecoded["id"]) + waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { + t.FailNow() } - - metric, _ := fetchDeleteOpMetric() - assert.True(t, metric == 5) + cancel() + + t.Run("Insert 10 book to table. Then check messages and metric", func(t *testing.T) { + books := CreateBooks(10) + for _, b := range books { + err = pgExec(ctx, postgresConn, fmt.Sprintf("INSERT INTO books(id, name) VALUES(%d, '%s')", b.ID, b.Name)) + assert.NoError(t, err) + } + + for i := range 10 { + m := <-messageCh + assert.Equal(t, books[i].Map(), m.(*format.Insert).Decoded) + } + + metric, _ := fetchInsertOpMetric() + assert.True(t, metric == 10) + }) + + t.Run("Update 5 book on table. Then check messages and metric", func(t *testing.T) { + books := CreateBooks(5) + for i, b := range books { + b.ID = i + 1 + books[i] = b + err = pgExec(ctx, postgresConn, fmt.Sprintf("UPDATE books SET name = '%s' WHERE id = %d", b.Name, b.ID)) + assert.NoError(t, err) + } + + for i := range 5 { + m := <-messageCh + assert.Equal(t, books[i].Map(), m.(*format.Update).NewDecoded) + } + + metric, _ := fetchUpdateOpMetric() + assert.True(t, metric == 5) + }) + + t.Run("Delete 5 book from table. Then check messages and metric", func(t *testing.T) { + for i := range 5 { + err = pgExec(ctx, postgresConn, fmt.Sprintf("DELETE FROM books WHERE id = %d", i+1)) + assert.NoError(t, err) + } + + for i := range 5 { + m := <-messageCh + assert.Equal(t, int32(i+1), m.(*format.Delete).OldDecoded["id"]) + } + + metric, _ := fetchDeleteOpMetric() + assert.True(t, metric == 5) + }) }) } diff --git a/integration_test/concurrent_tx_ordering_test.go b/integration_test/concurrent_tx_ordering_test.go index eeca8b5..0899f9b 100644 --- a/integration_test/concurrent_tx_ordering_test.go +++ b/integration_test/concurrent_tx_ordering_test.go @@ -19,82 +19,75 @@ import ( // B's changes first, followed by A's. func TestConcurrentTxOrdering(t *testing.T) { ctx := context.Background() - slot := "slot_test_concurrent_order" - // ---------- connector prep ------------------------------------------- cdcCfg := Config - cdcCfg.Slot.Name = slot + cdcCfg.Slot.Name = "slot_test_concurrent_order" - pgConn, err := newPostgresConn() - if !assert.NoError(t, err) { - t.FailNow() - } - if !assert.NoError(t, SetupTestDB(ctx, pgConn, cdcCfg)) { - t.FailNow() - } - - msgCh := make(chan *format.Insert, 5) - handler := func(lCtx *replication.ListenerContext) { - if ins, ok := lCtx.Message.(*format.Insert); ok { - msgCh <- ins + forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) { + pgConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() + } + if !assert.NoError(t, SetupTestDB(ctx, pgConn, cdcCfg)) { + t.FailNow() } - _ = lCtx.Ack() - } - - connector, _ := cdc.NewConnector(ctx, cdcCfg, handler) - go connector.Start(ctx) - waitCtx, cancel := context.WithTimeout(ctx, 4*time.Second) - assert.NoError(t, connector.WaitUntilReady(waitCtx)) - cancel() - cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} - pool, _ := pgxpool.New(ctx, cfg.DSNWithoutSSL()) + msgCh := make(chan *format.Insert, 5) + handler := func(lCtx *replication.ListenerContext) { + if ins, ok := lCtx.Message.(*format.Insert); ok { + msgCh <- ins + } + _ = lCtx.Ack() + } - //---------------- start TxA (long) ------------------------------------- - txA, _ := pool.Begin(ctx) - _, _ = txA.Exec(ctx, "INSERT INTO books(id,name) VALUES(600,'A')") + connector, _ := cdc.NewConnector(ctx, cdcCfg, handler) + go connector.Start(ctx) + waitCtx, cancel := context.WithTimeout(ctx, 4*time.Second) + assert.NoError(t, connector.WaitUntilReady(waitCtx)) + cancel() - //---------------- TxB (short) commit first ----------------------------- - txB, _ := pool.Begin(ctx) - _, _ = txB.Exec(ctx, "INSERT INTO books(id,name) VALUES(601,'B')") - _ = txB.Commit(ctx) + cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} + pool, _ := pgxpool.New(ctx, cfg.DSNWithoutSSL()) - // simulate some gap - time.Sleep(200 * time.Millisecond) + t.Cleanup(func() { + connector.Close() + _ = RestoreDB(ctx) + pool.Close() + }) - //---------------- commit TxA ------------------------------------------ - _ = txA.Commit(ctx) + txA, _ := pool.Begin(ctx) + _, _ = txA.Exec(ctx, "INSERT INTO books(id,name) VALUES(600,'A')") - //----------- collect two messages ------------------------------------ - var first, second *format.Insert - select { - case first = <-msgCh: - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting first message") - } - select { - case second = <-msgCh: - case <-time.After(2 * time.Second): - t.Fatal("timeout waiting second message") - } + txB, _ := pool.Begin(ctx) + _, _ = txB.Exec(ctx, "INSERT INTO books(id,name) VALUES(601,'B')") + _ = txB.Commit(ctx) - // Expect B(id 601) comes before A(id 600) - assert.Equal(t, int32(601), first.Decoded["id"]) - assert.Equal(t, int32(600), second.Decoded["id"]) + time.Sleep(200 * time.Millisecond) + _ = txA.Commit(ctx) - // -------- validate confirmed_flush_lsn & restart_lsn ----------------- - // Give connector a moment to send ACK and Postgres to advance slot - time.Sleep(500 * time.Millisecond) - var restartLSN, confirmedLSN string - row := pool.QueryRow(ctx, "SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name=$1", slot) - err = row.Scan(&restartLSN, &confirmedLSN) - assert.NoError(t, err) - assert.NotEmpty(t, confirmedLSN) - // When no other transaction is in-progress, restart_lsn should equal confirmed - // restart_lsn is the oldest LSN that might still be needed; it is <= confirmed_flush_lsn - confirmed, _ := pq.ParseLSN(confirmedLSN) - restart, _ := pq.ParseLSN(restartLSN) - assert.LessOrEqual(t, restart, confirmed) + var first, second *format.Insert + select { + case first = <-msgCh: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting first message") + } + select { + case second = <-msgCh: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting second message") + } - connector.Close() + assert.Equal(t, int32(601), first.Decoded["id"]) + assert.Equal(t, int32(600), second.Decoded["id"]) + + time.Sleep(500 * time.Millisecond) + var restartLSN, confirmedLSN string + row := pool.QueryRow(ctx, "SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name=$1", cdcCfg.Slot.Name) + err = row.Scan(&restartLSN, &confirmedLSN) + assert.NoError(t, err) + assert.NotEmpty(t, confirmedLSN) + confirmed, _ := pq.ParseLSN(confirmedLSN) + restart, _ := pq.ParseLSN(restartLSN) + assert.LessOrEqual(t, restart, confirmed) + }) } diff --git a/integration_test/copy_protocol_test.go b/integration_test/copy_protocol_test.go index 8eecb7e..b0a8627 100644 --- a/integration_test/copy_protocol_test.go +++ b/integration_test/copy_protocol_test.go @@ -20,104 +20,106 @@ func TestCopyProtocol(t *testing.T) { cdcCfg := Config cdcCfg.Slot.Name = "slot_test_copy_protocol" - postgresConn, err := newPostgresConn() - if !assert.NoError(t, err) { - t.FailNow() - } - - if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { - t.FailNow() - } - - messageCh := make(chan *replication.ListenerContext) - totalCounter := atomic.Int64{} - handlerFunc := func(ctx *replication.ListenerContext) { - switch ctx.Message.(type) { - case *format.Insert, *format.Delete, *format.Update: - totalCounter.Add(1) - messageCh <- ctx + forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) { + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() } - } - - cdc2Cfg := cdcCfg - cdc2Cfg.Metric.Port = 8085 - connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) - if !assert.NoError(t, err) { - t.FailNow() - } - - connector2, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) - if !assert.NoError(t, err) { - t.FailNow() - } - - cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} - pool, err := pgxpool.New(ctx, cfg.DSNWithoutSSL()) - if !assert.NoError(t, err) { - t.FailNow() - } - - t.Cleanup(func() { - pool.Close() - connector2.Close() - assert.NoError(t, RestoreDB(ctx)) - }) - go connector.Start(ctx) - - waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { - t.FailNow() - } - cancel() + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() + } - go connector2.Start(ctx) + messageCh := make(chan *replication.ListenerContext) + totalCounter := atomic.Int64{} + handlerFunc := func(ctx *replication.ListenerContext) { + switch ctx.Message.(type) { + case *format.Insert, *format.Delete, *format.Update: + totalCounter.Add(1) + messageCh <- ctx + } + } - t.Run("Insert 30 book to table with Copy protocol. Then stop the consumer after 16th message processed", func(t *testing.T) { - entries := make([][]any, 30) - books := CreateBooks(30) + cdc2Cfg := cdcCfg + cdc2Cfg.Metric.Port = 8085 + connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) + if !assert.NoError(t, err) { + t.FailNow() + } - for i, user := range books { - entries[i] = []any{user.ID, user.Name} + connector2, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) + if !assert.NoError(t, err) { + t.FailNow() } - _, err = pool.CopyFrom( - ctx, - pgx.Identifier{"books"}, - []string{"id", "name"}, - pgx.CopyFromRows(entries), - ) - if err != nil { - t.Errorf("error copying into %s table: %v", "books", err) + cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} + pool, err := pgxpool.New(ctx, cfg.DSNWithoutSSL()) + if !assert.NoError(t, err) { + t.FailNow() } - for { - m := <-messageCh - if v, ok := m.Message.(*format.Insert); ok { - if v.Decoded["id"].(int32) == 16 { - connector.Close() - break - } - } + t.Cleanup(func() { + pool.Close() + connector2.Close() + assert.NoError(t, RestoreDB(ctx)) + }) - assert.NoError(t, m.Ack()) - } - }) + go connector.Start(ctx) - t.Run("Run CDC again. Then check message count after all messages consumed", func(t *testing.T) { - waitCtx, cancel = context.WithTimeout(context.Background(), 3*time.Second) - if !assert.NoError(t, connector2.WaitUntilReady(waitCtx)) { + waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { t.FailNow() } cancel() - for { - m := <-messageCh - if v, ok := m.Message.(*format.Insert); ok { - if v.Decoded["id"].(int32) == 30 { - break + go connector2.Start(ctx) + + t.Run("Insert 30 book to table with Copy protocol. Then stop the consumer after 16th message processed", func(t *testing.T) { + entries := make([][]any, 30) + books := CreateBooks(30) + + for i, user := range books { + entries[i] = []any{user.ID, user.Name} + } + + _, err = pool.CopyFrom( + ctx, + pgx.Identifier{"books"}, + []string{"id", "name"}, + pgx.CopyFromRows(entries), + ) + if err != nil { + t.Errorf("error copying into %s table: %v", "books", err) + } + + for { + m := <-messageCh + if v, ok := m.Message.(*format.Insert); ok { + if v.Decoded["id"].(int32) == 16 { + connector.Close() + break + } } + + assert.NoError(t, m.Ack()) } - } + }) + + t.Run("Run CDC again. Then check message count after all messages consumed", func(t *testing.T) { + waitCtx, cancel = context.WithTimeout(context.Background(), 3*time.Second) + if !assert.NoError(t, connector2.WaitUntilReady(waitCtx)) { + t.FailNow() + } + cancel() + + for { + m := <-messageCh + if v, ok := m.Message.(*format.Insert); ok { + if v.Decoded["id"].(int32) == 30 { + break + } + } + } + }) }) } diff --git a/integration_test/heartbeat_test.go b/integration_test/heartbeat_test.go index c34ac90..ff63be4 100644 --- a/integration_test/heartbeat_test.go +++ b/integration_test/heartbeat_test.go @@ -23,87 +23,89 @@ func TestHeartbeatAdvancesLSN(t *testing.T) { cdcCfg := Config cdcCfg.Slot.Name = "slot_test_heartbeat_lsn" - postgresConn, err := newPostgresConn() - if !assert.NoError(t, err) { - t.FailNow() - } - - // Ensure base DB objects (books table, drop old publication) - if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { - t.FailNow() - } - - // Extend publication with heartbeat table so that heartbeat changes are part of CDC stream - cdcCfg.Publication.Tables = append(cdcCfg.Publication.Tables, - publication.Table{ - Name: "heartbeat_events", - Schema: "public", - ReplicaIdentity: publication.ReplicaIdentityFull, - }, - ) - - // Enable heartbeat by specifying the table (library auto-creates it) - cdcCfg.Heartbeat = config.HeartbeatConfig{ - Table: publication.Table{ - Name: "heartbeat_events", - Schema: "public", - }, - Interval: 2 * time.Second, - } - - messageCh := make(chan any, 10) - handlerFunc := func(ctx *replication.ListenerContext) { - // We don't assert on specific heartbeat events here; just make sure ACKs flow. - messageCh <- ctx.Message - _ = ctx.Ack() - } - - connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) - if !assert.NoError(t, err) { - t.FailNow() - } - - t.Cleanup(func() { - connector.Close() - assert.NoError(t, RestoreDB(ctx)) - assert.NoError(t, postgresConn.Close(ctx)) - }) - - go connector.Start(ctx) - - waitCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { + forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) { + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() + } + + // Ensure base DB objects (books table, drop old publication) + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() + } + + // Extend publication with heartbeat table so that heartbeat changes are part of CDC stream + cdcCfg.Publication.Tables = append(cdcCfg.Publication.Tables, + publication.Table{ + Name: "heartbeat_events", + Schema: "public", + ReplicaIdentity: publication.ReplicaIdentityFull, + }, + ) + + // Enable heartbeat by specifying the table (library auto-creates it) + cdcCfg.Heartbeat = config.HeartbeatConfig{ + Table: publication.Table{ + Name: "heartbeat_events", + Schema: "public", + }, + Interval: 2 * time.Second, + } + + messageCh := make(chan any, 10) + handlerFunc := func(ctx *replication.ListenerContext) { + // We don't assert on specific heartbeat events here; just make sure ACKs flow. + messageCh <- ctx.Message + _ = ctx.Ack() + } + + connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Cleanup(func() { + connector.Close() + assert.NoError(t, RestoreDB(ctx)) + assert.NoError(t, postgresConn.Close(ctx)) + }) + + go connector.Start(ctx) + + waitCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { + cancel() + t.FailNow() + } cancel() - t.FailNow() - } - cancel() - - // Capture initial LSNs for the test slot - initialRestart, initialConfirmed, err := readSlotLSNs(ctx, postgresConn, cdcCfg.Slot.Name) - if !assert.NoError(t, err) { - t.FailNow() - } - // Wait long enough for a few heartbeat cycles - time.Sleep(7 * time.Second) - - finalRestart, finalConfirmed, err := readSlotLSNs(ctx, postgresConn, cdcCfg.Slot.Name) - if !assert.NoError(t, err) { - t.FailNow() - } - - // Heartbeat should cause at least confirmed_flush_lsn to move forward. - assert.NotEmpty(t, initialConfirmed) - assert.NotEmpty(t, finalConfirmed) - assert.NotEqualf(t, initialConfirmed, finalConfirmed, - "expected confirmed_flush_lsn to advance due to heartbeat, got initial=%s final=%s", - initialConfirmed, finalConfirmed, - ) - - // restart_lsn may move less frequently, but for practical purposes - // it should also advance when only heartbeat is producing changes. - assert.NotEmpty(t, initialRestart) - assert.NotEmpty(t, finalRestart) + // Capture initial LSNs for the test slot + initialRestart, initialConfirmed, err := readSlotLSNs(ctx, postgresConn, cdcCfg.Slot.Name) + if !assert.NoError(t, err) { + t.FailNow() + } + + // Wait long enough for a few heartbeat cycles + time.Sleep(7 * time.Second) + + finalRestart, finalConfirmed, err := readSlotLSNs(ctx, postgresConn, cdcCfg.Slot.Name) + if !assert.NoError(t, err) { + t.FailNow() + } + + // Heartbeat should cause at least confirmed_flush_lsn to move forward. + assert.NotEmpty(t, initialConfirmed) + assert.NotEmpty(t, finalConfirmed) + assert.NotEqualf(t, initialConfirmed, finalConfirmed, + "expected confirmed_flush_lsn to advance due to heartbeat, got initial=%s final=%s", + initialConfirmed, finalConfirmed, + ) + + // restart_lsn may move less frequently, but for practical purposes + // it should also advance when only heartbeat is producing changes. + assert.NotEmpty(t, initialRestart) + assert.NotEmpty(t, finalRestart) + }) } // readSlotLSNs fetches restart_lsn and confirmed_flush_lsn for a given slot diff --git a/integration_test/main_test.go b/integration_test/main_test.go index e29f0b9..094d7e3 100644 --- a/integration_test/main_test.go +++ b/integration_test/main_test.go @@ -219,6 +219,25 @@ func pgExec(ctx context.Context, conn pq.Connection, command string) error { return nil } +// protoVersions lists the pgoutput protocol versions that every CDC +// integration test should exercise. +var protoVersions = []int{1, 2} + +// forEachProtoVersion runs fn as a subtest for every protocol version. +// It copies cdcCfg, sets ProtoVersion, and appends "_v{N}" to the slot name +// so that parallel subtests don't collide. +func forEachProtoVersion(t *testing.T, cdcCfg config.Config, fn func(t *testing.T, cfg config.Config)) { + t.Helper() + for _, v := range protoVersions { + cfg := cdcCfg + cfg.Slot.ProtoVersion = v + cfg.Slot.Name = fmt.Sprintf("%s_v%d", cdcCfg.Slot.Name, v) + t.Run(fmt.Sprintf("proto_v%d", v), func(t *testing.T) { + fn(t, cfg) + }) + } +} + func fetchDeleteOpMetric() (int, error) { m, err := fetchMetrics("go_pq_cdc_delete_total") mi, _ := strconv.Atoi(m) diff --git a/integration_test/streaming_rollback_test.go b/integration_test/streaming_rollback_test.go new file mode 100644 index 0000000..18fba69 --- /dev/null +++ b/integration_test/streaming_rollback_test.go @@ -0,0 +1,203 @@ +package integration + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + cdc "github.com/Trendyol/go-pq-cdc" + "github.com/Trendyol/go-pq-cdc/config" + "github.com/Trendyol/go-pq-cdc/pq/message/format" + "github.com/Trendyol/go-pq-cdc/pq/replication" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/assert" +) + +// TestStreamingTransactionRollback verifies that when a large streaming +// transaction is explicitly rolled back, NO messages are delivered to the +// consumer. PostgreSQL sends StreamAbort for explicit rollbacks and the +// streamTxBuffer must discard all accumulated messages for that XID. +func TestStreamingTransactionRollback(t *testing.T) { + const ( + rowCount = 500 + slotName = "slot_test_stream_rollback" + ) + + ctx := context.Background() + + // Force PostgreSQL to stream in-progress transactions. + lowerLogicalDecodingWorkMem(ctx, t) + + cdcCfg := Config + cdcCfg.Slot.Name = slotName + cdcCfg.Slot.ProtoVersion = 2 + + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() + } + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() + } + + msgCh := make(chan any, 10) + handler := func(lCtx *replication.ListenerContext) { + switch lCtx.Message.(type) { + case *format.Insert: + msgCh <- lCtx.Message + } + _ = lCtx.Ack() + } + + connector, err := cdc.NewConnector(ctx, cdcCfg, handler) + if !assert.NoError(t, err) { + t.FailNow() + } + + cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} + pool, err := pgxpool.New(ctx, cfg.DSNWithoutSSL()) + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Cleanup(func() { + connector.Close() + _ = RestoreDB(ctx) + pool.Close() + _ = postgresConn.Close(ctx) + }) + + go connector.Start(ctx) + waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + assert.NoError(t, connector.WaitUntilReady(waitCtx)) + cancel() + + // --- Insert many rows then ROLLBACK ----------------------------------- + tx, err := pool.Begin(ctx) + if !assert.NoError(t, err) { + t.FailNow() + } + for i := 0; i < rowCount; i++ { + _, err = tx.Exec(ctx, "INSERT INTO books (id, name) VALUES ($1, $2)", i+60000, fmt.Sprintf("rollback-book-%d", i)) + if !assert.NoError(t, err) { + _ = tx.Rollback(ctx) + t.FailNow() + } + } + assert.NoError(t, tx.Rollback(ctx)) + + // --- No messages should arrive ---------------------------------------- + select { + case msg := <-msgCh: + t.Fatalf("unexpected message received after streaming rollback: %v", msg) + case <-time.After(3 * time.Second): + // success – no messages delivered + } +} + +// TestStreamingRollbackThenCommit verifies that a rolled-back streaming +// transaction does not pollute a subsequent committed transaction. +// +// Timeline: +// +// TX-A (streamed): BEGIN → many INSERTs → ROLLBACK → no messages +// TX-B (streamed): BEGIN → many INSERTs → COMMIT → all messages delivered +// +// This ensures that streamTxBuffer correctly discards TX-A's messages on +// StreamAbort and independently delivers TX-B's messages on StreamCommit. +func TestStreamingRollbackThenCommit(t *testing.T) { + const ( + rowCount = 500 + slotName = "slot_test_stream_rollback_commit" + ) + + ctx := context.Background() + + lowerLogicalDecodingWorkMem(ctx, t) + + cdcCfg := Config + cdcCfg.Slot.Name = slotName + cdcCfg.Slot.ProtoVersion = 2 + + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() + } + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() + } + + var received atomic.Int64 + msgCh := make(chan *format.Insert, rowCount+10) + handler := func(lCtx *replication.ListenerContext) { + if ins, ok := lCtx.Message.(*format.Insert); ok { + received.Add(1) + msgCh <- ins + } + _ = lCtx.Ack() + } + + connector, err := cdc.NewConnector(ctx, cdcCfg, handler) + if !assert.NoError(t, err) { + t.FailNow() + } + + cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} + pool, err := pgxpool.New(ctx, cfg.DSNWithoutSSL()) + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Cleanup(func() { + connector.Close() + _ = RestoreDB(ctx) + pool.Close() + _ = postgresConn.Close(ctx) + }) + + go connector.Start(ctx) + waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + assert.NoError(t, connector.WaitUntilReady(waitCtx)) + cancel() + + // --- TX-A: large transaction → ROLLBACK ------------------------------- + txA, err := pool.Begin(ctx) + if !assert.NoError(t, err) { + t.FailNow() + } + for i := 0; i < rowCount; i++ { + _, err = txA.Exec(ctx, "INSERT INTO books (id, name) VALUES ($1, $2)", i+70000, fmt.Sprintf("rollback-%d", i)) + assert.NoError(t, err) + } + assert.NoError(t, txA.Rollback(ctx)) + + // --- TX-B: large transaction → COMMIT --------------------------------- + txB, err := pool.Begin(ctx) + if !assert.NoError(t, err) { + t.FailNow() + } + for i := 0; i < rowCount; i++ { + _, err = txB.Exec(ctx, "INSERT INTO books (id, name) VALUES ($1, $2)", i+80000, fmt.Sprintf("commit-%d", i)) + assert.NoError(t, err) + } + assert.NoError(t, txB.Commit(ctx)) + + // --- Only TX-B's messages should arrive ------------------------------- + deadline := time.After(15 * time.Second) + for received.Load() < int64(rowCount) { + select { + case <-deadline: + t.Fatalf("timeout: expected %d insert messages from committed tx, got %d", rowCount, received.Load()) + case <-msgCh: + case <-time.After(100 * time.Millisecond): + } + } + + // Wait a bit more to ensure no extra messages from the rolled-back tx leak through + time.Sleep(1 * time.Second) + + assert.Equal(t, int64(rowCount), received.Load(), + "expected exactly %d messages from committed tx, but got %d (rolled-back tx may have leaked)", rowCount, received.Load()) +} diff --git a/integration_test/streaming_transaction_test.go b/integration_test/streaming_transaction_test.go index fdd0631..08dc128 100644 --- a/integration_test/streaming_transaction_test.go +++ b/integration_test/streaming_transaction_test.go @@ -62,6 +62,7 @@ func TestStreamingLargeTransactionCommit(t *testing.T) { // --- CDC setup -------------------------------------------------------- cdcCfg := Config cdcCfg.Slot.Name = slotName + cdcCfg.Slot.ProtoVersion = 2 postgresConn, err := newPostgresConn() if !assert.NoError(t, err) { @@ -161,6 +162,7 @@ func TestStreamingInterleavedTransactions(t *testing.T) { // --- CDC setup -------------------------------------------------------- cdcCfg := Config cdcCfg.Slot.Name = slotName + cdcCfg.Slot.ProtoVersion = 2 postgresConn, err := newPostgresConn() if !assert.NoError(t, err) { diff --git a/integration_test/system_identity_full_test.go b/integration_test/system_identity_full_test.go index 5be9518..200608b 100644 --- a/integration_test/system_identity_full_test.go +++ b/integration_test/system_identity_full_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" cdc "github.com/Trendyol/go-pq-cdc" + "github.com/Trendyol/go-pq-cdc/config" "github.com/Trendyol/go-pq-cdc/pq/message/format" "github.com/Trendyol/go-pq-cdc/pq/publication" "github.com/Trendyol/go-pq-cdc/pq/replication" @@ -19,67 +20,69 @@ func TestReplicaIdentityDefault(t *testing.T) { cdcCfg.Slot.Name = "slot_test_replica_identity_default" cdcCfg.Publication.Tables[0].ReplicaIdentity = publication.ReplicaIdentityDefault - postgresConn, err := newPostgresConn() - if !assert.NoError(t, err) { - t.FailNow() - } - - if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { - t.FailNow() - } - - messageCh := make(chan any, 500) - handlerFunc := func(ctx *replication.ListenerContext) { - switch msg := ctx.Message.(type) { - case *format.Insert, *format.Delete, *format.Update: - messageCh <- msg + forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) { + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() } - _ = ctx.Ack() - } - - connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) - if !assert.NoError(t, err) { - t.FailNow() - } - - defer func() { - connector.Close() - assert.NoError(t, RestoreDB(ctx)) - assert.NoError(t, postgresConn.Close(ctx)) - }() - - go connector.Start(ctx) - - waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { - t.FailNow() - } - cancel() - - t.Run("should return old value is nil when update message received", func(t *testing.T) { - books := CreateBooks(10) - for _, b := range books { - err = pgExec(ctx, postgresConn, fmt.Sprintf("INSERT INTO books(id, name) VALUES(%d, '%s')", b.ID, b.Name)) - assert.NoError(t, err) + + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() } - for range 10 { - <-messageCh + messageCh := make(chan any, 500) + handlerFunc := func(ctx *replication.ListenerContext) { + switch msg := ctx.Message.(type) { + case *format.Insert, *format.Delete, *format.Update: + messageCh <- msg + } + _ = ctx.Ack() } - booksNew := CreateBooks(5) - for i, b := range booksNew { - b.ID = i + 1 - booksNew[i] = b - err = pgExec(ctx, postgresConn, fmt.Sprintf("UPDATE books SET name = '%s' WHERE id = %d", b.Name, b.ID)) - assert.NoError(t, err) + connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) + if !assert.NoError(t, err) { + t.FailNow() } - for i := range 5 { - m := <-messageCh - assert.Equal(t, booksNew[i].Map(), m.(*format.Update).NewDecoded) - assert.Nil(t, m.(*format.Update).OldDecoded["id"]) + t.Cleanup(func() { + connector.Close() + assert.NoError(t, RestoreDB(ctx)) + assert.NoError(t, postgresConn.Close(ctx)) + }) + + go connector.Start(ctx) + + waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { + t.FailNow() } + cancel() + + t.Run("should return old value is nil when update message received", func(t *testing.T) { + books := CreateBooks(10) + for _, b := range books { + err = pgExec(ctx, postgresConn, fmt.Sprintf("INSERT INTO books(id, name) VALUES(%d, '%s')", b.ID, b.Name)) + assert.NoError(t, err) + } + + for range 10 { + <-messageCh + } + + booksNew := CreateBooks(5) + for i, b := range booksNew { + b.ID = i + 1 + booksNew[i] = b + err = pgExec(ctx, postgresConn, fmt.Sprintf("UPDATE books SET name = '%s' WHERE id = %d", b.Name, b.ID)) + assert.NoError(t, err) + } + + for i := range 5 { + m := <-messageCh + assert.Equal(t, booksNew[i].Map(), m.(*format.Update).NewDecoded) + assert.Nil(t, m.(*format.Update).OldDecoded["id"]) + } + }) }) } @@ -90,66 +93,68 @@ func TestReplicaIdentityFull(t *testing.T) { cdcCfg.Slot.Name = "slot_test_replica_identity_full" cdcCfg.Publication.Tables[0].ReplicaIdentity = publication.ReplicaIdentityFull - postgresConn, err := newPostgresConn() - if !assert.NoError(t, err) { - t.FailNow() - } - - if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { - t.FailNow() - } - - messageCh := make(chan any, 500) - handlerFunc := func(ctx *replication.ListenerContext) { - switch msg := ctx.Message.(type) { - case *format.Insert, *format.Delete, *format.Update: - messageCh <- msg + forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) { + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() } - _ = ctx.Ack() - } - - connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) - if !assert.NoError(t, err) { - t.FailNow() - } - - t.Cleanup(func() { - connector.Close() - assert.NoError(t, RestoreDB(ctx)) - assert.NoError(t, postgresConn.Close(ctx)) - }) - - go connector.Start(ctx) - waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { - t.FailNow() - } - cancel() - - t.Run("should return new value and old value when update message received", func(t *testing.T) { - books := CreateBooks(10) - for _, b := range books { - err = pgExec(ctx, postgresConn, fmt.Sprintf("INSERT INTO books(id, name) VALUES(%d, '%s')", b.ID, b.Name)) - assert.NoError(t, err) + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() } - for range 10 { - <-messageCh + messageCh := make(chan any, 500) + handlerFunc := func(ctx *replication.ListenerContext) { + switch msg := ctx.Message.(type) { + case *format.Insert, *format.Delete, *format.Update: + messageCh <- msg + } + _ = ctx.Ack() } - booksNew := CreateBooks(5) - for i, b := range booksNew { - b.ID = i + 1 - booksNew[i] = b - err = pgExec(ctx, postgresConn, fmt.Sprintf("UPDATE books SET name = '%s' WHERE id = %d", b.Name, b.ID)) - assert.NoError(t, err) + connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) + if !assert.NoError(t, err) { + t.FailNow() } - for i := range 5 { - m := <-messageCh - assert.Equal(t, booksNew[i].Map(), m.(*format.Update).NewDecoded) - assert.Equal(t, books[i].Map(), m.(*format.Update).OldDecoded) + t.Cleanup(func() { + connector.Close() + assert.NoError(t, RestoreDB(ctx)) + assert.NoError(t, postgresConn.Close(ctx)) + }) + + go connector.Start(ctx) + + waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { + t.FailNow() } + cancel() + + t.Run("should return new value and old value when update message received", func(t *testing.T) { + books := CreateBooks(10) + for _, b := range books { + err = pgExec(ctx, postgresConn, fmt.Sprintf("INSERT INTO books(id, name) VALUES(%d, '%s')", b.ID, b.Name)) + assert.NoError(t, err) + } + + for range 10 { + <-messageCh + } + + booksNew := CreateBooks(5) + for i, b := range booksNew { + b.ID = i + 1 + booksNew[i] = b + err = pgExec(ctx, postgresConn, fmt.Sprintf("UPDATE books SET name = '%s' WHERE id = %d", b.Name, b.ID)) + assert.NoError(t, err) + } + + for i := range 5 { + m := <-messageCh + assert.Equal(t, booksNew[i].Map(), m.(*format.Update).NewDecoded) + assert.Equal(t, books[i].Map(), m.(*format.Update).OldDecoded) + } + }) }) } diff --git a/integration_test/transactional_large_commit_test.go b/integration_test/transactional_large_commit_test.go index 00e556c..0293f54 100644 --- a/integration_test/transactional_large_commit_test.go +++ b/integration_test/transactional_large_commit_test.go @@ -31,6 +31,7 @@ func TestLargeTransactionalCommit(t *testing.T) { // --- CDC CONFIGURATION ------------------------------------------------- cdcCfg := Config cdcCfg.Slot.Name = slotName + cdcCfg.Slot.ProtoVersion = 1 postgresConn, err := newPostgresConn() if !assert.NoError(t, err) { diff --git a/integration_test/transactional_large_rollback_test.go b/integration_test/transactional_large_rollback_test.go index ce3c995..f3be0c0 100644 --- a/integration_test/transactional_large_rollback_test.go +++ b/integration_test/transactional_large_rollback_test.go @@ -24,63 +24,61 @@ func TestLargeTransactionalRollback(t *testing.T) { ctx := context.Background() - // ---------- CDC connector setup --------------------------------------- cdcCfg := Config cdcCfg.Slot.Name = slotName - postgresConn, err := newPostgresConn() - if !assert.NoError(t, err) { - t.FailNow() - } - if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { - t.FailNow() - } + forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) { + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() + } + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() + } - msgCh := make(chan any, 10) // expect 0 - handler := func(lCtx *replication.ListenerContext) { - switch lCtx.Message.(type) { - case *format.Insert: - msgCh <- lCtx.Message + msgCh := make(chan any, 10) + handler := func(lCtx *replication.ListenerContext) { + switch lCtx.Message.(type) { + case *format.Insert: + msgCh <- lCtx.Message + } + _ = lCtx.Ack() } - _ = lCtx.Ack() - } - connector, err := cdc.NewConnector(ctx, cdcCfg, handler) - if !assert.NoError(t, err) { - t.FailNow() - } + connector, err := cdc.NewConnector(ctx, cdcCfg, handler) + if !assert.NoError(t, err) { + t.FailNow() + } - cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} - pool, err := pgxpool.New(ctx, cfg.DSNWithoutSSL()) - if !assert.NoError(t, err) { - t.FailNow() - } + cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} + pool, err := pgxpool.New(ctx, cfg.DSNWithoutSSL()) + if !assert.NoError(t, err) { + t.FailNow() + } - t.Cleanup(func() { - connector.Close() - _ = RestoreDB(ctx) - pool.Close() - }) + t.Cleanup(func() { + connector.Close() + _ = RestoreDB(ctx) + pool.Close() + }) - go connector.Start(ctx) - waitCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - assert.NoError(t, connector.WaitUntilReady(waitCtx)) - cancel() + go connector.Start(ctx) + waitCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + assert.NoError(t, connector.WaitUntilReady(waitCtx)) + cancel() - // ---------- Large transaction with ROLLBACK --------------------------- - tx, err := pool.Begin(ctx) - assert.NoError(t, err) - for i := 0; i < rowCount; i++ { - _, err = tx.Exec(ctx, "INSERT INTO books (id, name) VALUES ($1, 'temp')", i+20000) + tx, err := pool.Begin(ctx) assert.NoError(t, err) - } - assert.NoError(t, tx.Rollback(ctx)) + for i := 0; i < rowCount; i++ { + _, err = tx.Exec(ctx, "INSERT INTO books (id, name) VALUES ($1, 'temp')", i+20000) + assert.NoError(t, err) + } + assert.NoError(t, tx.Rollback(ctx)) - // ---------- Validate that no message is received ---------------------- - select { - case <-msgCh: - t.Fatalf("unexpected message received after rollback") - case <-time.After(2 * time.Second): - // success, channel remained silent - } + select { + case <-msgCh: + t.Fatalf("unexpected message received after rollback") + case <-time.After(2 * time.Second): + } + }) } diff --git a/integration_test/transactional_process_test.go b/integration_test/transactional_process_test.go index e658113..d4ed70e 100644 --- a/integration_test/transactional_process_test.go +++ b/integration_test/transactional_process_test.go @@ -18,101 +18,103 @@ func TestTransactionalProcess(t *testing.T) { cdcCfg := Config cdcCfg.Slot.Name = "slot_test_transactional_process" - postgresConn, err := newPostgresConn() - if !assert.NoError(t, err) { - t.FailNow() - } - - if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { - t.FailNow() - } - - messageCh := make(chan any, 500) - handlerFunc := func(ctx *replication.ListenerContext) { - switch msg := ctx.Message.(type) { - case *format.Insert, *format.Delete, *format.Update: - messageCh <- msg + forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) { + postgresConn, err := newPostgresConn() + if !assert.NoError(t, err) { + t.FailNow() } - _ = ctx.Ack() - } - - connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) - if !assert.NoError(t, err) { - t.FailNow() - } - - cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} - pool, err := pgxpool.New(ctx, cfg.DSNWithoutSSL()) - if !assert.NoError(t, err) { - t.FailNow() - } - - t.Cleanup(func() { - connector.Close() - err = RestoreDB(ctx) - assert.NoError(t, err) - - pool.Close() - }) - go connector.Start(ctx) + if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) { + t.FailNow() + } - waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { - t.FailNow() - } - cancel() + messageCh := make(chan any, 500) + handlerFunc := func(ctx *replication.ListenerContext) { + switch msg := ctx.Message.(type) { + case *format.Insert, *format.Delete, *format.Update: + messageCh <- msg + } + _ = ctx.Ack() + } - t.Run("Start transactional operation and commit. Then check the messages and metrics", func(t *testing.T) { - tx, err := pool.Begin(ctx) - assert.NoError(t, err) + connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc) + if !assert.NoError(t, err) { + t.FailNow() + } - _, err = tx.Exec(ctx, "INSERT INTO books (id, name) VALUES (12, 'j*va is best')") - assert.NoError(t, err) + cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database} + pool, err := pgxpool.New(ctx, cfg.DSNWithoutSSL()) + if !assert.NoError(t, err) { + t.FailNow() + } - _, err = tx.Exec(ctx, "UPDATE books SET name = 'go is best' WHERE id = 12") - assert.NoError(t, err) + t.Cleanup(func() { + connector.Close() + err = RestoreDB(ctx) + assert.NoError(t, err) - err = tx.Commit(ctx) - assert.NoError(t, err) + pool.Close() + }) - insertMessage := <-messageCh - assert.Equal(t, map[string]any{"id": int32(12), "name": "j*va is best"}, insertMessage.(*format.Insert).Decoded) - updateMessage := <-messageCh - assert.Equal(t, map[string]any{"id": int32(12), "name": "go is best"}, updateMessage.(*format.Update).NewDecoded) + go connector.Start(ctx) - updateMetric, _ := fetchUpdateOpMetric() - insertMetric, _ := fetchInsertOpMetric() - deleteMetric, _ := fetchDeleteOpMetric() - assert.True(t, updateMetric == 1) - assert.True(t, insertMetric == 1) - assert.True(t, deleteMetric == 0) - }) + waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) { + t.FailNow() + } + cancel() + + t.Run("Start transactional operation and commit. Then check the messages and metrics", func(t *testing.T) { + tx, err := pool.Begin(ctx) + assert.NoError(t, err) + + _, err = tx.Exec(ctx, "INSERT INTO books (id, name) VALUES (12, 'j*va is best')") + assert.NoError(t, err) + + _, err = tx.Exec(ctx, "UPDATE books SET name = 'go is best' WHERE id = 12") + assert.NoError(t, err) + + err = tx.Commit(ctx) + assert.NoError(t, err) + + insertMessage := <-messageCh + assert.Equal(t, map[string]any{"id": int32(12), "name": "j*va is best"}, insertMessage.(*format.Insert).Decoded) + updateMessage := <-messageCh + assert.Equal(t, map[string]any{"id": int32(12), "name": "go is best"}, updateMessage.(*format.Update).NewDecoded) + + updateMetric, _ := fetchUpdateOpMetric() + insertMetric, _ := fetchInsertOpMetric() + deleteMetric, _ := fetchDeleteOpMetric() + assert.True(t, updateMetric == 1) + assert.True(t, insertMetric == 1) + assert.True(t, deleteMetric == 0) + }) - t.Run("Start transactional operation and rollback. Then Delete book which id is 12. Then check the messages and metrics", func(t *testing.T) { - tx, err := pool.Begin(ctx) - assert.NoError(t, err) + t.Run("Start transactional operation and rollback. Then Delete book which id is 12. Then check the messages and metrics", func(t *testing.T) { + tx, err := pool.Begin(ctx) + assert.NoError(t, err) - _, err = tx.Exec(ctx, "INSERT INTO books (id, name) VALUES (13, 'j*va is best')") - assert.NoError(t, err) + _, err = tx.Exec(ctx, "INSERT INTO books (id, name) VALUES (13, 'j*va is best')") + assert.NoError(t, err) - _, err = tx.Exec(ctx, "UPDATE books SET name = 'go is best' WHERE id = 13") - assert.NoError(t, err) + _, err = tx.Exec(ctx, "UPDATE books SET name = 'go is best' WHERE id = 13") + assert.NoError(t, err) - err = tx.Rollback(ctx) - assert.NoError(t, err) + err = tx.Rollback(ctx) + assert.NoError(t, err) - _, err = pool.Exec(ctx, "DELETE FROM books WHERE id = 12") - assert.NoError(t, err) + _, err = pool.Exec(ctx, "DELETE FROM books WHERE id = 12") + assert.NoError(t, err) - deleteMessage := <-messageCh - assert.Equal(t, int32(12), deleteMessage.(*format.Delete).OldDecoded["id"]) + deleteMessage := <-messageCh + assert.Equal(t, int32(12), deleteMessage.(*format.Delete).OldDecoded["id"]) - updateMetric, _ := fetchUpdateOpMetric() - insertMetric, _ := fetchInsertOpMetric() - deleteMetric, _ := fetchDeleteOpMetric() - assert.True(t, updateMetric == 1) - assert.True(t, insertMetric == 1) - assert.True(t, deleteMetric == 1) + updateMetric, _ := fetchUpdateOpMetric() + insertMetric, _ := fetchInsertOpMetric() + deleteMetric, _ := fetchDeleteOpMetric() + assert.True(t, updateMetric == 1) + assert.True(t, insertMetric == 1) + assert.True(t, deleteMetric == 1) + }) }) } diff --git a/pq/message/format/stream.go b/pq/message/format/stream.go index 1730ad7..f2758b7 100644 --- a/pq/message/format/stream.go +++ b/pq/message/format/stream.go @@ -8,15 +8,55 @@ import ( "github.com/go-playground/errors" ) +// StreamStart signals the beginning of a streaming transaction chunk. +// Between StreamStart and StreamStop, DML events belong to an in-progress +// transaction that has not yet been committed. +type StreamStart struct { + Xid uint32 + FirstSegment bool +} + +func NewStreamStart(data []byte) (*StreamStart, error) { + // StreamStart message format: + // Byte1('S') - message type (already at data[0]) + // Int32 - Xid (4 bytes) + // Int8 - first_segment flag (1 byte) + // Total: 1 + 4 + 1 = 6 bytes minimum + if len(data) < 6 { + return nil, errors.Newf("stream start message length must be at least 6 bytes, but got %d", len(data)) + } + return &StreamStart{ + Xid: binary.BigEndian.Uint32(data[1:]), + FirstSegment: data[5] == 1, + }, nil +} + // StreamStop signals the end of a streaming transaction chunk. // When streaming is enabled, large in-progress transactions are sent -// in chunks bracketed by STREAM START / STREAM STOP. Any buffered -// message must be flushed when this marker arrives. +// in chunks bracketed by STREAM START / STREAM STOP. type StreamStop struct{} // StreamAbort signals that a streamed transaction has been aborted. // Any buffered messages from this transaction must be discarded. -type StreamAbort struct{} +type StreamAbort struct { + Xid uint32 + SubXid uint32 +} + +func NewStreamAbort(data []byte) (*StreamAbort, error) { + // StreamAbort message format: + // Byte1('A') - message type (already at data[0]) + // Int32 - Xid (4 bytes) + // Int32 - SubXid (4 bytes) + // Total: 1 + 4 + 4 = 9 bytes minimum + if len(data) < 9 { + return nil, errors.Newf("stream abort message length must be at least 9 bytes, but got %d", len(data)) + } + return &StreamAbort{ + Xid: binary.BigEndian.Uint32(data[1:]), + SubXid: binary.BigEndian.Uint32(data[5:]), + }, nil +} // StreamCommit signals the final commit of a streamed transaction. // It carries the same LSN information as a regular Commit message, diff --git a/pq/message/format/stream_test.go b/pq/message/format/stream_test.go index 72ca476..7725a60 100644 --- a/pq/message/format/stream_test.go +++ b/pq/message/format/stream_test.go @@ -9,6 +9,197 @@ import ( "github.com/stretchr/testify/require" ) +func TestNewStreamStart(t *testing.T) { + t.Run("should decode stream start message successfully", func(t *testing.T) { + // Given + // StreamStart format: + // Byte1('S') + Int32(Xid) + Int8(FirstSegment) + data := []byte{ + 'S', // Message type 'S' (StreamStart) + 0, 0, 0, 42, // Xid: 42 + 1, // FirstSegment: true + } + + // When + ss, err := NewStreamStart(data) + + // Then + require.NoError(t, err) + assert.NotNil(t, ss) + assert.Equal(t, uint32(42), ss.Xid) + assert.True(t, ss.FirstSegment) + }) + + t.Run("should decode stream start with first_segment false", func(t *testing.T) { + // Given + data := []byte{ + 'S', // Message type 'S' + 0, 0, 0, 10, // Xid: 10 + 0, // FirstSegment: false + } + + // When + ss, err := NewStreamStart(data) + + // Then + require.NoError(t, err) + assert.NotNil(t, ss) + assert.Equal(t, uint32(10), ss.Xid) + assert.False(t, ss.FirstSegment) + }) + + t.Run("should return error when data is too short", func(t *testing.T) { + // Given + data := []byte{ + 'S', // Message type 'S' + 0, 0, 0, // Incomplete data + } + + // When + ss, err := NewStreamStart(data) + + // Then + require.Error(t, err) + assert.Nil(t, ss) + assert.Contains(t, err.Error(), "stream start message length must be at least 6 bytes") + }) + + t.Run("should return error for empty data", func(t *testing.T) { + // Given + data := []byte{} + + // When + ss, err := NewStreamStart(data) + + // Then + require.Error(t, err) + assert.Nil(t, ss) + }) + + t.Run("should decode with large xid", func(t *testing.T) { + // Given + data := []byte{ + 'S', // Message type 'S' + 0xFF, 0xFF, 0xFF, 0xFF, // Xid: max uint32 + 1, // FirstSegment: true + } + + // When + ss, err := NewStreamStart(data) + + // Then + require.NoError(t, err) + assert.NotNil(t, ss) + assert.Equal(t, uint32(0xFFFFFFFF), ss.Xid) + assert.True(t, ss.FirstSegment) + }) + + t.Run("should decode stream start message with minimum valid length", func(t *testing.T) { + // Given - exactly 6 bytes + data := []byte{ + 'S', // Message type 'S' + 0, 0, 0, 1, // Xid: 1 + 0, // FirstSegment: false + } + + // When + ss, err := NewStreamStart(data) + + // Then + require.NoError(t, err) + assert.NotNil(t, ss) + assert.Equal(t, uint32(1), ss.Xid) + assert.False(t, ss.FirstSegment) + }) +} + +func TestNewStreamAbort(t *testing.T) { + t.Run("should decode stream abort message successfully", func(t *testing.T) { + // Given + // StreamAbort format: + // Byte1('A') + Int32(Xid) + Int32(SubXid) + data := []byte{ + 'A', // Message type 'A' (StreamAbort) + 0, 0, 0, 42, // Xid: 42 + 0, 0, 0, 7, // SubXid: 7 + } + + // When + sa, err := NewStreamAbort(data) + + // Then + require.NoError(t, err) + assert.NotNil(t, sa) + assert.Equal(t, uint32(42), sa.Xid) + assert.Equal(t, uint32(7), sa.SubXid) + }) + + t.Run("should return error when data is too short", func(t *testing.T) { + // Given + data := []byte{ + 'A', // Message type 'A' + 0, 0, 0, 42, // Xid: 42 + 0, 0, // Incomplete SubXid + } + + // When + sa, err := NewStreamAbort(data) + + // Then + require.Error(t, err) + assert.Nil(t, sa) + assert.Contains(t, err.Error(), "stream abort message length must be at least 9 bytes") + }) + + t.Run("should return error for empty data", func(t *testing.T) { + // Given + data := []byte{} + + // When + sa, err := NewStreamAbort(data) + + // Then + require.Error(t, err) + assert.Nil(t, sa) + }) + + t.Run("should decode with large xid and subxid", func(t *testing.T) { + // Given + data := []byte{ + 'A', // Message type 'A' + 0xFF, 0xFF, 0xFF, 0xFF, // Xid: max uint32 + 0xFF, 0xFF, 0xFF, 0xFE, // SubXid: max uint32 - 1 + } + + // When + sa, err := NewStreamAbort(data) + + // Then + require.NoError(t, err) + assert.NotNil(t, sa) + assert.Equal(t, uint32(0xFFFFFFFF), sa.Xid) + assert.Equal(t, uint32(0xFFFFFFFE), sa.SubXid) + }) + + t.Run("should decode stream abort message with minimum valid length", func(t *testing.T) { + // Given - exactly 9 bytes + data := []byte{ + 'A', // Message type 'A' + 0, 0, 0, 1, // Xid: 1 + 0, 0, 0, 2, // SubXid: 2 + } + + // When + sa, err := NewStreamAbort(data) + + // Then + require.NoError(t, err) + assert.NotNil(t, sa) + assert.Equal(t, uint32(1), sa.Xid) + assert.Equal(t, uint32(2), sa.SubXid) + }) +} + func TestNewStreamCommit(t *testing.T) { t.Run("should decode stream commit message successfully", func(t *testing.T) { // Given diff --git a/pq/message/message.go b/pq/message/message.go index 434522a..a2fce82 100644 --- a/pq/message/message.go +++ b/pq/message/message.go @@ -47,12 +47,15 @@ func New(data []byte, serverTime time.Time, relation map[uint32]*format.Relation return format.NewUpdate(data, streamedTransaction, relation, serverTime) case DeleteByte: return format.NewDelete(data, streamedTransaction, relation, serverTime) + case StreamStartByte: + streamedTransaction = true + return format.NewStreamStart(data) case StreamStopByte: streamedTransaction = false return &format.StreamStop{}, nil case StreamAbortByte: streamedTransaction = false - return &format.StreamAbort{}, nil + return format.NewStreamAbort(data) case StreamCommitByte: streamedTransaction = false return format.NewStreamCommit(data) @@ -62,9 +65,6 @@ func New(data []byte, serverTime time.Time, relation map[uint32]*format.Relation relation[msg.OID] = msg } return msg, err - case StreamStartByte: - streamedTransaction = true - return nil, nil default: return nil, errors.Wrap(ErrorByteNotSupported, string(data[0])) } diff --git a/pq/replication/replication.go b/pq/replication/replication.go index ccde54d..ac230e1 100644 --- a/pq/replication/replication.go +++ b/pq/replication/replication.go @@ -20,12 +20,16 @@ func New(conn pq.Connection) *Replication { return &Replication{conn: conn} } -func (r *Replication) Start(publicationName, slotName string, startLSN pq.LSN) error { - pluginArguments := append([]string{ - "proto_version '2'", - "messages 'true'", - "streaming 'true'", - }, "publication_names '"+publicationName+"'") +func (r *Replication) Start(publicationName, slotName string, startLSN pq.LSN, protoVersion int) error { + pluginArguments := []string{ + fmt.Sprintf("proto_version '%d'", protoVersion), + } + + if protoVersion >= 2 { + pluginArguments = append(pluginArguments, "messages 'true'", "streaming 'true'") + } + + pluginArguments = append(pluginArguments, "publication_names '"+publicationName+"'") sql := fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s (%s)", slotName, startLSN, strings.Join(pluginArguments, ",")) r.conn.Frontend().SendQuery(&pgproto3.Query{String: sql}) diff --git a/pq/replication/stream.go b/pq/replication/stream.go index 7d5a732..719ee17 100644 --- a/pq/replication/stream.go +++ b/pq/replication/stream.go @@ -137,7 +137,7 @@ func (s *stream) setup(ctx context.Context) error { replicationStartLsn = snapshotLSN } - if err := replication.Start(s.config.Publication.Name, s.config.Slot.Name, replicationStartLsn); err != nil { + if err := replication.Start(s.config.Publication.Name, s.config.Slot.Name, replicationStartLsn, s.config.Slot.ProtoVersion); err != nil { return err } @@ -166,7 +166,6 @@ type messageBuffer struct { } // flush emits the pending message (if any) with its original WAL position. -// Used at STREAM STOP boundaries where the final commit LSN is not yet known. func (b *messageBuffer) flush() { if b.pending != nil { b.outCh <- b.pending @@ -175,7 +174,7 @@ func (b *messageBuffer) flush() { } // flushWithLSN emits the pending message (if any), rewriting its WAL position -// to the given transaction-end LSN. Used at COMMIT and STREAM COMMIT. +// to the given transaction-end LSN. Used at COMMIT. func (b *messageBuffer) flushWithLSN(lsn pq.LSN) { if b.pending != nil { b.outCh <- &Message{ @@ -187,7 +186,7 @@ func (b *messageBuffer) flushWithLSN(lsn pq.LSN) { } // discard drops the pending message without emitting. -// Used at BEGIN (reset state) and STREAM ABORT (transaction rolled back). +// Used at BEGIN to reset state. func (b *messageBuffer) discard() { b.pending = nil } @@ -198,11 +197,75 @@ func (b *messageBuffer) buffer(msg *Message) { b.pending = msg } +// streamTxBuffer accumulates messages from streaming in-progress transactions. +// +// PostgreSQL streams large transactions in chunks (STREAM START / STREAM STOP) +// before the transaction is committed. Chunks from different transactions may +// be interleaved (e.g. TX-A chunk, TX-B chunk, TX-A chunk, …), so messages +// are stored per-XID in a map. +// +// Messages must NOT be delivered to the consumer until STREAM COMMIT arrives, +// because the transaction may still be rolled back (STREAM ABORT). This mirrors +// how PostgreSQL's own logical replication worker handles streaming: it writes +// to temporary storage and only applies on commit. +type streamTxBuffer struct { + txns map[uint32][]*Message + activeXid uint32 + streaming bool +} + +// startTx marks the beginning of a streaming chunk for the given XID. +func (s *streamTxBuffer) startTx(xid uint32) { + if s.txns == nil { + s.txns = make(map[uint32][]*Message) + } + s.activeXid = xid + s.streaming = true +} + +// append adds a message to the currently active streaming transaction. +func (s *streamTxBuffer) append(msg *Message) { + if msg != nil { + s.txns[s.activeXid] = append(s.txns[s.activeXid], msg) + } +} + +// stopTx marks the end of the current streaming chunk. +func (s *streamTxBuffer) stopTx() { + s.streaming = false +} + +// flushTx emits every accumulated message for the given XID through outCh. +// The last message's WAL position is rewritten to the transaction-end LSN. +func (s *streamTxBuffer) flushTx(xid uint32, outCh chan<- *Message, endLSN pq.LSN) { + s.streaming = false + msgs := s.txns[xid] + n := len(msgs) + for i, msg := range msgs { + if i == n-1 { + outCh <- &Message{ + message: msg.message, + walStart: int64(endLSN), + } + } else { + outCh <- msg + } + } + delete(s.txns, xid) +} + +// discardTx drops all accumulated messages for the given XID without emitting. +func (s *streamTxBuffer) discardTx(xid uint32) { + s.streaming = false + delete(s.txns, xid) +} + func (s *stream) sink(ctx context.Context) { logger.Info("postgres message sink started") buf := &messageBuffer{outCh: s.messageCH} - corrupted := s.sinkLoop(ctx, buf) + streamBuf := &streamTxBuffer{} + corrupted := s.sinkLoop(ctx, buf, streamBuf) s.sinkEnd <- struct{}{} if !s.closed.Load() { @@ -216,7 +279,7 @@ func (s *stream) sink(ctx context.Context) { // sinkLoop reads raw replication messages and dispatches them until the // connection is closed or a fatal error occurs. It returns true when the // connection is in a corrupted state and the caller should panic. -func (s *stream) sinkLoop(ctx context.Context, buf *messageBuffer) (corrupted bool) { +func (s *stream) sinkLoop(ctx context.Context, buf *messageBuffer, streamBuf *streamTxBuffer) (corrupted bool) { for { msgCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(300*time.Millisecond)) rawMsg, err := s.conn.ReceiveMessage(msgCtx) @@ -252,7 +315,7 @@ func (s *stream) sinkLoop(ctx context.Context, buf *messageBuffer) (corrupted bo return true } case message.XLogDataByteID: - s.handleXLogData(copyData.Data[1:], buf) + s.handleXLogData(copyData.Data[1:], buf, streamBuf) } } } @@ -304,7 +367,7 @@ func (s *stream) handleKeepalive(ctx context.Context, data []byte) error { // handleXLogData parses a WAL data message, decodes the logical replication // event, and dispatches it through the message buffer. -func (s *stream) handleXLogData(data []byte, buf *messageBuffer) { +func (s *stream) handleXLogData(data []byte, buf *messageBuffer, streamBuf *streamTxBuffer) { xld, err := ParseXLogData(data) if err != nil { logger.Error("parse xLog data", "error", err) @@ -327,17 +390,21 @@ func (s *stream) handleXLogData(data []byte, buf *messageBuffer) { return } - s.dispatchMessage(decodedMsg, xld, buf) + s.dispatchMessage(decodedMsg, xld, buf, streamBuf) } // dispatchMessage routes a decoded logical replication event to the correct // buffer action. // -// Transaction boundaries (BEGIN, COMMIT, STREAM COMMIT, STREAM STOP, STREAM ABORT) -// control the buffer lifecycle. DML events (INSERT, UPDATE, DELETE) are buffered -// with a one-message look-ahead so the last message in each transaction can have -// its WAL position rewritten to the transaction-end LSN. -func (s *stream) dispatchMessage(decodedMsg any, xld XLogData, buf *messageBuffer) { +// For regular (non-streaming) transactions the messageBuffer provides a +// one-message look-ahead so the last DML's WAL position can be rewritten to +// the transaction-end LSN at COMMIT. +// +// For streaming transactions (proto v2) messages are accumulated in the +// streamTxBuffer across STREAM START / STREAM STOP chunks. They are only +// emitted to the consumer on STREAM COMMIT and discarded on STREAM ABORT. +// This prevents uncommitted data from being delivered. +func (s *stream) dispatchMessage(decodedMsg any, xld XLogData, buf *messageBuffer, streamBuf *streamTxBuffer) { switch msg := decodedMsg.(type) { case *format.Begin: buf.discard() @@ -345,25 +412,34 @@ func (s *stream) dispatchMessage(decodedMsg any, xld XLogData, buf *messageBuffe case *format.Commit: buf.flushWithLSN(msg.TransactionEndLSN) + case *format.StreamStart: + // Beginning of a streaming chunk – DML events that follow belong + // to an in-progress transaction and must be buffered per-XID. + streamBuf.startTx(msg.Xid) + case *format.StreamStop: - // End of a streaming chunk – flush so messages are not lost when - // other transactions are interleaved between chunks. - buf.flush() + // End of a streaming chunk. Nothing is emitted to the consumer. + streamBuf.stopTx() case *format.StreamCommit: - // Final commit of a streamed transaction – rewrite LSN like Commit. - buf.flushWithLSN(msg.TransactionEndLSN) + // Final commit of a streamed transaction – emit all messages for this XID. + streamBuf.flushTx(msg.Xid, buf.outCh, msg.TransactionEndLSN) case *format.StreamAbort: - // Streamed transaction rolled back – discard buffered message. - buf.discard() + // Streamed transaction rolled back – discard messages for this XID. + streamBuf.discardTx(msg.Xid) default: // DML event (Insert, Update, Delete, Relation, …) - buf.buffer(&Message{ + m := &Message{ message: decodedMsg, walStart: int64(xld.WALStart), - }) + } + if streamBuf.streaming { + streamBuf.append(m) + } else { + buf.buffer(m) + } } } diff --git a/pq/slot/config.go b/pq/slot/config.go index ae8ce6b..aff4eef 100644 --- a/pq/slot/config.go +++ b/pq/slot/config.go @@ -9,7 +9,11 @@ import ( type Config struct { Name string `json:"name" yaml:"name"` SlotActivityCheckerInterval time.Duration `json:"slotActivityCheckerInterval" yaml:"slotActivityCheckerInterval"` - CreateIfNotExists bool `json:"createIfNotExists" yaml:"createIfNotExists"` + // ProtoVersion selects the pgoutput logical replication protocol version. + // 1 – compatible with PostgreSQL 10+; no streaming transaction support. + // 2 – requires PostgreSQL 14+; supports streaming large in-progress transactions (default). + ProtoVersion int `json:"protoVersion" yaml:"protoVersion"` + CreateIfNotExists bool `json:"createIfNotExists" yaml:"createIfNotExists"` } func (c Config) Validate() error { @@ -22,5 +26,9 @@ func (c Config) Validate() error { err = errors.Join(err, errors.New("slot activity checker interval cannot be lower than 1000 ms")) } + if c.ProtoVersion != 0 && c.ProtoVersion != 1 && c.ProtoVersion != 2 { + err = errors.Join(err, errors.New("slot protoVersion must be 1 or 2")) + } + return err }