Skip to content

Commit b54e5e1

Browse files
authored
fix: Use Go memory allocator for arrow (#810)
Following this short discussion - apache/arrow#35232 It seems we don't really need to use Retain/Release outside the arrow library. We can always bring this back in the future if we would like to experiment if this brings better memory performance then the default go allocator.
1 parent 2dfeb02 commit b54e5e1

File tree

8 files changed

+29
-100
lines changed

8 files changed

+29
-100
lines changed

internal/memdb/memdb.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,8 @@ func (c *client) overwrite(table *arrow.Schema, data arrow.Record) {
8686
}
8787
}
8888
if found {
89-
tmp := c.memoryDB[tableName][i]
9089
c.memoryDB[tableName] = append(c.memoryDB[tableName][:i], c.memoryDB[tableName][i+1:]...)
9190
c.memoryDB[tableName] = append(c.memoryDB[tableName], data)
92-
tmp.Release()
9391
return
9492
}
9593
}
@@ -110,11 +108,6 @@ func (c *client) Migrate(_ context.Context, tables schema.Schemas) error {
110108
if changes == nil {
111109
continue
112110
}
113-
for _, t := range c.memoryDB {
114-
for _, row := range t {
115-
row.Release()
116-
}
117-
}
118111
c.memoryDB[tableName] = make([]arrow.Record, 0)
119112
c.tables[tableName] = table
120113
}
@@ -204,11 +197,6 @@ func (*client) Metrics() destination.Metrics {
204197
}
205198

206199
func (c *client) Close(context.Context) error {
207-
for _, table := range c.memoryDB {
208-
for _, row := range table {
209-
row.Release()
210-
}
211-
}
212200
c.memoryDB = nil
213201
return nil
214202
}
@@ -230,8 +218,6 @@ func (c *client) deleteStaleTable(_ context.Context, table *arrow.Schema, source
230218
rowSyncTime := row.Column(syncColIndex).(*array.Timestamp).Value(0).ToTime(arrow.Microsecond).UTC()
231219
if !rowSyncTime.Before(syncTime) {
232220
filteredTable = append(filteredTable, c.memoryDB[tableName][i])
233-
} else {
234-
c.memoryDB[tableName][i].Release()
235221
}
236222
}
237223
}

internal/memdb/memdb_test.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"time"
77

