Skip to content

Commit b801ffa

Browse files
Deduplicate Subscriptions
Signed-off-by: Niranjani Vivek <niranjaniv@google.com>
1 parent 8fccb68 commit b801ffa

File tree

5 files changed

+782
-29
lines changed

5 files changed

+782
-29
lines changed

gnmi_server/server_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424
"unsafe"
2525

26+
"github.com/Azure/sonic-mgmt-common/translib/db"
2627
"github.com/sonic-net/sonic-gnmi/common_utils"
2728
spb "github.com/sonic-net/sonic-gnmi/proto"
2829
sgpb "github.com/sonic-net/sonic-gnmi/proto/gnoi"
@@ -6230,6 +6231,107 @@ func TestGnoiAuthorization(t *testing.T) {
62306231
s.Stop()
62316232
}
62326233

6234+
func TestSubscriptionDeduplication(t *testing.T) {
6235+
// Create the server
6236+
s := createServer(t, 8081)
6237+
s.config.EnableTranslation = true
6238+
go runServer(t, s)
6239+
defer s.Stop()
6240+
6241+
rclient := db.RedisClient(db.ConfigDB)
6242+
defer db.CloseRedisClient(rclient)
6243+
6244+
// The server is ready - now a request is needed.
6245+
tlsConfig := &tls.Config{InsecureSkipVerify: true}
6246+
opts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}
6247+
6248+
targetAddr := fmt.Sprintf("127.0.0.1:%d", s.config.Port)
6249+
conn, err := grpc.Dial(targetAddr, opts...)
6250+
if err != nil {
6251+
t.Fatalf("Dialing to %q failed: %v", targetAddr, err)
6252+
}
6253+
defer conn.Close()
6254+
gClient := pb.NewGNMIClient(conn)
6255+
6256+
samplePath, _ := xpath.ToGNMIPath("/interfaces/interface[name=Ethernet1/1/1]/config/description")
6257+
subReq := &pb.SubscribeRequest{
6258+
Request: &pb.SubscribeRequest_Subscribe{
6259+
Subscribe: &pb.SubscriptionList{
6260+
Prefix: &pb.Path{Origin: "openconfig", Target: "OC_YANG"},
6261+
Mode: pb.SubscriptionList_STREAM,
6262+
Encoding: pb.Encoding_PROTO,
6263+
Subscription: []*pb.Subscription{
6264+
{
6265+
Path: samplePath,
6266+
Mode: pb.SubscriptionMode_SAMPLE,
6267+
SampleInterval: 1000000000, // 1 second
6268+
},
6269+
},
6270+
},
6271+
},
6272+
}
6273+
6274+
updates := make([][]*pb.SubscribeResponse, 2)
6275+
wg := sync.WaitGroup{}
6276+
6277+
// Start a goroutine to change the description of the port every 100ms.
6278+
wg.Add(1)
6279+
go func() {
6280+
defer wg.Done()
6281+
end := time.Now().Add(5 * time.Second)
6282+
for time.Now().Before(end) {
6283+
if err := rclient.HSet(context.Background(), "PORT|Ethernet1/1/1", "description", time.Now().String()).Err(); err != nil {
6284+
t.Errorf("Failed to set description: %v", err)
6285+
return
6286+
}
6287+
time.Sleep(100 * time.Millisecond)
6288+
}
6289+
}()
6290+
6291+
// Start two Sample subscriptions and collect their responses.
6292+
for i := 0; i < 2; i++ {
6293+
wg.Add(1)
6294+
go func(i int) {
6295+
defer wg.Done()
6296+
subUpdates := []*pb.SubscribeResponse{}
6297+
ctx, cancel := context.WithTimeout(context.Background(), 5500*time.Millisecond)
6298+
defer cancel()
6299+
6300+
stream, err := gClient.Subscribe(ctx, grpc.MaxCallRecvMsgSize(6000000))
6301+
if err != nil {
6302+
t.Error(err.Error())
6303+
return
6304+
}
6305+
if err = stream.Send(subReq); err != nil {
6306+
t.Errorf("Failed to send subscription: %v", err)
6307+
return
6308+
}
6309+
6310+
syncReceived := false
6311+
for {
6312+
resp, err := stream.Recv()
6313+
if err != nil {
6314+
break
6315+
}
6316+
if resp.GetSyncResponse() {
6317+
syncReceived = true
6318+
continue
6319+
}
6320+
if syncReceived {
6321+
subUpdates = append(subUpdates, resp)
6322+
}
6323+
}
6324+
updates[i] = subUpdates
6325+
}(i)
6326+
time.Sleep(300 * time.Millisecond)
6327+
}
6328+
wg.Wait()
6329+
6330+
if !reflect.DeepEqual(updates[0], updates[1]) {
6331+
t.Errorf("Updates received do not match for identical subscriptions!\nFirst Client's Updates:%v\nSeconds Client's Updates:%v", updates[0], updates[1])
6332+
}
6333+
}
6334+
62336335
func init() {
62346336
// Enable logs at UT setup
62356337
flag.Lookup("v").Value.Set("10")
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
log "github.com/golang/glog"
6+
"github.com/golang/protobuf/proto"
7+
gnmipb "github.com/openconfig/gnmi/proto/gnmi"
8+
spb "github.com/sonic-net/sonic-gnmi/proto"
9+
"sync"
10+
"sync/atomic"
11+
"time"
12+
)
13+
14+
var superSubs = superSubscriptions{
15+
mu: &sync.Mutex{},
16+
subs: map[*superSubscription]bool{},
17+
}
18+
19+
type superSubscriptions struct {
20+
mu *sync.Mutex
21+
subs map[*superSubscription]bool
22+
}
23+
24+
// superSubscription is used to deduplicate subscriptions. Stream Subscriptions
25+
// become part of a superSubscription and whenever a Sample is processed, the
26+
// response is sent to all clients that are part of the superSubscription.
27+
type superSubscription struct {
28+
mu *sync.RWMutex
29+
clients map[*TranslClient]struct{}
30+
request *gnmipb.SubscriptionList
31+
primaryClient *TranslClient
32+
tickers map[int]*time.Ticker // map of interval duration (nanoseconds) to ticker.
33+
sharedUpdates atomic.Uint64
34+
exclusiveUpdates atomic.Uint64
35+
}
36+
37+
// ------------- Super Subscription Functions -------------
38+
// createSuperSubscription takes a SubscriptionList and returns a new
39+
// superSubscription for that SubscriptionList. This function expects the
40+
// caller to already hold superSubs.mu before calling createSuperSubscription.
41+
func createSuperSubscription(subscription *gnmipb.SubscriptionList) *superSubscription {
42+
if subscription == nil {
43+
return nil
44+
}
45+
newSuperSub := &superSubscription{
46+
mu: &sync.RWMutex{},
47+
clients: map[*TranslClient]struct{}{},
48+
request: subscription,
49+
primaryClient: nil,
50+
tickers: map[int]*time.Ticker{},
51+
sharedUpdates: atomic.Uint64{},
52+
exclusiveUpdates: atomic.Uint64{},
53+
}
54+
if _, ok := superSubs.subs[newSuperSub]; ok {
55+
// This should never happen.
56+
log.V(0).Infof("Super Subscription (%p) for %v already exists but a new has been created!", newSuperSub, subscription)
57+
}
58+
superSubs.subs[newSuperSub] = true
59+
return newSuperSub
60+
}
61+
62+
// findSuperSubscription takes a SubscriptionList and tries to find an
63+
// existing superSubscription for that SubscriptionList. If one is found,
64+
// the superSubscription is returned. Else, nil is returned. This function
65+
// expects the caller to already hold superSubs.mu before calling findSuperSubscription.
66+
func findSuperSubscription(subscription *gnmipb.SubscriptionList) *superSubscription {
67+
if subscription == nil {
68+
return nil
69+
}
70+
for sub, _ := range superSubs.subs {
71+
if sub.request == nil {
72+
continue
73+
}
74+
if proto.Equal(sub.request, subscription) {
75+
return sub
76+
}
77+
}
78+
return nil
79+
}
80+
81+
// deleteSuperSub removes superSub from the superSubs map.
82+
// If the superSub is removed from the TranslClient, there
83+
// should be no remaining references to the superSub. This
84+
// function expects the caller to already hold superSubs.mu
85+
// before calling deleteSuperSubscription.
86+
func deleteSuperSubscription(superSub *superSubscription) {
87+
if superSub == nil {
88+
log.V(0).Info("deleteSuperSubscription called on a nil Super Subscription!")
89+
return
90+
}
91+
tickerCleanup(superSub.tickers)
92+
delete(superSubs.subs, superSub)
93+
}
94+
95+
// ------------- Super Subscription Methods -------------
96+
// sendNotifications takes a value and adds it to the notification
97+
// queue for each subscription in the superSubscription.
98+
func (ss *superSubscription) sendNotifications(v *spb.Value) {
99+
if v == nil {
100+
return
101+
}
102+
ss.mu.RLock()
103+
defer ss.mu.RUnlock()
104+
for client, _ := range ss.clients {
105+
value := proto.Clone(v).(*spb.Value)
106+
client.q.Put(Value{value})
107+
}
108+
}
109+
110+
// populateTickers populates the ticker_info objects in the intervalToTickerInfoMap with the
111+
// shared tickers. If tickers don't exist yet, they are created.
112+
func (ss *superSubscription) populateTickers(intervalToTickerInfoMap map[int][]*ticker_info) error {
113+
if intervalToTickerInfoMap == nil {
114+
return fmt.Errorf("Invalid intervalToTickerInfoMap passed in: %v", intervalToTickerInfoMap)
115+
}
116+
ss.mu.Lock()
117+
defer ss.mu.Unlock()
118+
if len(ss.tickers) == 0 {
119+
// Create the tickers.
120+
for interval, tInfos := range intervalToTickerInfoMap {
121+
ticker := time.NewTicker(time.Duration(interval) * time.Nanosecond)
122+
ss.tickers[interval] = ticker
123+
for _, tInfo := range tInfos {
124+
tInfo.t = ticker
125+
}
126+
}
127+
return nil
128+
}
129+
// Use the existing tickers.
130+
if len(ss.tickers) != len(intervalToTickerInfoMap) {
131+
return fmt.Errorf("Length of intervalToTickerInfoMap does not match length of existing tickers for Super Subscription! existing tickers=%v, intervalToTickerInfoMap=%v", ss.tickers, intervalToTickerInfoMap)
132+
}
133+
for interval, tInfos := range intervalToTickerInfoMap {
134+
ticker, ok := ss.tickers[interval]
135+
if !ok {
136+
return fmt.Errorf("Interval in intervalToTickerInfoMap not found in existing tickers for Super Subscription! interval=%v", interval)
137+
}
138+
for _, tInfo := range tInfos {
139+
tInfo.t = ticker
140+
}
141+
}
142+
return nil
143+
}
144+
func (ss *superSubscription) String() string {
145+
return fmt.Sprintf("[{%p} NumClients=%d, SharedUpdates=%d, ExclusiveUpdates=%d, Request=%v]", ss, len(ss.clients), ss.sharedUpdates.Load(), ss.exclusiveUpdates.Load(), ss.request)
146+
}
147+
148+
// ------------- TranslClient Methods -------------
149+
// isPrimary returns true if the client is the primary client of its superSubscription.
150+
func (c *TranslClient) isPrimary() bool {
151+
if c == nil || c.superSub == nil {
152+
return false
153+
}
154+
c.superSub.mu.RLock()
155+
defer c.superSub.mu.RUnlock()
156+
return c.superSub.primaryClient == c
157+
}
158+
159+
// leaveSuperSubscription removes the client from the superSubscription.
160+
// If there are no remaining clients in the superSubscription, it is deleted.
161+
func (c *TranslClient) leaveSuperSubscription() {
162+
if c == nil || c.superSub == nil {
163+
return
164+
}
165+
superSubs.mu.Lock()
166+
defer superSubs.mu.Unlock()
167+
c.superSub.mu.Lock()
168+
defer c.superSub.mu.Unlock()
169+
delete(c.superSub.clients, c)
170+
if len(c.superSub.clients) == 0 {
171+
deleteSuperSubscription(c.superSub)
172+
log.V(2).Infof("SuperSubscription (%s) closing!", c.superSub)
173+
} else if c.superSub.primaryClient == c {
174+
// Set a new primary client.
175+
for client := range c.superSub.clients {
176+
c.superSub.primaryClient = client
177+
client.wakeChan <- true
178+
break
179+
}
180+
log.V(2).Infof("SuperSubscription (%s): %p is now the primary client", c.superSub, c.superSub.primaryClient)
181+
}
182+
}
183+
184+
// addClientToSuperSubscription adds a client to a superSubscription.
185+
func (c *TranslClient) addClientToSuperSubscription(subscription *gnmipb.SubscriptionList) {
186+
if c == nil || subscription == nil {
187+
return
188+
}
189+
superSubs.mu.Lock()
190+
defer superSubs.mu.Unlock()
191+
superSub := findSuperSubscription(subscription)
192+
if superSub == nil {
193+
superSub = createSuperSubscription(subscription)
194+
}
195+
superSub.mu.Lock()
196+
defer superSub.mu.Unlock()
197+
c.superSub = superSub
198+
superSub.clients[c] = struct{}{}
199+
if superSub.primaryClient == nil {
200+
superSub.primaryClient = c
201+
}
202+
log.V(2).Infof("SuperSubscription (%s): added new client=%p", superSub, c)
203+
}

0 commit comments

Comments
 (0)