Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (c *Config) SetDefault() {
c.Slot.SlotActivityCheckerInterval = 1000
}

if c.Slot.ProtoVersion == 0 {
c.Slot.ProtoVersion = 2
}

if c.Logger.Logger == nil {
c.Logger.Logger = logger.NewSlog(c.Logger.LogLevel)
}
Expand Down
6 changes: 1 addition & 5 deletions example/streaming-transactions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func main() {
CreateIfNotExists: true,
Name: "cdc_slot_streaming",
SlotActivityCheckerInterval: 3000,
ProtoVersion: 2,
},
Metric: config.MetricConfig{
Port: 8081,
Expand Down Expand Up @@ -131,11 +132,6 @@ func main() {
}
}

if err = tx.Commit(ctx); err != nil {
slog.Error("commit hatası", "error", err)
os.Exit(1)
}

slog.Info("Transaction COMMIT edildi, mesajlar bekleniyor...")

// --- Wait for the Results --------------------------------------------------
Expand Down
149 changes: 76 additions & 73 deletions integration_test/basic_functionality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
cdc "github.com/Trendyol/go-pq-cdc"
"github.com/Trendyol/go-pq-cdc/config"
"github.com/Trendyol/go-pq-cdc/pq/message/format"
"github.com/Trendyol/go-pq-cdc/pq/replication"
"github.com/stretchr/testify/assert"
Expand All @@ -22,89 +23,91 @@ func TestBasicFunctionality(t *testing.T) {
cdcCfg := Config
cdcCfg.Slot.Name = "slot_test_basic_functionality"

postgresConn, err := newPostgresConn()
if !assert.NoError(t, err) {
t.FailNow()
}

if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) {
t.FailNow()
}

messageCh := make(chan any, 500)
handlerFunc := func(ctx *replication.ListenerContext) {
switch msg := ctx.Message.(type) {
case *format.Insert, *format.Delete, *format.Update:
messageCh <- msg
}
_ = ctx.Ack()
}

connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc)
if !assert.NoError(t, err) {
t.FailNow()
}

t.Cleanup(func() {
connector.Close()
assert.NoError(t, RestoreDB(ctx))
assert.NoError(t, postgresConn.Close(ctx))
})

go connector.Start(ctx)

waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) {
t.FailNow()
}
cancel()

