Skip to content

Commit 75a3cf6

Browse files
author
Noam Preil
committed
improved API
1 parent 9654a4f commit 75a3cf6

File tree

3 files changed

+20
-32
lines changed

3 files changed

+20
-32
lines changed

arrow/ipc/message.go

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@ import (
2020
"encoding/binary"
2121
"fmt"
2222
"io"
23-
"sync/atomic"
2423
"unsafe"
2524

26-
"github.com/apache/arrow-go/v18/arrow/internal/debug"
2725
"github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
2826
"github.com/apache/arrow-go/v18/arrow/memory"
2927
)
@@ -128,9 +126,9 @@ type MessageReader interface {
128126

129127
// MessageReader reads messages from an io.Reader.
130128
type messageReader struct {
129+
memory.Refcount
131130
r io.Reader
132131

133-
refCount atomic.Int64
134132
msg *Message
135133

136134
mem memory.Allocator
@@ -145,30 +143,11 @@ func NewMessageReader(r io.Reader, opts ...Option) MessageReader {
145143
}
146144

147145
mr := &messageReader{r: r, mem: cfg.alloc}
148-
mr.refCount.Add(1)
146+
mr.Retain()
147+
mr.ReferenceDependency(unsafe.Pointer(&mr.msg))
149148
return mr
150149
}
151150

152-
// Retain increases the reference count by 1.
153-
// Retain may be called simultaneously from multiple goroutines.
154-
func (r *messageReader) Retain() {
155-
r.refCount.Add(1)
156-
}
157-
158-
// Release decreases the reference count by 1.
159-
// When the reference count goes to zero, the memory is freed.
160-
// Release may be called simultaneously from multiple goroutines.
161-
func (r *messageReader) Release() {
162-
debug.Assert(r.refCount.Load() > 0, "too many releases")
163-
164-
if r.refCount.Add(-1) == 0 {
165-
if r.msg != nil {
166-
r.msg.Release()
167-
r.msg = nil
168-
}
169-
}
170-
}
171-
172151
// Message returns the current message that has been extracted from the
173152
// underlying stream.
174153
// It is valid until the next call to Message.

arrow/ipc/reader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"io"
2323
"sync/atomic"
24+
"unsafe"
2425

2526
"github.com/apache/arrow-go/v18/arrow"
2627
"github.com/apache/arrow-go/v18/arrow/array"
@@ -97,7 +98,8 @@ func NewReader(r io.Reader, opts ...Option) (rr *Reader, err error) {
9798
}()
9899
cfg := newConfig(opts...)
99100
mr := &messageReader{r: r, mem: cfg.alloc}
100-
mr.refCount.Add(1)
101+
mr.Retain()
102+
mr.ReferenceDependency(unsafe.Pointer(&mr.msg))
101103
rr = &Reader{
102104
r: mr,
103105
refCount: atomic.Int64{},

arrow/memory/refcount.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
type Refcount struct {
1313
count atomic.Int64
14-
dependencies []**Refcount
14+
dependencies []unsafe.Pointer
1515
buffers []**Buffer
1616
derived []unsafe.Pointer
1717
}
@@ -20,7 +20,7 @@ type Refcount struct {
2020
// When this object is completely unreferenced, all dependencies will
2121
// be unreferenced by it and, if this was the only object still
2222
// referencing them, they will be freed as well, recursively.
23-
func (r *Refcount) ReferenceDependency(d ...**Refcount) {
23+
func (r *Refcount) ReferenceDependency(d ...unsafe.Pointer) {
2424
r.dependencies = d
2525
}
2626

@@ -31,9 +31,8 @@ func (r *Refcount) ReferenceBuffer(b ...**Buffer) {
3131
r.buffers = b
3232
}
3333

34-
// Must only be called once per object, with a list of pointers that are
35-
// _derived from_ allocations owned by or referenced by this object.
36-
// When this object is unreferenced, all such pointers will be nilled.
34+
// Must only be called once per object, with a list of pointers that need to be
35+
// cleared when the object becomes unreferenced.
3736
// Note: this needs the _address of_ the pointers to nil, _not_ the pointers
3837
// themselves!
3938
func (r *Refcount) ReferenceDerived(p ...unsafe.Pointer) {
@@ -52,8 +51,16 @@ func (r *Refcount) Release() {
5251
*buffer = nil
5352
}
5453
for _, dependency := range r.dependencies {
55-
(*dependency).Release()
56-
*dependency = nil
54+
ptr := (*unsafe.Pointer)(dependency)
55+
if *ptr != nil {
56+
// Ptr should be a **T, where T has a Refcount
57+
// embedded at the front.
58+
// So, if *ptr != nil, we should be able to cast *ptr
59+
// to a *Refcount.
60+
rc := (*Refcount)(*ptr)
61+
rc.Release()
62+
*ptr = nil
63+
}
5764
}
5865
r.buffers = nil
5966
r.dependencies = nil

0 commit comments

Comments
 (0)