Skip to content

Commit a893db6

Browse files
authored
fix: Arrow Retain and Release fixes (#795)
This fixes a memory leak detected by the `file` and `s3` destination tests, and refactors the retain/release logic a little bit. Quoting the Go Arrow docs: > If you send an object over a channel, you must call Retain before sending it as the receiver is assumed to own the object and will later call Release when it no longer needs the object. I think we should call `Retain` as close as possible to where a record is sent over a channel.
1 parent 591502f commit a893db6

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

internal/servers/destination/v0/destinations.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error {
141141
select {
142142
case resources <- convertedResource:
143143
case <-ctx.Done():
144+
convertedResource.Release()
144145
close(resources)
145146
if err := eg.Wait(); err != nil {
146147
return status.Errorf(codes.Internal, "Context done: %v and failed to wait for plugin: %v", ctx.Err(), err)

plugins/destination/managed_writer.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *arrow.Schem
7070
p.logger.Info().Str("table", tableName).Int("len", batchSize).Dur("duration", time.Since(start)).Msg("batch written successfully")
7171
atomic.AddUint64(&metrics.Writes, uint64(batchSize))
7272
}
73+
for _, r := range resources {
74+
r.Release()
75+
}
7376
}
7477

7578
func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Record) []arrow.Record {
@@ -81,18 +84,19 @@ func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Recor
8184

8285
pks := make(map[string]struct{}, len(resources))
8386
res := make([]arrow.Record, 0, len(resources))
84-
var reported bool
8587
for _, r := range resources {
88+
if r.NumRows() > 1 {
89+
panic(fmt.Sprintf("record with more than 1 row: %d", r.NumRows()))
90+
}
8691
key := pk.String(r)
8792
_, ok := pks[key]
88-
switch {
89-
case !ok:
93+
if !ok {
9094
pks[key] = struct{}{}
9195
res = append(res, r)
9296
continue
93-
case reported:
94-
continue
9597
}
98+
// duplicate, release early
99+
r.Release()
96100
}
97101

98102
return res

0 commit comments

Comments
 (0)