Skip to content

Commit c17b64d

Browse files
authored
fix: Destination testing memory leak (#788)
<!-- Explain what problem this PR addresses --> ---
1 parent 8d35e37 commit c17b64d

File tree

6 files changed

+36
-2
lines changed

6 files changed

+36
-2
lines changed

internal/memdb/memdb.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818

1919
// client is mostly used for testing the destination plugin.
2020
type client struct {
21-
schema.DefaultTransformer
2221
spec specs.Destination
2322
memoryDB map[string][]arrow.Record
2423
tables map[string]*arrow.Schema
@@ -87,8 +86,10 @@ func (c *client) overwrite(table *arrow.Schema, data arrow.Record) {
8786
}
8887
}
8988
if found {
89+
tmp := c.memoryDB[tableName][i]
9090
c.memoryDB[tableName] = append(c.memoryDB[tableName][:i], c.memoryDB[tableName][i+1:]...)
9191
c.memoryDB[tableName] = append(c.memoryDB[tableName], data)
92+
tmp.Release()
9293
return
9394
}
9495
}
@@ -109,6 +110,11 @@ func (c *client) Migrate(_ context.Context, tables schema.Schemas) error {
109110
if changes == nil {
110111
continue
111112
}
113+
for _, t := range c.memoryDB {
114+
for _, row := range t {
115+
row.Release()
116+
}
117+
}
112118
c.memoryDB[tableName] = make([]arrow.Record, 0)
113119
c.tables[tableName] = table
114120
}
@@ -198,6 +204,11 @@ func (*client) Metrics() destination.Metrics {
198204
}
199205

200206
func (c *client) Close(context.Context) error {
207+
for _, table := range c.memoryDB {
208+
for _, row := range table {
209+
row.Release()
210+
}
211+
}
201212
c.memoryDB = nil
202213
return nil
203214
}
@@ -219,6 +230,8 @@ func (c *client) deleteStaleTable(_ context.Context, table *arrow.Schema, source
219230
rowSyncTime := row.Column(syncColIndex).(*array.Timestamp).Value(0).ToTime(arrow.Microsecond).UTC()
220231
if !rowSyncTime.Before(syncTime) {
221232
filteredTable = append(filteredTable, c.memoryDB[tableName][i])
233+
} else {
234+
c.memoryDB[tableName][i].Release()
222235
}
223236
}
224237
}

plugins/destination/plugin_testing.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/apache/arrow/go/v12/arrow/memory"
1515
"github.com/cloudquery/plugin-sdk/v2/schema"
1616
"github.com/cloudquery/plugin-sdk/v2/specs"
17+
"github.com/cloudquery/plugin-sdk/v2/types"
1718
"github.com/rs/zerolog"
1819
)
1920

@@ -214,8 +215,16 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
214215

215216
func sortRecordsBySyncTime(table *schema.Table, records []arrow.Record) {
216217
syncTimeIndex := table.Columns.Index(schema.CqSyncTimeColumn.Name)
218+
cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name)
217219
sort.Slice(records, func(i, j int) bool {
218220
// sort by sync time, then UUID
219-
return records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond).Before(records[j].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond))
221+
first := records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond)
222+
second := records[j].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond)
223+
if first.Equal(second) {
224+
firstUUID := records[i].Column(cqIDIndex).(*types.UUIDArray).Value(0).String()
225+
secondUUID := records[j].Column(cqIDIndex).(*types.UUIDArray).Value(0).String()
226+
return strings.Compare(firstUUID, secondUUID) < 0
227+
}
228+
return first.Before(second)
220229
})
221230
}

plugins/destination/plugin_testing_migrate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P
4242
MaxRows: 1,
4343
}
4444
resource1 := testdata.GenTestData(mem, source, opts)[0]
45+
resource1.Retain()
4546
defer resource1.Release()
4647
if err := p.writeOne(ctx, sourceSpec, syncTime, resource1); err != nil {
4748
return fmt.Errorf("failed to write one: %w", err)
@@ -51,6 +52,7 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P
5152
return fmt.Errorf("failed to migrate existing table: %w", err)
5253
}
5354
resource2 := testdata.GenTestData(mem, target, opts)[0]
55+
resource2.Retain()
5456
defer resource2.Release()
5557
if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil {
5658
return fmt.Errorf("failed to write one after migration: %w", err)

plugins/destination/plugin_testing_overwrite.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
4242
MaxRows: 2,
4343
}
4444
resources := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)
45+
for _, r := range resources {
46+
r.Retain()
47+
}
4548
defer func() {
4649
for _, r := range resources {
4750
r.Release()
@@ -83,6 +86,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
8386
StableUUID: *u,
8487
}
8588
updatedResource := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)[0]
89+
updatedResource.Retain()
8690
defer updatedResource.Release()
8791
// write second time
8892
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {

plugins/destination/plugin_testing_overwrite_delete_stale.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
4848
incResources := testdata.GenTestData(mem, incTable.ToArrowSchema(), opts)
4949
allResources := resources
5050
allResources = append(allResources, incResources...)
51+
for _, r := range allResources {
52+
r.Retain()
53+
}
5154
defer func() {
5255
for _, r := range allResources {
5356
r.Release()
@@ -96,6 +99,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
9699
MaxRows: 1,
97100
}
98101
updatedResources := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
102+
updatedResources.Retain()
99103
defer updatedResources.Release()
100104

101105
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil {

plugins/destination/plugin_testing_write_append.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
4040
MaxRows: 1,
4141
}
4242
record1 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
43+
record1.Retain()
4344
defer record1.Release()
4445
if err := p.writeOne(ctx, specSource, syncTime, record1); err != nil {
4546
return fmt.Errorf("failed to write one second time: %w", err)
@@ -48,6 +49,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
4849
secondSyncTime := syncTime.Add(10 * time.Second).UTC()
4950
opts.SyncTime = secondSyncTime
5051
record2 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
52+
record2.Retain()
5153
defer record2.Release()
5254

5355
if !s.tests.SkipSecondAppend {

0 commit comments

Comments
 (0)