@@ -59,7 +59,6 @@ func New(opts Options) rtpubsub.AdapterStreamer {
59
59
}
60
60
61
61
func (s * streamer ) Subscribe (stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server , req * rtv1pb.SubscribeTopicEventsRequestInitialAlpha1 , connectionID rtpubsub.ConnectionID ) error {
62
- log .Warn ("Lock Subscribe ConnectionID" , connectionID )
63
62
s .lock .Lock ()
64
63
key := s .StreamerKey (req .GetPubsubName (), req .GetTopic ())
65
64
@@ -74,12 +73,10 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
74
73
}
75
74
s.subscribers [key ][connectionID ] = connection
76
75
77
- log .Infof ("Subscribing to pubsub '%s' topic '%s' ConnectionID%d" , req .GetPubsubName (), req .GetTopic (), connectionID )
76
+ log .Infof ("Subscribing to pubsub '%s' topic '%s' ConnectionID %d" , req .GetPubsubName (), req .GetTopic (), connectionID )
78
77
s .lock .Unlock ()
79
- log .Warn ("Unlock Subscribe ConnectionID" , connectionID )
80
78
81
79
defer func () {
82
- log .Warn ("Lock Subscribe defer" )
83
80
s .lock .Lock ()
84
81
if connections := s .subscribers [key ]; connections != nil {
85
82
delete (connections , connectionID )
@@ -88,7 +85,6 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
88
85
}
89
86
}
90
87
s .lock .Unlock ()
91
- log .Warn ("Unlock Subscribe defer" )
92
88
}()
93
89
94
90
errCh := make (chan error , 2 )
@@ -103,15 +99,12 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
103
99
104
100
go func () {
105
101
for {
106
- log .Info ("Waiting for Recv, connectionID" , connectionID )
107
102
resp , err := stream .Recv ()
108
- log .Infof ("Recv, connectionID%d, resp%s" , connectionID , resp )
109
103
s , ok := status .FromError (err )
110
104
111
105
if (ok && s .Code () == codes .Canceled ) ||
112
106
errors .Is (err , context .Canceled ) ||
113
107
errors .Is (err , io .EOF ) {
114
- log .Infof ("Unsubscribed from pubsub '%s' topic '%s'" , req .GetPubsubName (), req .GetTopic ())
115
108
errCh <- err
116
109
return
117
110
}
@@ -143,12 +136,10 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
143
136
}
144
137
145
138
func (s * streamer ) Publish (ctx context.Context , msg * rtpubsub.SubscribedMessage ) error {
146
- log .Warn ("RLock Publish ConnectionID" , msg .SubscriberID )
147
139
s .lock .RLock ()
148
140
key := s .StreamerKey (msg .PubSub , msg .Topic )
149
141
connection , ok := s.subscribers [key ][msg.SubscriberID ]
150
142
s .lock .RUnlock ()
151
- log .Warn ("RUnlock Publish ConnectionID" , msg .SubscriberID )
152
143
if ! ok {
153
144
return fmt .Errorf ("no streamer subscribed to pubsub %q topic %q" , msg .PubSub , msg .Topic )
154
145
}
@@ -169,15 +160,13 @@ func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage)
169
160
defer cleanup ()
170
161
171
162
start := time .Now ()
172
- log .Warn ("Lock stream ConnectionID" , connection .connectionID )
173
163
connection .streamLock .Lock ()
174
164
err = connection .stream .Send (& rtv1pb.SubscribeTopicEventsResponseAlpha1 {
175
165
SubscribeTopicEventsResponseType : & rtv1pb.SubscribeTopicEventsResponseAlpha1_EventMessage {
176
166
EventMessage : envelope ,
177
167
},
178
168
})
179
169
connection .streamLock .Unlock ()
180
- log .Warn ("Unlock stream ConnectionID" , connection .connectionID )
181
170
elapsed := diag .ElapsedSince (start )
182
171
183
172
if span != nil {
@@ -227,11 +216,9 @@ func (s *streamer) StreamerKey(pubsub, topic string) string {
227
216
}
228
217
229
218
func (s * streamer ) Close (key string , connectionID rtpubsub.ConnectionID ) {
230
- log .Warn ("RLock Close" )
231
219
s .lock .RLock ()
232
220
defer func () {
233
221
s .lock .RUnlock ()
234
- log .Warn ("Unlock Close defer" )
235
222
}()
236
223
237
224
if conn , ok := s.subscribers [key ][connectionID ]; ok {
0 commit comments