t.Run("Insert 10 book to table. Then check messages and metric", func(t *testing.T) {
books := CreateBooks(10)
for _, b := range books {
err = pgExec(ctx, postgresConn, fmt.Sprintf("INSERT INTO books(id, name) VALUES(%d, '%s')", b.ID, b.Name))
assert.NoError(t, err)
forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) {
postgresConn, err := newPostgresConn()
if !assert.NoError(t, err) {
t.FailNow()
}

for i := range 10 {
m := <-messageCh
assert.Equal(t, books[i].Map(), m.(*format.Insert).Decoded)
if !assert.NoError(t, SetupTestDB(ctx, postgresConn, cdcCfg)) {
t.FailNow()
}

metric, _ := fetchInsertOpMetric()
assert.True(t, metric == 10)
})

t.Run("Update 5 book on table. Then check messages and metric", func(t *testing.T) {
books := CreateBooks(5)
for i, b := range books {
b.ID = i + 1
books[i] = b
err = pgExec(ctx, postgresConn, fmt.Sprintf("UPDATE books SET name = '%s' WHERE id = %d", b.Name, b.ID))
assert.NoError(t, err)
messageCh := make(chan any, 500)
handlerFunc := func(ctx *replication.ListenerContext) {
switch msg := ctx.Message.(type) {
case *format.Insert, *format.Delete, *format.Update:
messageCh <- msg
}
_ = ctx.Ack()
}

for i := range 5 {
m := <-messageCh
assert.Equal(t, books[i].Map(), m.(*format.Update).NewDecoded)
connector, err := cdc.NewConnector(ctx, cdcCfg, handlerFunc)
if !assert.NoError(t, err) {
t.FailNow()
}

metric, _ := fetchUpdateOpMetric()
assert.True(t, metric == 5)
})
t.Cleanup(func() {
connector.Close()
assert.NoError(t, RestoreDB(ctx))
assert.NoError(t, postgresConn.Close(ctx))
})

t.Run("Delete 5 book from table. Then check messages and metric", func(t *testing.T) {
for i := range 5 {
err = pgExec(ctx, postgresConn, fmt.Sprintf("DELETE FROM books WHERE id = %d", i+1))
assert.NoError(t, err)
}
go connector.Start(ctx)

for i := range 5 {
m := <-messageCh
assert.Equal(t, int32(i+1), m.(*format.Delete).OldDecoded["id"])
waitCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
if !assert.NoError(t, connector.WaitUntilReady(waitCtx)) {
t.FailNow()
}

metric, _ := fetchDeleteOpMetric()
assert.True(t, metric == 5)
cancel()

t.Run("Insert 10 book to table. Then check messages and metric", func(t *testing.T) {
books := CreateBooks(10)
for _, b := range books {
err = pgExec(ctx, postgresConn, fmt.Sprintf("INSERT INTO books(id, name) VALUES(%d, '%s')", b.ID, b.Name))
assert.NoError(t, err)
}

for i := range 10 {
m := <-messageCh
assert.Equal(t, books[i].Map(), m.(*format.Insert).Decoded)
}

metric, _ := fetchInsertOpMetric()
assert.True(t, metric == 10)
})

t.Run("Update 5 book on table. Then check messages and metric", func(t *testing.T) {
books := CreateBooks(5)
for i, b := range books {
b.ID = i + 1
books[i] = b
err = pgExec(ctx, postgresConn, fmt.Sprintf("UPDATE books SET name = '%s' WHERE id = %d", b.Name, b.ID))
assert.NoError(t, err)
}

for i := range 5 {
m := <-messageCh
assert.Equal(t, books[i].Map(), m.(*format.Update).NewDecoded)
}

metric, _ := fetchUpdateOpMetric()
assert.True(t, metric == 5)
})

t.Run("Delete 5 book from table. Then check messages and metric", func(t *testing.T) {
for i := range 5 {
err = pgExec(ctx, postgresConn, fmt.Sprintf("DELETE FROM books WHERE id = %d", i+1))
assert.NoError(t, err)
}

for i := range 5 {
m := <-messageCh
assert.Equal(t, int32(i+1), m.(*format.Delete).OldDecoded["id"])
}

metric, _ := fetchDeleteOpMetric()
assert.True(t, metric == 5)
})
})
}
123 changes: 58 additions & 65 deletions integration_test/concurrent_tx_ordering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,82 +19,75 @@ import (
// B's changes first, followed by A's.
func TestConcurrentTxOrdering(t *testing.T) {
ctx := context.Background()
slot := "slot_test_concurrent_order"

// ---------- connector prep -------------------------------------------
cdcCfg := Config
cdcCfg.Slot.Name = slot
cdcCfg.Slot.Name = "slot_test_concurrent_order"

pgConn, err := newPostgresConn()
if !assert.NoError(t, err) {
t.FailNow()
}
if !assert.NoError(t, SetupTestDB(ctx, pgConn, cdcCfg)) {
t.FailNow()
}

msgCh := make(chan *format.Insert, 5)
handler := func(lCtx *replication.ListenerContext) {
if ins, ok := lCtx.Message.(*format.Insert); ok {
msgCh <- ins
forEachProtoVersion(t, cdcCfg, func(t *testing.T, cdcCfg config.Config) {
pgConn, err := newPostgresConn()
if !assert.NoError(t, err) {
t.FailNow()
}
if !assert.NoError(t, SetupTestDB(ctx, pgConn, cdcCfg)) {
t.FailNow()
}
_ = lCtx.Ack()
}

connector, _ := cdc.NewConnector(ctx, cdcCfg, handler)
go connector.Start(ctx)
waitCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
assert.NoError(t, connector.WaitUntilReady(waitCtx))
cancel()

cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database}
pool, _ := pgxpool.New(ctx, cfg.DSNWithoutSSL())
msgCh := make(chan *format.Insert, 5)
handler := func(lCtx *replication.ListenerContext) {
if ins, ok := lCtx.Message.(*format.Insert); ok {
msgCh <- ins
}
_ = lCtx.Ack()
}

//---------------- start TxA (long) -------------------------------------
txA, _ := pool.Begin(ctx)
_, _ = txA.Exec(ctx, "INSERT INTO books(id,name) VALUES(600,'A')")
connector, _ := cdc.NewConnector(ctx, cdcCfg, handler)
go connector.Start(ctx)
waitCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
assert.NoError(t, connector.WaitUntilReady(waitCtx))
cancel()

//---------------- TxB (short) commit first -----------------------------
txB, _ := pool.Begin(ctx)
_, _ = txB.Exec(ctx, "INSERT INTO books(id,name) VALUES(601,'B')")
_ = txB.Commit(ctx)
cfg := config.Config{Host: Config.Host, Port: Config.Port, Username: "postgres", Password: "postgres", Database: Config.Database}
pool, _ := pgxpool.New(ctx, cfg.DSNWithoutSSL())

// simulate some gap
time.Sleep(200 * time.Millisecond)
t.Cleanup(func() {
connector.Close()
_ = RestoreDB(ctx)
pool.Close()
})

//---------------- commit TxA ------------------------------------------
_ = txA.Commit(ctx)
txA, _ := pool.Begin(ctx)
_, _ = txA.Exec(ctx, "INSERT INTO books(id,name) VALUES(600,'A')")

//----------- collect two messages ------------------------------------
var first, second *format.Insert
select {
case first = <-msgCh:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting first message")
}
select {
case second = <-msgCh:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting second message")
}
txB, _ := pool.Begin(ctx)
_, _ = txB.Exec(ctx, "INSERT INTO books(id,name) VALUES(601,'B')")
_ = txB.Commit(ctx)

// Expect B(id 601) comes before A(id 600)
assert.Equal(t, int32(601), first.Decoded["id"])
assert.Equal(t, int32(600), second.Decoded["id"])
time.Sleep(200 * time.Millisecond)
_ = txA.Commit(ctx)

// -------- validate confirmed_flush_lsn & restart_lsn -----------------
// Give connector a moment to send ACK and Postgres to advance slot
time.Sleep(500 * time.Millisecond)
var restartLSN, confirmedLSN string
row := pool.QueryRow(ctx, "SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name=$1", slot)
err = row.Scan(&restartLSN, &confirmedLSN)
assert.NoError(t, err)
assert.NotEmpty(t, confirmedLSN)
// When no other transaction is in-progress, restart_lsn should equal confirmed
// restart_lsn is the oldest LSN that might still be needed; it is <= confirmed_flush_lsn
confirmed, _ := pq.ParseLSN(confirmedLSN)
restart, _ := pq.ParseLSN(restartLSN)
assert.LessOrEqual(t, restart, confirmed)
var first, second *format.Insert
select {
case first = <-msgCh:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting first message")
}
select {
case second = <-msgCh:
case <-time.After(2 * time.Second):
t.Fatal("timeout waiting second message")
}

connector.Close()
assert.Equal(t, int32(601), first.Decoded["id"])
assert.Equal(t, int32(600), second.Decoded["id"])

time.Sleep(500 * time.Millisecond)
var restartLSN, confirmedLSN string
row := pool.QueryRow(ctx, "SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name=$1", cdcCfg.Slot.Name)
err = row.Scan(&restartLSN, &confirmedLSN)
assert.NoError(t, err)
assert.NotEmpty(t, confirmedLSN)
confirmed, _ := pq.ParseLSN(confirmedLSN)
restart, _ := pq.ParseLSN(restartLSN)
assert.LessOrEqual(t, restart, confirmed)
})
}
Loading
Loading