Skip to content

Commit 8dfa4b6

Browse files
sadath-12, ItalyPaleAle, berndverst
authored
GCP Pubsub: cache topics (dapr#3241)
Signed-off-by: sadath-12 <[email protected]> Signed-off-by: syedsadath-17 <[email protected]> Signed-off-by: Alessandro (Ale) Segala <[email protected]> Signed-off-by: Bernd Verst <[email protected]> Co-authored-by: Alessandro (Ale) Segala <[email protected]> Co-authored-by: Bernd Verst <[email protected]>
1 parent 79adc56 commit 8dfa4b6

File tree

1 file changed

+101
-15
lines changed

1 file changed

+101
-15
lines changed

pubsub/gcp/pubsub/pubsub.go

Lines changed: 101 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,15 @@ type GCPPubSub struct {
5454
metadata *metadata
5555
logger logger.Logger
5656

57-
closed atomic.Bool
58-
closeCh chan struct{}
59-
wg sync.WaitGroup
57+
closed atomic.Bool
58+
closeCh chan struct{}
59+
wg sync.WaitGroup
60+
topicCache map[string]cacheEntry
61+
lock *sync.RWMutex
62+
}
63+
64+
// cacheEntry records when a topic was last successfully synced (i.e. when
// ensureTopic last confirmed/created it), so entity management can be
// skipped until the entry is evicted by periodicCacheRefresh.
type cacheEntry struct {
	LastSync time.Time
}
6167

6268
type GCPAuthJSON struct {
@@ -76,9 +82,39 @@ type WhatNow struct {
7682
Type string `json:"type"`
7783
}
7884

85+
const topicCacheRefreshInterval = 5 * time.Hour
86+
7987
// NewGCPPubSub returns a new GCPPubSub instance.
8088
func NewGCPPubSub(logger logger.Logger) pubsub.PubSub {
81-
return &GCPPubSub{logger: logger, closeCh: make(chan struct{})}
89+
client := &GCPPubSub{
90+
logger: logger,
91+
closeCh: make(chan struct{}),
92+
topicCache: make(map[string]cacheEntry),
93+
lock: &sync.RWMutex{},
94+
}
95+
return client
96+
}
97+
98+
func (g *GCPPubSub) periodicCacheRefresh() {
99+
// Run this loop 5 times every topicCacheRefreshInterval, to be able to delete items that are stale
100+
ticker := time.NewTicker(topicCacheRefreshInterval / 5)
101+
defer ticker.Stop()
102+
103+
for {
104+
select {
105+
case <-g.closeCh:
106+
return
107+
case <-ticker.C:
108+
g.lock.Lock()
109+
for key, entry := range g.topicCache {
110+
// Delete from the cache if the last sync was longer than topicCacheRefreshInterval
111+
if time.Since(entry.LastSync) > topicCacheRefreshInterval {
112+
delete(g.topicCache, key)
113+
}
114+
}
115+
g.lock.Unlock()
116+
}
117+
}
82118
}
83119

84120
func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
@@ -110,6 +146,12 @@ func (g *GCPPubSub) Init(ctx context.Context, meta pubsub.Metadata) error {
110146
return err
111147
}
112148

149+
g.wg.Add(1)
150+
go func() {
151+
defer g.wg.Done()
152+
g.periodicCacheRefresh()
153+
}()
154+
113155
pubsubClient, err := g.getPubSubClient(ctx, metadata)
114156
if err != nil {
115157
return fmt.Errorf("%s error creating pubsub client: %w", errorMessagePrefix, err)
@@ -174,12 +216,22 @@ func (g *GCPPubSub) Publish(ctx context.Context, req *pubsub.PublishRequest) err
174216
if g.closed.Load() {
175217
return errors.New("component is closed")
176218
}
219+
g.lock.RLock()
220+
_, topicExists := g.topicCache[req.Topic]
221+
g.lock.RUnlock()
177222

178-
if !g.metadata.DisableEntityManagement {
223+
// We are not acquiring a write lock before calling ensureTopic, so there's a chance that ensureTopic may be called multiple times
224+
// This is acceptable in our case, even if slightly wasteful, as ensureTopic is idempotent
225+
if !g.metadata.DisableEntityManagement && !topicExists {
179226
err := g.ensureTopic(ctx, req.Topic)
180227
if err != nil {
181-
return fmt.Errorf("%s could not get valid topic %s, %s", errorMessagePrefix, req.Topic, err)
228+
return fmt.Errorf("%s could not get valid topic %s: %w", errorMessagePrefix, req.Topic, err)
229+
}
230+
g.lock.Lock()
231+
g.topicCache[req.Topic] = cacheEntry{
232+
LastSync: time.Now(),
182233
}
234+
g.lock.Unlock()
183235
}
184236

185237
topic := g.getTopic(req.Topic)
@@ -210,12 +262,22 @@ func (g *GCPPubSub) Subscribe(parentCtx context.Context, req pubsub.SubscribeReq
210262
if g.closed.Load() {
211263
return errors.New("component is closed")
212264
}
265+
g.lock.RLock()
266+
_, topicExists := g.topicCache[req.Topic]
267+
g.lock.RUnlock()
213268

214-
if !g.metadata.DisableEntityManagement {
269+
// We are not acquiring a write lock before calling ensureTopic, so there's a chance that ensureTopic may be called multiple times
270+
// This is acceptable in our case, even if slightly wasteful, as ensureTopic is idempotent
271+
if !g.metadata.DisableEntityManagement && !topicExists {
215272
topicErr := g.ensureTopic(parentCtx, req.Topic)
216273
if topicErr != nil {
217-
return fmt.Errorf("%s could not get valid topic - topic:%q, error: %v", errorMessagePrefix, req.Topic, topicErr)
274+
return fmt.Errorf("%s could not get valid topic - topic:%q, error: %w", errorMessagePrefix, req.Topic, topicErr)
275+
}
276+
g.lock.Lock()
277+
g.topicCache[req.Topic] = cacheEntry{
278+
LastSync: time.Now(),
218279
}
280+
g.lock.Unlock()
219281

220282
subError := g.ensureSubscription(parentCtx, g.metadata.ConsumerID, req.Topic)
221283
if subError != nil {
@@ -354,9 +416,24 @@ func (g *GCPPubSub) getTopic(topic string) *gcppubsub.Topic {
354416
}
355417

356418
func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription string, topic string) error {
357-
err := g.ensureTopic(parentCtx, topic)
358-
if err != nil {
359-
return err
419+
g.lock.RLock()
420+
_, topicOK := g.topicCache[topic]
421+
_, dlTopicOK := g.topicCache[g.metadata.DeadLetterTopic]
422+
g.lock.RUnlock()
423+
if !topicOK {
424+
g.lock.Lock()
425+
// Double-check if the topic still doesn't exist to avoid race condition
426+
if _, ok := g.topicCache[topic]; !ok {
427+
err := g.ensureTopic(parentCtx, topic)
428+
if err != nil {
429+
g.lock.Unlock()
430+
return err
431+
}
432+
g.topicCache[topic] = cacheEntry{
433+
LastSync: time.Now(),
434+
}
435+
}
436+
g.lock.Unlock()
360437
}
361438

362439
managedSubscription := subscription + "-" + topic
@@ -369,11 +446,20 @@ func (g *GCPPubSub) ensureSubscription(parentCtx context.Context, subscription s
369446
EnableMessageOrdering: g.metadata.EnableMessageOrdering,
370447
}
371448

372-
if g.metadata.DeadLetterTopic != "" {
373-
subErr = g.ensureTopic(parentCtx, g.metadata.DeadLetterTopic)
374-
if subErr != nil {
375-
return subErr
449+
if g.metadata.DeadLetterTopic != "" && !dlTopicOK {
450+
g.lock.Lock()
451+
// Double-check if the DeadLetterTopic still doesn't exist to avoid race condition
452+
if _, ok := g.topicCache[g.metadata.DeadLetterTopic]; !ok {
453+
subErr = g.ensureTopic(parentCtx, g.metadata.DeadLetterTopic)
454+
if subErr != nil {
455+
g.lock.Unlock()
456+
return subErr
457+
}
458+
g.topicCache[g.metadata.DeadLetterTopic] = cacheEntry{
459+
LastSync: time.Now(),
460+
}
376461
}
462+
g.lock.Unlock()
377463
dlTopic := fmt.Sprintf("projects/%s/topics/%s", g.metadata.ProjectID, g.metadata.DeadLetterTopic)
378464
subConfig.DeadLetterPolicy = &gcppubsub.DeadLetterPolicy{
379465
DeadLetterTopic: dlTopic,

0 commit comments

Comments
 (0)