Skip to content

Commit 162407d

Browse files
committed
feat: add async conflict detection
1 parent ac9d26d commit 162407d

File tree

8 files changed

+86
-17
lines changed

8 files changed

+86
-17
lines changed

item.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ func (b *Item[T]) Trans() T {
3535
if b.stat.hasdestroyed() {
3636
panic("use after destroy")
3737
}
38+
if b.pool.issync {
39+
b.stat.setinsyncop(true)
40+
}
3841
val := b.val
3942
atomic.StoreUintptr(
4043
(*uintptr)(&b.stat), uintptr(destroyedstatus),
@@ -47,6 +50,10 @@ func (b *Item[T]) Trans() T {
4750
// HasInvolved whether this item is buffered
4851
// and will be Reset on putting back.
4952
func (b *Item[T]) HasInvolved() bool {
53+
if b.pool.issync {
54+
b.stat.setinsyncop(true)
55+
defer b.stat.setinsyncop(false)
56+
}
5057
return b.stat.isbuffered()
5158
}
5259

@@ -57,6 +64,10 @@ func (b *Item[T]) V(f func(T)) {
5764
if b.stat.hasdestroyed() {
5865
panic("use after destroy")
5966
}
67+
if b.pool.issync {
68+
b.stat.setinsyncop(true)
69+
defer b.stat.setinsyncop(false)
70+
}
6071
f(b.val)
6172
runtime.KeepAlive(b)
6273
}
@@ -68,6 +79,10 @@ func (b *Item[T]) P(f func(*T)) {
6879
if b.stat.hasdestroyed() {
6980
panic("use after destroy")
7081
}
82+
if b.pool.issync {
83+
b.stat.setinsyncop(true)
84+
defer b.stat.setinsyncop(false)
85+
}
7186
f(&b.val)
7287
runtime.KeepAlive(b)
7388
}
@@ -77,6 +92,10 @@ func (b *Item[T]) Copy() (cb *Item[T]) {
7792
if b.stat.hasdestroyed() {
7893
panic("use after destroy")
7994
}
95+
if b.pool.issync {
96+
b.stat.setinsyncop(true)
97+
defer b.stat.setinsyncop(false)
98+
}
8099
cb = b.pool.New(b.cfg)
81100
b.pool.pooler.Copy(&cb.val, &b.val)
82101
return
@@ -100,6 +119,9 @@ func (b *Item[T]) destroybystat(stat status) {
100119
// Calling this method only when you're sure that
101120
// no one will use it, or it will cause a panic.
102121
func (b *Item[T]) ManualDestroy() {
122+
if b.pool.issync {
123+
b.stat.setinsyncop(true)
124+
}
103125
runtime.SetFinalizer(b, nil)
104126
b.destroybystat(status(atomic.SwapUintptr(
105127
(*uintptr)(&b.stat), uintptr(destroyedstatus),

pbuf/buffer.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,19 @@ import (
1010
func (bufferPool BufferPool[USRDAT]) NewBuffer(
1111
buf []byte,
1212
) *orbyte.Item[UserBuffer[USRDAT]] {
13-
return bufferPool.p.New(buf)
13+
return bufferPool.New(buf)
1414
}
1515

1616
// InvolveBuffer involve external *bytes.Buffer into Item.
1717
func (bufferPool BufferPool[USRDAT]) InvolveBuffer(
1818
buf *bytes.Buffer,
1919
) *orbyte.Item[UserBuffer[USRDAT]] {
20-
return bufferPool.p.Involve(buf.Len(), buf)
20+
return bufferPool.Involve(buf.Len(), buf)
2121
}
2222

2323
// ParseBuffer convert external *bytes.Buffer into Item.
2424
func (bufferPool BufferPool[USRDAT]) ParseBuffer(
2525
buf *bytes.Buffer,
2626
) *orbyte.Item[UserBuffer[USRDAT]] {
27-
return bufferPool.p.Parse(buf.Len(), buf)
28-
}
29-
30-
func (bufferPool BufferPool[USRDAT]) CountItems() (outside int32, inside int32) {
31-
return bufferPool.p.CountItems()
27+
return bufferPool.Parse(buf.Len(), buf)
3228
}

pbuf/buffer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func testBuffer(buf *OBuffer, t *testing.T) {
5959
runtime.Gosched()
6060
runtime.GC()
6161

62-
out, in := bufferPool.p.CountItems()
62+
out, in := bufferPool.CountItems()
6363
t.Log(out, in)
6464
if out != 0 {
6565
t.Fail()

pbuf/bytes.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (b UserBytes[USRDAT]) B(f func([]byte, *USRDAT)) {
3636

3737
// NewBytes alloc sz bytes.
3838
func (bufferPool BufferPool[USRDAT]) NewBytes(sz int) (b UserBytes[USRDAT]) {
39-
buf := bufferPool.p.New(sz)
39+
buf := bufferPool.New(sz)
4040
b.buf = buf
4141
buf.P(func(buf *UserBuffer[USRDAT]) {
4242
b.b = buf.Len()
@@ -46,7 +46,7 @@ func (bufferPool BufferPool[USRDAT]) NewBytes(sz int) (b UserBytes[USRDAT]) {
4646

4747
// InvolveBytes involve outside buf into pool.
4848
func (bufferPool BufferPool[USRDAT]) InvolveBytes(p ...byte) (b UserBytes[USRDAT]) {
49-
buf := bufferPool.p.Involve(len(p), bytes.NewBuffer(p))
49+
buf := bufferPool.Involve(len(p), bytes.NewBuffer(p))
5050
b.buf = buf
5151
buf.P(func(buf *UserBuffer[USRDAT]) {
5252
b.b = buf.Len()
@@ -57,7 +57,7 @@ func (bufferPool BufferPool[USRDAT]) InvolveBytes(p ...byte) (b UserBytes[USRDAT
5757
// ParseBytes convert outside bytes to Bytes safely
5858
// without adding it into pool.
5959
func (bufferPool BufferPool[USRDAT]) ParseBytes(p ...byte) (b UserBytes[USRDAT]) {
60-
buf := bufferPool.p.Parse(len(p), bytes.NewBuffer(p))
60+
buf := bufferPool.Parse(len(p), bytes.NewBuffer(p))
6161
b.buf = buf
6262
buf.P(func(buf *UserBuffer[USRDAT]) {
6363
b.b = buf.Len()

pbuf/bytes_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestBytesSlice(t *testing.T) {
3737
runtime.GC()
3838
runtime.Gosched()
3939
runtime.GC()
40-
out, in := bufferPool.p.CountItems()
40+
out, in := bufferPool.CountItems()
4141
t.Log(out, in)
4242
if out != 0 {
4343
t.Fail()
@@ -60,7 +60,7 @@ func TestBytesInvolve(t *testing.T) {
6060
}
6161
}
6262
runtime.GC()
63-
out, in := bufferPool.p.CountItems()
63+
out, in := bufferPool.CountItems()
6464
t.Log(out, in)
6565
if out != 0 {
6666
t.Fail()
@@ -80,7 +80,7 @@ func TestBytesParse(t *testing.T) {
8080
}
8181
}
8282
runtime.GC()
83-
out, in := bufferPool.p.CountItems()
83+
out, in := bufferPool.CountItems()
8484
t.Log(out, in)
8585
if out != 0 {
8686
t.Fail()
@@ -107,7 +107,7 @@ func TestBytesCopy(t *testing.T) {
107107
runtime.GC()
108108
runtime.Gosched()
109109
runtime.GC()
110-
out, in := bufferPool.p.CountItems()
110+
out, in := bufferPool.CountItems()
111111
t.Log(out, in)
112112
if out != 0 {
113113
t.Fail()

pbuf/pbuf.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ type (
1616
)
1717

1818
type BufferPool[USRDAT any] struct {
19-
p *orbyte.Pool[UserBuffer[USRDAT]]
19+
*orbyte.Pool[UserBuffer[USRDAT]]
2020
}
2121

2222
func NewBufferPool[USRDAT any]() BufferPool[USRDAT] {
2323
return BufferPool[USRDAT]{
24-
p: orbyte.NewPool[UserBuffer[USRDAT]](bufpooler[USRDAT]{}),
24+
orbyte.NewPool[UserBuffer[USRDAT]](bufpooler[USRDAT]{}),
2525
}
2626
}
2727

@@ -60,3 +60,23 @@ func ParseBytes(b ...byte) Bytes {
6060
func CountItems() (outside int32, inside int32) {
6161
return bufferPool.CountItems()
6262
}
63+
64+
// SetNoPutBack see Pool.SetNoPutBack
65+
func SetNoPutBack(on bool) {
66+
bufferPool.SetNoPutBack(on)
67+
}
68+
69+
// SetSyncItem see Pool.SetSyncItem
70+
func SetSyncItem(on bool) {
71+
bufferPool.SetSyncItem(on)
72+
}
73+
74+
// LimitInput see Pool.LimitInput
75+
func LimitInput(n int32) {
76+
bufferPool.LimitInput(n)
77+
}
78+
79+
// LimitInput see Pool.LimitOutput
80+
func LimitOutput(n int32) {
81+
bufferPool.LimitOutput(n)
82+
}

pool.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Pool[T any] struct {
2121
pooler Pooler[T]
2222

2323
noputbak bool
24+
issync bool
2425
}
2526

2627
// NewPool make a new pool from custom pooler.
@@ -43,6 +44,13 @@ func (pool *Pool[T]) SetNoPutBack(on bool) {
4344
pool.noputbak = on
4445
}
4546

47+
// SetSyncItem make it panic on every read-write conflict.
48+
//
49+
// Enable this to detect coding errors.
50+
func (pool *Pool[T]) SetSyncItem(on bool) {
51+
pool.issync = on
52+
}
53+
4654
// LimitOutput will automatically set new item no-autodestroy
4755
// if countout > outlim.
4856
func (pool *Pool[T]) LimitOutput(n int32) {

status.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import "sync/atomic"
55
const (
66
statusisbuffered = 1 << iota
77
statusdestroyed
8+
statusinsyncop
89
)
910

1011
type status uintptr
@@ -38,6 +39,24 @@ func (c *status) setbool(v bool, typ uintptr) {
3839
}
3940
}
4041

42+
// setboolunique panic on non-unique set
43+
func (c *status) setboolunique(v bool, typ uintptr) {
44+
olds := atomic.LoadUintptr((*uintptr)(c))
45+
oldv := olds&typ != 0
46+
if oldv == v {
47+
panic("non-unique operation")
48+
}
49+
news := status(olds).mask(v, typ)
50+
for !atomic.CompareAndSwapUintptr((*uintptr)(c), olds, uintptr(news)) {
51+
olds = atomic.LoadUintptr((*uintptr)(c))
52+
oldv = olds&typ != 0
53+
if oldv == v {
54+
panic("non-unique operation")
55+
}
56+
news = status(olds).mask(v, typ)
57+
}
58+
}
59+
4160
func (c *status) loadbool(typ uintptr) bool {
4261
return atomic.LoadUintptr((*uintptr)(c))&typ != 0
4362
}
@@ -57,3 +76,7 @@ func (c *status) hasdestroyed() bool {
5776
func (c *status) setdestroyed(v bool) {
5877
c.setbool(v, statusdestroyed)
5978
}
79+
80+
func (c *status) setinsyncop(v bool) {
81+
c.setboolunique(v, statusinsyncop)
82+
}

0 commit comments

Comments
 (0)