Skip to content

Commit 7c75ef7

Browse files
committed
rename field and add type def for readability
Signed-off-by: Anton Troshin <[email protected]>
1 parent e1e8eda commit 7c75ef7

File tree

2 files changed

+22
-18
lines changed

2 files changed

+22
-18
lines changed

pkg/runtime/pubsub/streamer/conn.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,31 @@ import (
2222
)
2323

2424
type conn struct {
25-
lock sync.RWMutex
26-
streamLock sync.Mutex
27-
stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
28-
publishResponses3 map[string]map[rtpubsub.ConnectionID]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1
29-
closeCh chan struct{}
30-
closed atomic.Bool
31-
connectionID rtpubsub.ConnectionID
25+
lock sync.RWMutex
26+
streamLock sync.Mutex
27+
stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
28+
closeCh chan struct{}
29+
closed atomic.Bool
30+
connectionID rtpubsub.ConnectionID
31+
publishResponses PublishResponses
3232
}
3333

3434
func (c *conn) registerPublishResponse(id string) (chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1, func()) {
3535
ch := make(chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1, 1)
3636
c.lock.Lock()
3737

38-
if c.publishResponses3[id] == nil {
39-
c.publishResponses3[id] = make(map[rtpubsub.ConnectionID]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1)
38+
if c.publishResponses[id] == nil {
39+
c.publishResponses[id] = make(ConnectionChannel)
4040
}
41-
c.publishResponses3[id][c.connectionID] = ch
41+
c.publishResponses[id][c.connectionID] = ch
4242

4343
c.lock.Unlock()
4444
return ch, func() {
4545
c.lock.Lock()
4646

47-
delete(c.publishResponses3[id], c.connectionID)
48-
if len(c.publishResponses3[id]) == 0 {
49-
delete(c.publishResponses3, id)
47+
delete(c.publishResponses[id], c.connectionID)
48+
if len(c.publishResponses[id]) == 0 {
49+
delete(c.publishResponses, id)
5050
}
5151

5252
c.lock.Unlock()
@@ -61,7 +61,7 @@ func (c *conn) registerPublishResponse(id string) (chan *rtv1pb.SubscribeTopicEv
6161

6262
func (c *conn) notifyPublishResponse(resp *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1) {
6363
c.lock.RLock()
64-
ch, ok := c.publishResponses3[resp.GetId()][c.connectionID]
64+
ch, ok := c.publishResponses[resp.GetId()][c.connectionID]
6565
c.lock.RUnlock()
6666

6767
if !ok {

pkg/runtime/pubsub/streamer/streamer.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ type Connections map[rtpubsub.ConnectionID]*conn
4242

4343
type Subscribers map[string]Connections
4444

45+
type ConnectionChannel map[rtpubsub.ConnectionID]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1
46+
47+
type PublishResponses map[string]ConnectionChannel
48+
4549
type streamer struct {
4650
tracingSpec *config.TracingSpec
4751
subscribers Subscribers
@@ -63,10 +67,10 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
6367
key := s.StreamerKey(req.GetPubsubName(), req.GetTopic())
6468

6569
connection := &conn{
66-
stream: stream,
67-
publishResponses3: make(map[string]map[rtpubsub.ConnectionID]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1),
68-
closeCh: make(chan struct{}),
69-
connectionID: connectionID,
70+
stream: stream,
71+
connectionID: connectionID,
72+
closeCh: make(chan struct{}),
73+
publishResponses: make(PublishResponses),
7074
}
7175
if s.subscribers[key] == nil {
7276
s.subscribers[key] = make(Connections)

0 commit comments

Comments
 (0)