From c36c746c52a736506752e6b84b7671247d8c1ef4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 1 May 2025 01:15:39 +0200 Subject: [PATCH 1/5] allocate memory on tests --- source/logrepl/cdc_test.go | 2 +- source/logrepl/combined_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go index d827447..f7f9b41 100644 --- a/source/logrepl/cdc_test.go +++ b/source/logrepl/cdc_test.go @@ -510,7 +510,7 @@ func TestCDCIterator_NextN(t *testing.T) { } // Will keep calling NextN until all records are received - var records []opencdc.Record + records := make([]opencdc.Record, 0, 2) for len(records) < 2 { recordsTmp, err := i.NextN(ctx, 5) is.NoErr(err) diff --git a/source/logrepl/combined_test.go b/source/logrepl/combined_test.go index 90980cb..3d2a3bd 100644 --- a/source/logrepl/combined_test.go +++ b/source/logrepl/combined_test.go @@ -289,7 +289,7 @@ func TestCombinedIterator_NextN(t *testing.T) { is.NoErr(err) // Request 2 records in CDC mode - var records []opencdc.Record + records := make([]opencdc.Record, 0, 2) var retries int maxRetries := 10 for retries < maxRetries { From a3e887b319727dc5c05f8ff20b1b15f701a7c156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 1 May 2025 01:17:32 +0200 Subject: [PATCH 2/5] similar behavior as before --- source/logrepl/combined.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/logrepl/combined.go b/source/logrepl/combined.go index a85be94..7ed853d 100644 --- a/source/logrepl/combined.go +++ b/source/logrepl/combined.go @@ -131,7 +131,7 @@ func (c *CombinedIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, } sdk.Logger(ctx).Debug().Msg("Snapshot completed, switching to CDC mode") - return c.activeIterator.NextN(ctx, n) + return c.NextN(ctx, n) } return records, nil } From 4abb909c15218c9f0a99a3815954d41bdac7d6bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 1 May 2025 01:18:01 +0200 Subject: [PATCH 3/5] fix performance --- source/logrepl/cdc.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index 1b95a70..d8c1372 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -124,7 +124,11 @@ func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error return nil, fmt.Errorf("n must be greater than 0, got %d", n) } - recs := make([]opencdc.Record, 0, n) + // 40K msg/s + //recs := make([]opencdc.Record, 0, n) + + // 150K msg/s + var recs []opencdc.Record // Block until at least one record is received or context is canceled select { From d6cde4fc35a0de5bc8b9d4f8ff73a67ead92844a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Thu, 1 May 2025 01:20:37 +0200 Subject: [PATCH 4/5] fix lint --- source/logrepl/cdc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index d8c1372..c27fe60 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -125,7 +125,7 @@ func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error } // 40K msg/s - //recs := make([]opencdc.Record, 0, n) + // recs := make([]opencdc.Record, 0, n) // 150K msg/s var recs []opencdc.Record From 17631a7190997834bcb795a860507a45c4ce36a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Mon, 5 May 2025 19:14:17 +0200 Subject: [PATCH 5/5] remove comments --- source/logrepl/cdc.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index c27fe60..1c99f20 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -124,10 +124,6 @@ func (i *CDCIterator) NextN(ctx context.Context, n int) ([]opencdc.Record, error return nil, fmt.Errorf("n must be greater than 0, got %d", n) } - // 40K msg/s - // recs := make([]opencdc.Record, 0, n) - - // 150K msg/s var recs []opencdc.Record // Block until at least one record is received or context is canceled