@@ -3,154 +3,165 @@ package pubsub
33import (
44 "context"
55 "encoding/json"
6- "github.com/xgodev/boost/model/errors"
7- "github.com/xgodev/boost/wrapper/log"
8- "github.com/xgodev/boost/wrapper/publisher"
6+ "sync"
97 "time"
108
119 "cloud.google.com/go/pubsub"
1210 v2 "github.com/cloudevents/sdk-go/v2"
1311 "github.com/matryer/try"
12+
13+ "github.com/xgodev/boost/model/errors"
14+ "github.com/xgodev/boost/wrapper/log"
15+ "github.com/xgodev/boost/wrapper/publisher"
1416)
1517
16- // client represents a pubsub client .
18+ // client implements a reusable Pub/Sub publisher .
1719type client struct {
1820 client * pubsub.Client
1921 options * Options
20- }
2122
22- // NewWithConfigPath returns connection with options from config path.
23- func NewWithConfigPath (ctx context.Context , c * pubsub.Client , path string ) (publisher.Driver , error ) {
24- options , err := NewOptionsWithPath (path )
25- if err != nil {
26- return nil , err
27- }
28- return NewWithOptions (ctx , c , options ), nil
23+ mu sync.Mutex
24+ topics map [string ]* pubsub.Topic
2925}
3026
31- // NewWithOptions returns connection with options .
27+ // NewWithOptions returns a publisher with a topic cache .
3228func NewWithOptions (ctx context.Context , c * pubsub.Client , options * Options ) publisher.Driver {
33- return & client {options : options , client : c }
34- }
35-
36- // New creates a new pubsub client.
37- func New (ctx context.Context , c * pubsub.Client ) (publisher.Driver , error ) {
38-
39- options , err := NewOptions ()
40- if err != nil {
41- return nil , err
29+ return & client {
30+ client : c ,
31+ options : options ,
32+ topics : make (map [string ]* pubsub.Topic ),
4233 }
43-
44- return NewWithOptions (ctx , c , options ), nil
4534}
4635
47- // Publish publishes an event slice .
36+ // Publish sends a batch of events to Pub/Sub .
4837func (p * client ) Publish (ctx context.Context , events []* v2.Event ) ([]publisher.PublishOutput , error ) {
49-
5038 logger := log .FromContext (ctx ).WithTypeOf (* p )
39+ logger .Info ("publishing to Pub/Sub" )
5140
52- logger .Info ("publishing to pubsub" )
53-
54- if len (events ) > 0 {
55-
56- return p .send (ctx , events )
57-
41+ if len (events ) == 0 {
42+ logger .Warn ("no messages to publish" )
43+ return nil , nil
5844 }
59-
60- logger .Warnf ("no messages were reported for posting" )
61-
62- return []publisher.PublishOutput {}, nil
45+ return p .send (ctx , events )
6346}
6447
65- func ( p * client ) send ( ctx context. Context , events [] * v2. Event ) ( res []publisher. PublishOutput , err error ) {
66-
67- logger := log . FromContext ( ctx ). WithTypeOf ( * p )
48+ // send iterates over the events and publishes them using cached topics.
49+ func ( p * client ) send ( ctx context. Context , events [] * v2. Event ) ([]publisher. PublishOutput , error ) {
50+ var results []publisher. PublishOutput
6851
69- for _ , out := range events {
52+ for _ , ev := range events {
53+ logger := log .FromContext (ctx ).WithTypeOf (* p ).
54+ WithField ("subject" , ev .Subject ()).
55+ WithField ("id" , ev .ID ())
7056
57+ // Convert event data to a generic map.
7158 var data map [string ]interface {}
72- if err := out .DataAs (& data ); err != nil {
73- res = append (res , publisher.PublishOutput {Event : out , Error : errors .Wrap (err , errors .Internalf ("unable to convert data to interface" ))})
59+ if err := ev .DataAs (& data ); err != nil {
60+ results = append (results , publisher.PublishOutput {
61+ Event : ev ,
62+ Error : errors .Wrap (err , errors .Internalf ("failed to convert event data" )),
63+ })
7464 continue
7565 }
7666
77- var rawMessage [] byte
78- rawMessage , err = json .Marshal (data )
67+ // Serialize data to JSON.
68+ raw , err : = json .Marshal (data )
7969 if err != nil {
80- res = append (res , publisher.PublishOutput {Event : out , Error : errors .Wrap (err , errors .Internalf ("error on marshal" ))})
70+ results = append (results , publisher.PublishOutput {
71+ Event : ev ,
72+ Error : errors .Wrap (err , errors .Internalf ("failed to marshal data" )),
73+ })
8174 continue
8275 }
8376
77+ // Build CloudEvents attributes.
8478 attrs := map [string ]string {
85- "ce_specversion" : out .SpecVersion (),
86- "ce_id" : out .ID (),
87- "ce_source" : out .Source (),
88- "ce_type" : out .Type (),
89- "content-type" : out .DataContentType (),
90- "ce_time" : out .Time ().String (),
79+ "ce_specversion" : ev .SpecVersion (),
80+ "ce_id" : ev .ID (),
81+ "ce_source" : ev .Source (),
82+ "ce_type" : ev .Type (),
83+ "content-type" : ev .DataContentType (),
84+ "ce_time" : ev .Time ().String (),
9185 "ce_path" : "/" ,
92- "ce_subject" : out .Subject (),
86+ "ce_subject" : ev .Subject (),
9387 }
9488
95- message := & pubsub. Message {
96- ID : out . ID (),
97- Data : rawMessage ,
98- Attributes : attrs ,
99- PublishTime : time . Now () ,
100- DeliveryAttempt : nil ,
89+ // Create Pub/Sub message.
90+ msg := & pubsub. Message {
91+ ID : ev . ID () ,
92+ Data : raw ,
93+ Attributes : attrs ,
94+ PublishTime : time . Now () ,
10195 }
10296
97+ // Set ordering key if enabled.
10398 if p .options .OrderingKey {
104- pk , err := p .partitionKey ( out )
99+ pk , err := p .getPartitionKey ( ev )
105100 if err != nil {
106- res = append (res , publisher.PublishOutput {Event : out , Error : errors .Wrap (err , errors .Internalf ("unable to gets partition key" ))})
101+ results = append (results , publisher.PublishOutput {
102+ Event : ev ,
103+ Error : errors .Wrap (err , errors .Internalf ("failed to get partition key" )),
104+ })
107105 continue
108106 }
109- message .OrderingKey = pk
107+ msg .OrderingKey = pk
110108 }
111109
112- topic := p .client .Topic (out .Subject ())
113- defer topic .Stop ()
114-
115- l := logger .WithField ("subject" , out .Subject ()).
116- WithField ("id" , out .ID ())
110+ // Retrieve cached topic or create a new one.
111+ topic := p .getTopic (ev .Subject ())
117112
113+ // Publish with retry logic.
118114 err = try .Do (func (attempt int ) (bool , error ) {
119-
120- l .Tracef ("publishing message to topic %s attempt %v" , out .Subject (), attempt )
121-
122- r := topic .Publish (ctx , message )
123- if _ , err := r .Get (ctx ); err != nil {
115+ logger .Tracef ("publishing to topic %s, attempt %d" , ev .Subject (), attempt )
116+ res := topic .Publish (ctx , msg )
117+ if _ , err := res .Get (ctx ); err != nil {
124118 log .Error (err )
125- return attempt < 5 , errors .NewInternal (err , "could not be published in gcp pubsub " )
119+ return attempt < 5 , errors .NewInternal (err , "Pub/Sub publish failed " )
126120 }
127- l .Infof ("message published" )
128- l .Debugf (string (rawMessage ))
121+ logger .Infof ("message published" )
122+ logger .Debugf ("payload: %s" , string (raw ))
129123 return false , nil
130124 })
131125
132126 if err != nil {
133- res = append (res , publisher.PublishOutput {Event : out , Error : err })
134- continue
127+ results = append (results , publisher.PublishOutput {Event : ev , Error : err })
128+ } else {
129+ results = append (results , publisher.PublishOutput {Event : ev })
135130 }
136-
137- res = append (res , publisher.PublishOutput {Event : out })
138-
139131 }
140132
141- return res , err
133+ return results , nil
142134}
143135
144- func (p * client ) partitionKey (out * v2.Event ) (string , error ) {
136+ // getTopic returns a cached Pub/Sub topic or creates it on first use.
137+ func (p * client ) getTopic (subject string ) * pubsub.Topic {
138+ p .mu .Lock ()
139+ defer p .mu .Unlock ()
140+
141+ if t , ok := p .topics [subject ]; ok {
142+ return t
143+ }
144+ t := p .client .Topic (subject )
145+ p .topics [subject ] = t
146+ return t
147+ }
145148
146- var pk string
147- exts := out .Extensions ()
149+ // Close stops all cached topics' background goroutines.
150+ // Call this when the publisher is shutting down.
151+ func (p * client ) Close () {
152+ p .mu .Lock ()
153+ defer p .mu .Unlock ()
148154
149- if key , ok := exts ["key" ]; ok {
150- pk = key .(string )
151- } else {
152- pk = out .ID ()
155+ for _ , t := range p .topics {
156+ t .Stop ()
153157 }
158+ p .topics = nil
159+ }
154160
155- return pk , nil
161+ // getPartitionKey extracts the ordering key extension or uses the event ID.
162+ func (p * client ) getPartitionKey (ev * v2.Event ) (string , error ) {
163+ if key , ok := ev .Extensions ()["key" ]; ok {
164+ return key .(string ), nil
165+ }
166+ return ev .ID (), nil
156167}
0 commit comments