Skip to content

Commit c19eb63

Browse files
committed
lint and fix tests
Signed-off-by: Anton Troshin <[email protected]>
1 parent e5b06b5 commit c19eb63

File tree

6 files changed

+29
-35
lines changed

6 files changed

+29
-35
lines changed

pkg/runtime/processor/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ package processor
1616
import (
1717
"context"
1818
"fmt"
19-
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
20-
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
2119

2220
"github.com/dapr/durabletask-go/backend"
2321

2422
"github.com/dapr/components-contrib/bindings"
2523
componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
24+
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
2625
"github.com/dapr/dapr/pkg/runtime/meta"
26+
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
2727
)
2828

2929
// manager implements the life cycle events of a component category.

pkg/runtime/processor/subscriber/subscriber.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ import (
1717
"context"
1818
"errors"
1919
"fmt"
20-
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
2120
"sync"
2221
"sync/atomic"
2322

2423
"google.golang.org/grpc"
2524

2625
apierrors "github.com/dapr/dapr/pkg/api/errors"
2726
"github.com/dapr/dapr/pkg/api/grpc/manager"
27+
subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
2828
"github.com/dapr/dapr/pkg/config"
2929
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
3030
"github.com/dapr/dapr/pkg/resiliency"
@@ -139,7 +139,10 @@ func (s *Subscriber) StartStreamerSubscription(subscription *subapi.Subscription
139139
return apierrors.PubSub("").WithMetadata(nil).DeserializeError(errors.New("subscriber is closed"))
140140
}
141141

142-
sub, ok := s.compStore.GetStreamSubscription(subscription)
142+
sub, found := s.compStore.GetStreamSubscription(subscription)
143+
if !found {
144+
return apierrors.PubSub("").WithMetadata(nil).NotFound()
145+
}
143146

144147
pubsub, ok := s.compStore.GetPubSub(sub.PubsubName)
145148
if !ok {
@@ -153,8 +156,8 @@ func (s *Subscriber) StartStreamerSubscription(subscription *subapi.Subscription
153156

154157
key := s.adapterStreamer.StreamerKey(sub.PubsubName, sub.Topic)
155158

156-
subscriptions, ok := s.streamSubs[sub.PubsubName]
157-
if subscriptions == nil {
159+
_, exists := s.streamSubs[sub.PubsubName]
160+
if !exists {
158161
s.streamSubs[sub.PubsubName] = make(map[rtpubsub.ConnectionID]*namedSubscription)
159162
}
160163

@@ -352,7 +355,6 @@ func (s *Subscriber) reloadPubSubStream(name string, pubsub *rtpubsub.PubsubItem
352355
return nil
353356
}
354357

355-
//subs := make([]*namedSubscription, 0, len(s.compStore.ListSubscriptionsStreamByPubSub(name)))
356358
subs := make(map[rtpubsub.ConnectionID]*namedSubscription, len(s.compStore.ListSubscriptionsStreamByPubSub(name)))
357359
var errs []error
358360
for _, sub := range s.compStore.ListSubscriptionsStreamByPubSub(name) {

pkg/runtime/processor/subscriber/subscriber_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,23 +96,23 @@ func TestSubscriptionLifecycle(t *testing.T) {
9696
Topic: "topic4",
9797
Routes: subapi.Routes{Default: "/"},
9898
},
99-
})
99+
}, rtpubsub.ConnectionID(1))
100100
compStore.AddStreamSubscription(&subapi.Subscription{
101101
ObjectMeta: metav1.ObjectMeta{Name: "sub2||"},
102102
Spec: subapi.SubscriptionSpec{
103103
Pubsubname: "mockPubSub2",
104104
Topic: "topic5",
105105
Routes: subapi.Routes{Default: "/"},
106106
},
107-
})
107+
}, rtpubsub.ConnectionID(2))
108108
compStore.AddStreamSubscription(&subapi.Subscription{
109109
ObjectMeta: metav1.ObjectMeta{Name: "sub3||"},
110110
Spec: subapi.SubscriptionSpec{
111111
Pubsubname: "mockPubSub3",
112112
Topic: "topic6",
113113
Routes: subapi.Routes{Default: "/"},
114114
},
115-
})
115+
}, rtpubsub.ConnectionID(3))
116116

117117
subs := New(Options{
118118
CompStore: compStore,
@@ -356,23 +356,23 @@ func TestReloadPubSub(t *testing.T) {
356356
Topic: "topic4",
357357
Routes: subapi.Routes{Default: "/"},
358358
},
359-
})
359+
}, rtpubsub.ConnectionID(1))
360360
compStore.AddStreamSubscription(&subapi.Subscription{
361361
ObjectMeta: metav1.ObjectMeta{Name: "sub2||"},
362362
Spec: subapi.SubscriptionSpec{
363363
Pubsubname: "mockPubSub2",
364364
Topic: "topic5",
365365
Routes: subapi.Routes{Default: "/"},
366366
},
367-
})
367+
}, rtpubsub.ConnectionID(2))
368368
compStore.AddStreamSubscription(&subapi.Subscription{
369369
ObjectMeta: metav1.ObjectMeta{Name: "sub3||"},
370370
Spec: subapi.SubscriptionSpec{
371371
Pubsubname: "mockPubSub3",
372372
Topic: "topic6",
373373
Routes: subapi.Routes{Default: "/"},
374374
},
375-
})
375+
}, rtpubsub.ConnectionID(3))
376376

