Skip to content

Commit bdcec92

Browse files
Deduplicate Subscriptions
Signed-off-by: Niranjani Vivek <niranjaniv@google.com>
1 parent 8fccb68 commit bdcec92

File tree

5 files changed

+783
-29
lines changed

5 files changed

+783
-29
lines changed

gnmi_server/server_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424
"unsafe"
2525

26+
"github.com/Azure/sonic-mgmt-common/translib/db"
2627
"github.com/sonic-net/sonic-gnmi/common_utils"
2728
spb "github.com/sonic-net/sonic-gnmi/proto"
2829
sgpb "github.com/sonic-net/sonic-gnmi/proto/gnoi"
@@ -6230,6 +6231,107 @@ func TestGnoiAuthorization(t *testing.T) {
62306231
s.Stop()
62316232
}
62326233

6234+
func TestSubscriptionDeduplication(t *testing.T) {
6235+
// Create the server
6236+
s := createServer(t, 8081)
6237+
s.config.EnableTranslation = true
6238+
go runServer(t, s)
6239+
defer s.Stop()
6240+
6241+
rclient := db.RedisClient(db.ConfigDB)
6242+
defer db.CloseRedisClient(rclient)
6243+
6244+
// The server is ready - now a request is needed.
6245+
tlsConfig := &tls.Config{InsecureSkipVerify: true}
6246+
opts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}
6247+
6248+
targetAddr := fmt.Sprintf("127.0.0.1:%d", s.config.Port)
6249+
conn, err := grpc.Dial(targetAddr, opts...)
6250+
if err != nil {
6251+
t.Fatalf("Dialing to %q failed: %v", targetAddr, err)
6252+
}
6253+
defer conn.Close()
6254+
gClient := pb.NewGNMIClient(conn)
6255+
6256+
samplePath, _ := xpath.ToGNMIPath("/interfaces/interface[name=Ethernet1]/config/description")
6257+
subReq := &pb.SubscribeRequest{
6258+
Request: &pb.SubscribeRequest_Subscribe{
6259+
Subscribe: &pb.SubscriptionList{
6260+
Prefix: &pb.Path{Origin: "openconfig", Target: "OC_YANG"},
6261+
Mode: pb.SubscriptionList_STREAM,
6262+
Encoding: pb.Encoding_PROTO,
6263+
Subscription: []*pb.Subscription{
6264+
{
6265+
Path: samplePath,
6266+
Mode: pb.SubscriptionMode_SAMPLE,
6267+
SampleInterval: 1000000000, // 1 second
6268+
},
6269+
},
6270+
},
6271+
},
6272+
}
6273+
6274+
updates := make([][]*pb.SubscribeResponse, 2)
6275+
wg := sync.WaitGroup{}
6276+
6277+
// Start a goroutine to change the description of the port every 100ms.
6278+
wg.Add(1)
6279+
go func() {
6280+
defer wg.Done()
6281+
end := time.Now().Add(5 * time.Second)
6282+
for time.Now().Before(end) {
6283+
if err := rclient.HSet(context.Background(), "PORT|Ethernet1", "description", time.Now().String()).Err(); err != nil {
6284+
t.Errorf("Failed to set description: %v", err)
6285+
return
6286+
}
6287+
time.Sleep(100 * time.Millisecond)
6288+
}
6289+
}()
6290+
6291+
// Start two Sample subscriptions and collect their responses.
6292+
for i := 0; i < 2; i++ {
6293+
wg.Add(1)
6294+
go func(i int) {
6295+
defer wg.Done()
6296+
subUpdates := []*pb.SubscribeResponse{}
6297+
ctx, cancel := context.WithTimeout(context.Background(), 5500*time.Millisecond)
6298+
defer cancel()
6299+
6300+
stream, err := gClient.Subscribe(ctx, grpc.MaxCallRecvMsgSize(6000000))
6301+
if err != nil {
6302+
t.Error(err.Error())
6303+
return
6304+
}
6305+
if err = stream.Send(subReq); err != nil {
6306+
t.Errorf("Failed to send subscription: %v", err)
6307+
return
6308+
}
6309+
6310+
syncReceived := false
6311+
for {
6312+
resp, err := stream.Recv()
6313+
if err != nil {
6314+
break
6315+
}
6316+
if resp.GetSyncResponse() {
6317+
syncReceived = true
6318+
continue
6319+
}
6320+
if syncReceived {
6321+
subUpdates = append(subUpdates, resp)
6322+
}
6323+
}
6324+
updates[i] = subUpdates
6325+
}(i)
6326+
time.Sleep(300 * time.Millisecond)
6327+
}
6328+
wg.Wait()
6329+
6330+
if !reflect.DeepEqual(updates[0], updates[1]) {
6331+
t.Errorf("Updates received do not match for identical subscriptions!\nFirst Client's Updates:%v\nSeconds Client's Updates:%v", updates[0], updates[1])
6332+
}
6333+
}
6334+
62336335
func init() {
62346336
// Enable logs at UT setup
62356337
flag.Lookup("v").Value.Set("10")
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"sync/atomic"
7+
"time"
8+
9+
log "github.com/golang/glog"
10+
"github.com/golang/protobuf/proto"
11+
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
12+
spb "github.com/sonic-net/sonic-gnmi/proto"
13+
)
14+
15+
var superSubs = superSubscriptions{
16+
mu: &sync.Mutex{},
17+
subs: map[*superSubscription]bool{},
18+
}
19+
20+
type superSubscriptions struct {
21+
mu *sync.Mutex
22+
subs map[*superSubscription]bool
23+
}
24+
25+
// superSubscription is used to deduplicate subscriptions. Stream Subscriptions
26+
// become part of a superSubscription and whenever a Sample is processed, the
27+
// response is sent to all clients that are part of the superSubscription.
28+
type superSubscription struct {
29+
mu *sync.RWMutex
30+
clients map[*TranslClient]struct{}
31+
request *gnmipb.SubscriptionList
32+
primaryClient *TranslClient
33+
tickers map[int]*time.Ticker // map of interval duration (nanoseconds) to ticker.
34+
sharedUpdates atomic.Uint64
35+
exclusiveUpdates atomic.Uint64
36+
}
37+
38+
// ------------- Super Subscription Functions -------------
39+
// createSuperSubscription takes a SubscriptionList and returns a new
40+
// superSubscription for that SubscriptionList. This function expects the
41+
// caller to already hold superSubs.mu before calling createSuperSubscription.
42+
func createSuperSubscription(subscription *gnmipb.SubscriptionList) *superSubscription {
43+
if subscription == nil {
44+
return nil
45+
}
46+
newSuperSub := &superSubscription{
47+
mu: &sync.RWMutex{},
48+
clients: map[*TranslClient]struct{}{},
49+
request: subscription,
50+
primaryClient: nil,
51+
tickers: map[int]*time.Ticker{},
52+
sharedUpdates: atomic.Uint64{},
53+
exclusiveUpdates: atomic.Uint64{},
54+
}
55+
if _, ok := superSubs.subs[newSuperSub]; ok {
56+
// This should never happen.
57+
log.V(0).Infof("Super Subscription (%p) for %v already exists but a new has been created!", newSuperSub, subscription)
58+
}
59+
superSubs.subs[newSuperSub] = true
60+
return newSuperSub
61+
}
62+
63+
// findSuperSubscription takes a SubscriptionList and tries to find an
64+
// existing superSubscription for that SubscriptionList. If one is found,
65+
// the superSubscription is returned. Else, nil is returned. This function
66+
// expects the caller to already hold superSubs.mu before calling findSuperSubscription.
67+
func findSuperSubscription(subscription *gnmipb.SubscriptionList) *superSubscription {
68+
if subscription == nil {
69+
return nil
70+
}
71+
for sub, _ := range superSubs.subs {
72+
if sub.request == nil {
73+
continue
74+
}
75+
if proto.Equal(sub.request, subscription) {
76+
return sub
77+
}
78+
}
79+
return nil
80+
}
81+
82+
// deleteSuperSub removes superSub from the superSubs map.
83+
// If the superSub is removed from the TranslClient, there
84+
// should be no remaining references to the superSub. This
85+
// function expects the caller to already hold superSubs.mu
86+
// before calling deleteSuperSubscription.
87+
func deleteSuperSubscription(superSub *superSubscription) {
88+
if superSub == nil {
89+
log.V(0).Info("deleteSuperSubscription called on a nil Super Subscription!")
90+
return
91+
}
92+
tickerCleanup(superSub.tickers)
93+
delete(superSubs.subs, superSub)
94+
}
95+
96+
// ------------- Super Subscription Methods -------------
97+
// sendNotifications takes a value and adds it to the notification
98+
// queue for each subscription in the superSubscription.
99+
func (ss *superSubscription) sendNotifications(v *spb.Value) {
100+
if v == nil {
101+
return
102+
}
103+
ss.mu.RLock()
104+
defer ss.mu.RUnlock()
105+
for client, _ := range ss.clients {
106+
value := proto.Clone(v).(*spb.Value)
107+
client.q.Put(Value{value})
108+
}
109+
}
110+
111+
// populateTickers populates the ticker_info objects in the intervalToTickerInfoMap with the
112+
// shared tickers. If tickers don't exist yet, they are created.
113+
func (ss *superSubscription) populateTickers(intervalToTickerInfoMap map[int][]*ticker_info) error {
114+
if intervalToTickerInfoMap == nil {
115+
return fmt.Errorf("Invalid intervalToTickerInfoMap passed in: %v", intervalToTickerInfoMap)
116+
}
117+
ss.mu.Lock()
118+
defer ss.mu.Unlock()
119+
if len(ss.tickers) == 0 {
120+
// Create the tickers.
121+
for interval, tInfos := range intervalToTickerInfoMap {
122+
ticker := time.NewTicker(time.Duration(interval) * time.Nanosecond)
123+
ss.tickers[interval] = ticker
124+
for _, tInfo := range tInfos {
125+
tInfo.t = ticker
126+
}
127+
}
128+
return nil
129+
}
130+
// Use the existing tickers.
131+
if len(ss.tickers) != len(intervalToTickerInfoMap) {
132+
return fmt.Errorf("Length of intervalToTickerInfoMap does not match length of existing tickers for Super Subscription! existing tickers=%v, intervalToTickerInfoMap=%v", ss.tickers, intervalToTickerInfoMap)
133+
}
134+
for interval, tInfos := range intervalToTickerInfoMap {
135+
ticker, ok := ss.tickers[interval]
136+
if !ok {
137+
return fmt.Errorf("Interval in intervalToTickerInfoMap not found in existing tickers for Super Subscription! interval=%v", interval)
138+
}
139+
for _, tInfo := range tInfos {
140+
tInfo.t = ticker
141+
}
142+
}
143+
return nil
144+
}
145+
func (ss *superSubscription) String() string {
146+
return fmt.Sprintf("[{%p} NumClients=%d, SharedUpdates=%d, ExclusiveUpdates=%d, Request=%v]", ss, len(ss.clients), ss.sharedUpdates.Load(), ss.exclusiveUpdates.Load(), ss.request)
147+
}
148+
149+
// ------------- TranslClient Methods -------------
150+
// isPrimary returns true if the client is the primary client of its superSubscription.
151+
func (c *TranslClient) isPrimary() bool {
152+
if c == nil || c.superSub == nil {
153+
return false
154+
}
155+
c.superSub.mu.RLock()
156+
defer c.superSub.mu.RUnlock()
157+
return c.superSub.primaryClient == c
158+
}
159+
160+
// leaveSuperSubscription removes the client from the superSubscription.
161+
// If there are no remaining clients in the superSubscription, it is deleted.
162+
func (c *TranslClient) leaveSuperSubscription() {
163+
if c == nil || c.superSub == nil {
164+
return
165+
}
166+
superSubs.mu.Lock()
167+
defer superSubs.mu.Unlock()
168+
c.superSub.mu.Lock()
169+
defer c.superSub.mu.Unlock()
170+
delete(c.superSub.clients, c)
171+
if len(c.superSub.clients) == 0 {
172+
deleteSuperSubscription(c.superSub)
173+
log.V(2).Infof("SuperSubscription (%s) closing!", c.superSub)
174+
} else if c.superSub.primaryClient == c {
175+
// Set a new primary client.
176+
for client := range c.superSub.clients {
177+
c.superSub.primaryClient = client
178+
client.wakeChan <- true
179+
break
180+
}
181+
log.V(2).Infof("SuperSubscription (%s): %p is now the primary client", c.superSub, c.superSub.primaryClient)
182+
}
183+
}
184+
185+
// addClientToSuperSubscription adds a client to a superSubscription.
186+
func (c *TranslClient) addClientToSuperSubscription(subscription *gnmipb.SubscriptionList) {
187+
if c == nil || subscription == nil {
188+
return
189+
}
190+
superSubs.mu.Lock()
191+
defer superSubs.mu.Unlock()
192+
superSub := findSuperSubscription(subscription)
193+
if superSub == nil {
194+
superSub = createSuperSubscription(subscription)
195+
}
196+
superSub.mu.Lock()
197+
defer superSub.mu.Unlock()
198+
c.superSub = superSub
199+
superSub.clients[c] = struct{}{}
200+
if superSub.primaryClient == nil {
201+
superSub.primaryClient = c
202+
}
203+
log.V(2).Infof("SuperSubscription (%s): added new client=%p", superSub, c)
204+
}

0 commit comments

Comments
 (0)