88
"github.com/apache/arrow/go/v12/arrow"
9-
"github.com/apache/arrow/go/v12/arrow/memory"
109
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
1110
"github.com/cloudquery/plugin-sdk/v2/specs"
1211
"github.com/cloudquery/plugin-sdk/v2/testdata"
@@ -122,17 +121,14 @@ func TestOnWriteError(t *testing.T) {
122121
sourceSpec := specs.Source{
123122
Name: sourceName,
124123
}
125-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
126-
defer mem.AssertSize(t, 0)
127124
ch := make(chan arrow.Record, 1)
128125
opts := testdata.GenTestDataOptions{
129126
SourceName: "test",
130127
SyncTime: time.Now(),
131128
MaxRows: 1,
132129
StableUUID: uuid.Nil,
133130
}
134-
record := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
135-
defer record.Release()
131+
record := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
136132
ch <- record
137133
close(ch)
138134
err := p.Write(ctx, sourceSpec, tables, syncTime, ch)
@@ -160,8 +156,6 @@ func TestOnWriteCtxCancelled(t *testing.T) {
160156
sourceSpec := specs.Source{
161157
Name: sourceName,
162158
}
163-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
164-
defer mem.AssertSize(t, 0)
165159
ch := make(chan arrow.Record, 1)
166160
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
167161
opts := testdata.GenTestDataOptions{
@@ -170,8 +164,7 @@ func TestOnWriteCtxCancelled(t *testing.T) {
170164
MaxRows: 1,
171165
StableUUID: uuid.Nil,
172166
}
173-
record := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
174-
defer record.Release()
167+
record := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
175168
ch <- record
176169
defer cancel()
177170
err := p.Write(ctx, sourceSpec, tables, syncTime, ch)

plugins/destination/plugin_testing.go

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/apache/arrow/go/v12/arrow"
1313
"github.com/apache/arrow/go/v12/arrow/array"
14-
"github.com/apache/arrow/go/v12/arrow/memory"
1514
"github.com/cloudquery/plugin-sdk/v2/schema"
1615
"github.com/cloudquery/plugin-sdk/v2/specs"
1716
"github.com/cloudquery/plugin-sdk/v2/types"
@@ -116,11 +115,9 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
116115
if suite.tests.SkipOverwrite {
117116
t.Skip("skipping " + t.Name())
118117
}
119-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
120-
defer mem.AssertSize(t, 0)
121118
destSpec.Name = "test_write_overwrite"
122119
p := newPlugin()
123-
if err := suite.destinationPluginTestWriteOverwrite(ctx, mem, p, logger, destSpec); err != nil {
120+
if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec); err != nil {
124121
t.Fatal(err)
125122
}
126123
if err := p.Close(ctx); err != nil {
@@ -133,11 +130,9 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
133130
if suite.tests.SkipOverwrite || suite.tests.SkipDeleteStale {
134131
t.Skip("skipping " + t.Name())
135132
}
136-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
137-
defer mem.AssertSize(t, 0)
138133
destSpec.Name = "test_write_overwrite_delete_stale"
139134
p := newPlugin()
140-
if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, mem, p, logger, destSpec); err != nil {
135+
if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec); err != nil {
141136
t.Fatal(err)
142137
}
143138
if err := p.Close(ctx); err != nil {
@@ -150,36 +145,30 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
150145
if suite.tests.SkipMigrateOverwrite {
151146
t.Skip("skipping " + t.Name())
152147
}
153-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
154-
defer mem.AssertSize(t, 0)
155148
destSpec.WriteMode = specs.WriteModeOverwrite
156149
destSpec.Name = "test_migrate_overwrite"
157-
suite.destinationPluginTestMigrate(ctx, mem, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite)
150+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite)
158151
})
159152

160153
t.Run("TestMigrateOverwriteForce", func(t *testing.T) {
161154
t.Helper()
162155
if suite.tests.SkipMigrateOverwriteForce {
163156
t.Skip("skipping " + t.Name())
164157
}
165-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
166-
defer mem.AssertSize(t, 0)
167158
destSpec.WriteMode = specs.WriteModeOverwrite
168159
destSpec.MigrateMode = specs.MigrateModeForced
169160
destSpec.Name = "test_migrate_overwrite_force"
170-
suite.destinationPluginTestMigrate(ctx, mem, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite)
161+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite)
171162
})
172163

173164
t.Run("TestWriteAppend", func(t *testing.T) {
174165
t.Helper()
175166
if suite.tests.SkipAppend {
176167
t.Skip("skipping " + t.Name())
177168
}
178-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
179-
defer mem.AssertSize(t, 0)
180169
destSpec.Name = "test_write_append"
181170
p := newPlugin()
182-
if err := suite.destinationPluginTestWriteAppend(ctx, mem, p, logger, destSpec); err != nil {
171+
if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec); err != nil {
183172
t.Fatal(err)
184173
}
185174
if err := p.Close(ctx); err != nil {
@@ -189,27 +178,23 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
189178

190179
t.Run("TestMigrateAppend", func(t *testing.T) {
191180
t.Helper()
192-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
193-
defer mem.AssertSize(t, 0)
194181
if suite.tests.SkipMigrateAppend {
195182
t.Skip("skipping " + t.Name())
196183
}
197184
destSpec.WriteMode = specs.WriteModeAppend
198185
destSpec.Name = "test_migrate_append"
199-
suite.destinationPluginTestMigrate(ctx, mem, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend)
186+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend)
200187
})
201188

202189
t.Run("TestMigrateAppendForce", func(t *testing.T) {
203190
t.Helper()
204191
if suite.tests.SkipMigrateAppendForce {
205192
t.Skip("skipping " + t.Name())
206193
}
207-
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
208-
defer mem.AssertSize(t, 0)
209194
destSpec.WriteMode = specs.WriteModeAppend
210195
destSpec.MigrateMode = specs.MigrateModeForced
211196
destSpec.Name = "test_migrate_append_force"
212-
suite.destinationPluginTestMigrate(ctx, mem, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend)
197+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend)
213198
})
214199
}
215200

