Skip to content

Commit ebd1cd3

Browse files
committed
Refactor subscriber index move to compstore
Signed-off-by: Anton Troshin <[email protected]>
1 parent c19eb63 commit ebd1cd3

File tree

6 files changed

+76
-10
lines changed

6 files changed

+76
-10
lines changed

pkg/api/grpc/subscribe.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (a *api) SubscribeTopicEventsAlpha1(stream runtimev1pb.Dapr_SubscribeTopicE
7676
Routes: subapi.Routes{Default: "/"},
7777
},
7878
}
79-
connectionID := a.pubsubAdapterStreamer.NextIndex()
79+
connectionID := a.Universal.CompStore().NextSubscriberIndex()
8080
err = a.Universal.CompStore().AddStreamSubscription(sub, connectionID)
8181
if err != nil {
8282
return err

pkg/runtime/compstore/compstore.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package compstore
1515

1616
import (
1717
"sync"
18+
"sync/atomic"
1819

1920
"github.com/dapr/durabletask-go/backend"
2021

@@ -63,6 +64,9 @@ type ComponentStore struct {
6364

6465
compPendingLock sync.Mutex
6566
compPending *compsv1alpha1.Component
67+
68+
// subscribersIndex is used to generate unique connection IDs for streaming subscriptions
69+
subscribersIndex atomic.Uint64
6670
}
6771

6872
func New() *ComponentStore {

pkg/runtime/compstore/subscriptions.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,3 +277,7 @@ func (c *ComponentStore) ListProgramaticSubscriptions() []rtpubsub.Subscription
277277
defer c.lock.RUnlock()
278278
return c.subscriptions.programmatics
279279
}
280+
281+
func (c *ComponentStore) NextSubscriberIndex() rtpubsub.ConnectionID {
282+
return rtpubsub.ConnectionID(c.subscribersIndex.Add(1))
283+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package compstore
15+
16+
import (
17+
"sync"
18+
"testing"
19+
20+
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
21+
"github.com/stretchr/testify/assert"
22+
)
23+
24+
func TestNextSubscriberIndex(t *testing.T) {
25+
t.Run("sequential calls return incrementing values", func(t *testing.T) {
26+
store := New()
27+
28+
id1 := store.NextSubscriberIndex()
29+
id2 := store.NextSubscriberIndex()
30+
id3 := store.NextSubscriberIndex()
31+
32+
assert.Equal(t, rtpubsub.ConnectionID(1), id1)
33+
assert.Equal(t, rtpubsub.ConnectionID(2), id2)
34+
assert.Equal(t, rtpubsub.ConnectionID(3), id3)
35+
})
36+
37+
t.Run("concurrent calls return unique values", func(t *testing.T) {
38+
store := New()
39+
const numGoroutines = 100
40+
const numCallsPerGoroutine = 10
41+
var wg sync.WaitGroup
42+
var mu sync.Mutex
43+
ids := make(map[rtpubsub.ConnectionID]bool)
44+
45+
wg.Add(numGoroutines)
46+
for i := 0; i < numGoroutines; i++ {
47+
go func() {
48+
defer wg.Done()
49+
for j := 0; j < numCallsPerGoroutine; j++ {
50+
id := store.NextSubscriberIndex()
51+
mu.Lock()
52+
ids[id] = true
53+
mu.Unlock()
54+
}
55+
}()
56+
}
57+
wg.Wait()
58+
59+
assert.Len(t, ids, numGoroutines*numCallsPerGoroutine, "Expected all IDs to be unique")
60+
61+
for i := 1; i <= numGoroutines*numCallsPerGoroutine; i++ {
62+
assert.True(t, ids[rtpubsub.ConnectionID(i)], "Expected ID %d to be present", i)
63+
}
64+
})
65+
}

pkg/runtime/pubsub/adapter.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ type AdapterStreamer interface {
4747
Subscribe(rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server, *rtv1pb.SubscribeTopicEventsRequestInitialAlpha1, ConnectionID) error
4848
Publish(context.Context, *SubscribedMessage) error
4949
StreamerKey(pubsub, topic string) string
50-
NextIndex() ConnectionID
5150
Close(key string, connectionID ConnectionID)
5251
}
5352

pkg/runtime/pubsub/streamer/streamer.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"io"
2121
"strings"
2222
"sync"
23-
"sync/atomic"
2423
"time"
2524

2625
"google.golang.org/grpc/codes"
@@ -44,9 +43,8 @@ type Connections map[rtpubsub.ConnectionID]*conn
4443
type Subscribers map[string]Connections
4544

4645
type streamer struct {
47-
tracingSpec *config.TracingSpec
48-
subscribers Subscribers
49-
subscribersIndex atomic.Uint64
46+
tracingSpec *config.TracingSpec
47+
subscribers Subscribers
5048

5149
lock sync.RWMutex
5250
}
@@ -228,10 +226,6 @@ func (s *streamer) StreamerKey(pubsub, topic string) string {
228226
return "___" + pubsub + "||" + topic
229227
}
230228

231-
func (s *streamer) NextIndex() rtpubsub.ConnectionID {
232-
return rtpubsub.ConnectionID(s.subscribersIndex.Add(1))
233-
}
234-
235229
func (s *streamer) Close(key string, connectionID rtpubsub.ConnectionID) {
236230
log.Warn("RLock Close")
237231
s.lock.RLock()

0 commit comments

Comments
 (0)