Skip to content

Commit 32983db

Browse files
refactor: changed atomic.Value to atomic.Pointer in pipe (#851)
* refactor: changed atomic.Value to atomic.Pointer in pipe * fix: fixed unit test failure in mock
1 parent 51c2ebf commit 32983db

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

mock/result.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ type pool struct {
186186
type pipe struct {
187187
conn net.Conn
188188
error atomic.Pointer[errs]
189-
clhks atomic.Value // closed hook, invoked after the conn is closed
190-
pshks atomic.Value // pubsub hook, registered by the SetPubSubHooks
189+
clhks atomic.Value // closed hook, invoked after the conn is closed
190+
pshks atomic.Pointer[pshks] // pubsub hook, registered by the SetPubSubHooks
191191
queue any
192192
cache rueidis.CacheStore
193193
r *bufio.Reader
@@ -225,3 +225,8 @@ type stream struct {
225225
}
226226

227227
type errs struct{ error }
228+
229+
type pshks struct {
230+
hooks rueidis.PubSubHooks
231+
close chan error
232+
}

pipe.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ var _ wire = (*pipe)(nil)
6363

6464
type pipe struct {
6565
conn net.Conn
66-
clhks atomic.Value // closed hook, invoked after the conn is closed
67-
pshks atomic.Value // pubsub hook, registered by the SetPubSubHooks
66+
clhks atomic.Value // closed hook, invoked after the conn is closed
67+
pshks atomic.Pointer[pshks] // pubsub hook, registered by the SetPubSubHooks
6868
queue queue
6969
cache CacheStore
7070
error atomic.Pointer[errs]
@@ -393,7 +393,7 @@ func (p *pipe) _background() {
393393
p.nsubs.Close()
394394
p.psubs.Close()
395395
p.ssubs.Close()
396-
if old := p.pshks.Swap(emptypshks).(*pshks); old.close != nil {
396+
if old := p.pshks.Swap(emptypshks); old.close != nil {
397397
old.close <- err
398398
close(old.close)
399399
}
@@ -705,60 +705,60 @@ func (p *pipe) handlePush(values []RedisMessage) (reply bool, unsubscribe bool)
705705
if len(values) >= 3 {
706706
m := PubSubMessage{Channel: values[1].string(), Message: values[2].string()}
707707
p.nsubs.Publish(values[1].string(), m)
708-
p.pshks.Load().(*pshks).hooks.OnMessage(m)
708+
p.pshks.Load().hooks.OnMessage(m)
709709
}
710710
case "pmessage":
711711
if len(values) >= 4 {
712712
m := PubSubMessage{Pattern: values[1].string(), Channel: values[2].string(), Message: values[3].string()}
713713
p.psubs.Publish(values[1].string(), m)
714-
p.pshks.Load().(*pshks).hooks.OnMessage(m)
714+
p.pshks.Load().hooks.OnMessage(m)
715715
}
716716
case "smessage":
717717
if len(values) >= 3 {
718718
m := PubSubMessage{Channel: values[1].string(), Message: values[2].string()}
719719
p.ssubs.Publish(values[1].string(), m)
720-
p.pshks.Load().(*pshks).hooks.OnMessage(m)
720+
p.pshks.Load().hooks.OnMessage(m)
721721
}
722722
case "unsubscribe":
723723
if len(values) >= 3 {
724724
s := PubSubSubscription{Kind: values[0].string(), Channel: values[1].string(), Count: values[2].intlen}
725725
p.nsubs.Unsubscribe(s)
726-
p.pshks.Load().(*pshks).hooks.OnSubscription(s)
726+
p.pshks.Load().hooks.OnSubscription(s)
727727
}
728728
return true, true
729729
case "punsubscribe":
730730
if len(values) >= 3 {
731731
s := PubSubSubscription{Kind: values[0].string(), Channel: values[1].string(), Count: values[2].intlen}
732732
p.psubs.Unsubscribe(s)
733-
p.pshks.Load().(*pshks).hooks.OnSubscription(s)
733+
p.pshks.Load().hooks.OnSubscription(s)
734734
}
735735
return true, true
736736
case "sunsubscribe":
737737
if len(values) >= 3 {
738738
s := PubSubSubscription{Kind: values[0].string(), Channel: values[1].string(), Count: values[2].intlen}
739739
p.ssubs.Unsubscribe(s)
740-
p.pshks.Load().(*pshks).hooks.OnSubscription(s)
740+
p.pshks.Load().hooks.OnSubscription(s)
741741
}
742742
return true, true
743743
case "subscribe":
744744
if len(values) >= 3 {
745745
s := PubSubSubscription{Kind: values[0].string(), Channel: values[1].string(), Count: values[2].intlen}
746746
p.nsubs.Confirm(s)
747-
p.pshks.Load().(*pshks).hooks.OnSubscription(s)
747+
p.pshks.Load().hooks.OnSubscription(s)
748748
}
749749
return true, false
750750
case "psubscribe":
751751
if len(values) >= 3 {
752752
s := PubSubSubscription{Kind: values[0].string(), Channel: values[1].string(), Count: values[2].intlen}
753753
p.psubs.Confirm(s)
754-
p.pshks.Load().(*pshks).hooks.OnSubscription(s)
754+
p.pshks.Load().hooks.OnSubscription(s)
755755
}
756756
return true, false
757757
case "ssubscribe":
758758
if len(values) >= 3 {
759759
s := PubSubSubscription{Kind: values[0].string(), Channel: values[1].string(), Count: values[2].intlen}
760760
p.ssubs.Confirm(s)
761-
p.pshks.Load().(*pshks).hooks.OnSubscription(s)
761+
p.pshks.Load().hooks.OnSubscription(s)
762762
}
763763
return true, false
764764
}
@@ -871,7 +871,7 @@ func (p *pipe) SetPubSubHooks(hooks PubSubHooks) <-chan error {
871871
return p._r2pipe(context.Background()).SetPubSubHooks(hooks)
872872
}
873873
if hooks.isZero() {
874-
if old := p.pshks.Swap(emptypshks).(*pshks); old.close != nil {
874+
if old := p.pshks.Swap(emptypshks); old.close != nil {
875875
close(old.close)
876876
}
877877
return nil
@@ -883,11 +883,11 @@ func (p *pipe) SetPubSubHooks(hooks PubSubHooks) <-chan error {
883883
hooks.OnSubscription = func(s PubSubSubscription) {}
884884
}
885885
ch := make(chan error, 1)
886-
if old := p.pshks.Swap(&pshks{hooks: hooks, close: ch}).(*pshks); old.close != nil {
886+
if old := p.pshks.Swap(&pshks{hooks: hooks, close: ch}); old.close != nil {
887887
close(old.close)
888888
}
889889
if err := p.Error(); err != nil {
890-
if old := p.pshks.Swap(emptypshks).(*pshks); old.close != nil {
890+
if old := p.pshks.Swap(emptypshks); old.close != nil {
891891
old.close <- err
892892
close(old.close)
893893
}

0 commit comments

Comments
 (0)