plugins/destination/plugin_testing_migrate.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/apache/arrow/go/v12/arrow"
1111
"github.com/apache/arrow/go/v12/arrow/array"
12-
"github.com/apache/arrow/go/v12/arrow/memory"
1312
"github.com/cloudquery/plugin-sdk/v2/schema"
1413
"github.com/cloudquery/plugin-sdk/v2/specs"
1514
"github.com/cloudquery/plugin-sdk/v2/testdata"
@@ -22,7 +21,7 @@ func tableUUIDSuffix() string {
2221
return strings.ReplaceAll(uuid.NewString(), "-", "_")
2322
}
2423

25-
func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *arrow.Schema, source *arrow.Schema, mode specs.MigrateMode) error {
24+
func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *arrow.Schema, source *arrow.Schema, mode specs.MigrateMode) error {
2625
if err := p.Init(ctx, logger, spec); err != nil {
2726
return fmt.Errorf("failed to init plugin: %w", err)
2827
}
@@ -41,19 +40,15 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P
4140
SyncTime: syncTime,
4241
MaxRows: 1,
4342
}
44-
resource1 := testdata.GenTestData(mem, source, opts)[0]
45-
resource1.Retain()
46-
defer resource1.Release()
43+
resource1 := testdata.GenTestData(source, opts)[0]
4744
if err := p.writeOne(ctx, sourceSpec, syncTime, resource1); err != nil {
4845
return fmt.Errorf("failed to write one: %w", err)
4946
}
5047

5148
if err := p.Migrate(ctx, []*arrow.Schema{target}); err != nil {
5249
return fmt.Errorf("failed to migrate existing table: %w", err)
5350
}
54-
resource2 := testdata.GenTestData(mem, target, opts)[0]
55-
resource2.Retain()
56-
defer resource2.Release()
51+
resource2 := testdata.GenTestData(target, opts)[0]
5752
if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil {
5853
return fmt.Errorf("failed to write one after migration: %w", err)
5954
}
@@ -85,7 +80,6 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P
8580

8681
func (*PluginTestSuite) destinationPluginTestMigrate(
8782
ctx context.Context,
88-
mem memory.Allocator,
8983
t *testing.T,
9084
newPlugin NewPluginFunc,
9185
logger zerolog.Logger,
@@ -117,7 +111,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
117111
}, &md)
118112

119113
p := newPlugin()
120-
if err := testMigration(ctx, mem, t, p, logger, spec, target, source, strategy.AddColumn); err != nil {
114+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumn); err != nil {
121115
t.Fatalf("failed to migrate %s: %v", tableName, err)
122116
}
123117
if err := p.Close(ctx); err != nil {
@@ -147,7 +141,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
147141
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
148142
}, &md)
149143
p := newPlugin()
150-
if err := testMigration(ctx, mem, t, p, logger, spec, target, source, strategy.AddColumnNotNull); err != nil {
144+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumnNotNull); err != nil {
151145
t.Fatalf("failed to migrate add_column_not_null: %v", err)
152146
}
153147
if err := p.Close(ctx); err != nil {
@@ -177,7 +171,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
177171
}, &md)
178172

179173
p := newPlugin()
180-
if err := testMigration(ctx, mem, t, p, logger, spec, target, source, strategy.RemoveColumn); err != nil {
174+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumn); err != nil {
181175
t.Fatalf("failed to migrate remove_column: %v", err)
182176
}
183177
if err := p.Close(ctx); err != nil {
@@ -207,7 +201,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
207201
}, &md)
208202

209203
p := newPlugin()
210-
if err := testMigration(ctx, mem, t, p, logger, spec, target, source, strategy.RemoveColumnNotNull); err != nil {
204+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumnNotNull); err != nil {
211205
t.Fatalf("failed to migrate remove_column_not_null: %v", err)
212206
}
213207
if err := p.Close(ctx); err != nil {
@@ -238,7 +232,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
238232
}, &md)
239233

