Skip to content

Commit 88142d4

Browse files
author
Noam Preil
committed
proof of concept: unify refcounting logic
1 parent 611dbf5 commit 88142d4

File tree

1 file changed

+11
-29
lines changed

1 file changed

+11
-29
lines changed

arrow/ipc/message.go

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"io"
2323
"sync/atomic"
2424

25-
"github.com/apache/arrow-go/v18/arrow/internal/debug"
2625
"github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
2726
"github.com/apache/arrow-go/v18/arrow/memory"
2827
)
@@ -66,10 +65,10 @@ func (m MessageType) String() string {
6665

6766
// Message is an IPC message, including metadata and body.
6867
type Message struct {
69-
refCount atomic.Int64
70-
msg *flatbuf.Message
71-
meta *memory.Buffer
72-
body *memory.Buffer
68+
memory.Refcount
69+
msg *flatbuf.Message
70+
meta *memory.Buffer
71+
body *memory.Buffer
7372
}
7473

7574
// NewMessage creates a new message from the metadata and body buffers.
@@ -85,7 +84,9 @@ func NewMessage(meta, body *memory.Buffer) *Message {
8584
meta: meta,
8685
body: body,
8786
}
88-
m.refCount.Add(1)
87+
m.Refcount.Buffers = []**memory.Buffer{&m.meta, &m.body}
88+
m.Additional = func() { m.msg = nil }
89+
m.Retain()
8990
return m
9091
}
9192

@@ -99,31 +100,12 @@ func newMessageFromFB(meta *flatbuf.Message, body *memory.Buffer) *Message {
99100
meta: memory.NewBufferBytes(meta.Table().Bytes),
100101
body: body,
101102
}
102-
m.refCount.Add(1)
103+
m.Refcount.Buffers = []**memory.Buffer{&m.meta, &m.body}
104+
m.Additional = func() { m.msg = nil }
105+
m.Retain()
103106
return m
104107
}
105108

106-
// Retain increases the reference count by 1.
107-
// Retain may be called simultaneously from multiple goroutines.
108-
func (msg *Message) Retain() {
109-
msg.refCount.Add(1)
110-
}
111-
112-
// Release decreases the reference count by 1.
113-
// Release may be called simultaneously from multiple goroutines.
114-
// When the reference count goes to zero, the memory is freed.
115-
func (msg *Message) Release() {
116-
debug.Assert(msg.refCount.Load() > 0, "too many releases")
117-
118-
if msg.refCount.Add(-1) == 0 {
119-
msg.meta.Release()
120-
msg.body.Release()
121-
msg.msg = nil
122-
msg.meta = nil
123-
msg.body = nil
124-
}
125-
}
126-
127109
func (msg *Message) Version() MetadataVersion {
128110
return MetadataVersion(msg.msg.Version())
129111
}
@@ -175,7 +157,7 @@ func (r *messageReader) Retain() {
175157
// When the reference count goes to zero, the memory is freed.
176158
// Release may be called simultaneously from multiple goroutines.
177159
func (r *messageReader) Release() {
178-
debug.Assert(r.refCount.Load() > 0, "too many releases")
160+
r.refCount.Load()
179161

180162
if r.refCount.Add(-1) == 0 {
181163
if r.msg != nil {

0 commit comments

Comments
 (0)