377377
subs.ReloadPubSub("mockPubSub1")
378378
assert.Eventually(t, func() bool {

pkg/runtime/pubsub/streamer/conn.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,17 @@ limitations under the License.
1414
package streamer
1515

1616
import (
17-
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
1817
"sync"
1918
"sync/atomic"
2019

2120
rtv1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
21+
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
2222
)
2323

2424
type conn struct {
25-
lock sync.RWMutex
26-
streamLock sync.Mutex
27-
stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
28-
//publishResponses map[string]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1
29-
//publishResponses2 sync.Map
25+
lock sync.RWMutex
26+
streamLock sync.Mutex
27+
stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
3028
publishResponses3 map[string]map[rtpubsub.ConnectionID]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1
3129
closeCh chan struct{}
3230
closed atomic.Bool
@@ -38,10 +36,6 @@ func (c *conn) registerPublishResponse(id string) (chan *rtv1pb.SubscribeTopicEv
3836
log.Warnf("Lock registerPublishResponse messageId %s ConnectionID%d", id, c.connectionID)
3937
c.lock.Lock()
4038

41-
//ch, ok := c.publishResponses[id]
42-
//if !ok {
43-
// c.publishResponses[id] = ch
44-
//}
4539
if c.publishResponses3[id] == nil {
4640
c.publishResponses3[id] = make(map[rtpubsub.ConnectionID]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1)
4741
}
@@ -52,7 +46,6 @@ func (c *conn) registerPublishResponse(id string) (chan *rtv1pb.SubscribeTopicEv
5246
return ch, func() {
5347
log.Warnf("Lock registerPublishResponse defer messageId %s ConnectionID%d", id, c.connectionID)
5448
c.lock.Lock()
55-
//delete(c.publishResponses, id)
5649

5750
delete(c.publishResponses3[id], c.connectionID)
5851
if len(c.publishResponses3[id]) == 0 {
@@ -73,7 +66,6 @@ func (c *conn) registerPublishResponse(id string) (chan *rtv1pb.SubscribeTopicEv
7366
func (c *conn) notifyPublishResponse(resp *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1) {
7467
log.Warnf("Lock notifyPublishResponse messageId %s ConnectionID%d", resp.GetId(), c.connectionID)
7568
c.lock.RLock()
76-
//ch, ok := c.publishResponses[resp.GetId()]
7769
ch, ok := c.publishResponses3[resp.GetId()][c.connectionID]
7870
c.lock.RUnlock()
7971
log.Warnf("Unlock notifyPublishResponse messageId %s ConnectionID%d", resp.GetId(), c.connectionID)

pkg/runtime/pubsub/streamer/streamer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server
6666
key := s.StreamerKey(req.GetPubsubName(), req.GetTopic())
6767

6868
connection := &conn{
69-
stream: stream,
70-
//publishResponses: make(map[string]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1),
69+
stream: stream,
7170
publishResponses3: make(map[string]map[rtpubsub.ConnectionID]chan *rtv1pb.SubscribeTopicEventsRequestProcessedAlpha1),
7271
closeCh: make(chan struct{}),
7372
connectionID: connectionID,
@@ -157,7 +156,7 @@ func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage)
157156
}
158157

159158
if connection.closed.Load() {
160-
return fmt.Errorf("connection is closed")
159+
return errors.New("connection is closed")
161160
}
162161

163162
envelope, span, err := rtpubsub.GRPCEnvelopeFromSubscriptionMessage(ctx, msg, log, s.tracingSpec)

tests/integration/suite/daprd/subscriptions/stream/multi.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@ package stream
1515

1616
import (
1717
"context"
18-
"github.com/stretchr/testify/assert"
19-
"github.com/stretchr/testify/require"
2018
"sync"
2119
"sync/atomic"
2220
"testing"
2321
"time"
2422

23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
2526
rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
2627
"github.com/dapr/dapr/tests/integration/framework"
2728
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
@@ -145,8 +146,8 @@ func (m *multi) Run(t *testing.T, ctx context.Context) {
145146
stream2: "b",
146147
stream3: "c",
147148
} {
148-
event, err := stream.Recv()
149-
require.NoError(t, err)
149+
event, recvErr := stream.Recv()
150+
require.NoError(t, recvErr)
150151
assert.Equal(t, topic, event.GetEventMessage().GetTopic())
151152
}
152153

@@ -226,7 +227,7 @@ func (m *multi) Run(t *testing.T, ctx context.Context) {
226227

227228
for receivedMessages < expectedMessages {
228229
event, recvErr := stream.Recv()
229-
require.NoError(c, recvErr)
230+
assert.NoError(c, recvErr)
230231
assert.Equal(c, singleTopicMultipleSubscribers, event.GetEventMessage().GetTopic())
231232

232233
sendErr := stream.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
@@ -237,7 +238,7 @@ func (m *multi) Run(t *testing.T, ctx context.Context) {
237238
},
238239
},
239240
})
240-
require.NoError(c, sendErr)
241+
assert.NoError(c, sendErr)
241242

242243
receivedTotal.Add(1)
243244
receivedMessages++
@@ -256,6 +257,6 @@ func (m *multi) Run(t *testing.T, ctx context.Context) {
256257

257258
assert.Eventually(t, func() bool {
258259
wg.Wait()
259-
return receivedTotal.Load() == int32(messagesToSend*len(subscribers))
260+
return receivedTotal.Load() == int32(messagesToSend*len(subscribers)) //nolint:gosec
260261
}, time.Second*10, time.Millisecond*10)
261262
}

0 commit comments

Comments
 (0)