240234
p := newPlugin()
241-
if err := testMigration(ctx, mem, t, p, logger, spec, target, source, strategy.ChangeColumn); err != nil {
235+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.ChangeColumn); err != nil {
242236
t.Fatalf("failed to migrate change_column: %v", err)
243237
}
244238
if err := p.Close(ctx); err != nil {

plugins/destination/plugin_testing_overwrite.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/apache/arrow/go/v12/arrow"
99
"github.com/apache/arrow/go/v12/arrow/array"
10-
"github.com/apache/arrow/go/v12/arrow/memory"
1110
"github.com/cloudquery/plugin-sdk/v2/schema"
1211
"github.com/cloudquery/plugin-sdk/v2/specs"
1312
"github.com/cloudquery/plugin-sdk/v2/testdata"
@@ -16,7 +15,7 @@ import (
1615
"github.com/rs/zerolog"
1716
)
1817

19-
func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, mem memory.Allocator, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
18+
func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
2019
spec.WriteMode = specs.WriteModeOverwrite
2120
if err := p.Init(ctx, logger, spec); err != nil {
2221
return fmt.Errorf("failed to init plugin: %w", err)
@@ -41,15 +40,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
4140
SyncTime: syncTime,
4241
MaxRows: 2,
4342
}
44-
resources := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)
45-
for _, r := range resources {
46-
r.Retain()
47-
}
48-
defer func() {
49-
for _, r := range resources {
50-
r.Release()
51-
}
52-
}()
43+
resources := testdata.GenTestData(schema.CQSchemaToArrow(table), opts)
5344
if err := p.writeAll(ctx, sourceSpec, syncTime, resources); err != nil {
5445
return fmt.Errorf("failed to write all: %w", err)
5546
}
@@ -85,9 +76,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
8576
MaxRows: 1,
8677
StableUUID: *u,
8778
}
88-
updatedResource := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)[0]
89-
updatedResource.Retain()
90-
defer updatedResource.Release()
79+
updatedResource := testdata.GenTestData(schema.CQSchemaToArrow(table), opts)[0]
9180
// write second time
9281
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {
9382
return fmt.Errorf("failed to write one second time: %w", err)

plugins/destination/plugin_testing_overwrite_delete_stale.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@ import (
77

88
"github.com/apache/arrow/go/v12/arrow"
99
"github.com/apache/arrow/go/v12/arrow/array"
10-
"github.com/apache/arrow/go/v12/arrow/memory"
1110
"github.com/cloudquery/plugin-sdk/v2/specs"
1211
"github.com/cloudquery/plugin-sdk/v2/testdata"
1312
"github.com/cloudquery/plugin-sdk/v2/types"
1413
"github.com/google/uuid"
1514
"github.com/rs/zerolog"
1615
)
1716

18-
func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, mem memory.Allocator, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
17+
func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
1918
spec.WriteMode = specs.WriteModeOverwriteDeleteStale
2019
if err := p.Init(ctx, logger, spec); err != nil {
2120
return fmt.Errorf("failed to init plugin: %w", err)
@@ -44,18 +43,10 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
4443
SyncTime: syncTime,
4544
MaxRows: 2,
4645
}
47-
resources := testdata.GenTestData(mem, table.ToArrowSchema(), opts)
48-
incResources := testdata.GenTestData(mem, incTable.ToArrowSchema(), opts)
46+
resources := testdata.GenTestData(table.ToArrowSchema(), opts)
47+
incResources := testdata.GenTestData(incTable.ToArrowSchema(), opts)
4948
allResources := resources
5049
allResources = append(allResources, incResources...)
51-
for _, r := range allResources {
52-
r.Retain()
53-
}
54-
defer func() {
55-
for _, r := range allResources {
56-
r.Release()
57-
}
58-
}()
5950
if err := p.writeAll(ctx, sourceSpec, syncTime, allResources); err != nil {
6051
return fmt.Errorf("failed to write all: %w", err)
6152
}
@@ -98,9 +89,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
9889
StableUUID: *u,
9990
MaxRows: 1,
10091
}
101-
updatedResources := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0]
102-
updatedResources.Retain()
103-
defer updatedResources.Release()
92+
updatedResources := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
10493

10594
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil {
10695
return fmt.Errorf("failed to write one second time: %w", err)

0 commit comments

Comments
 (0)