@@ -166,41 +166,6 @@ func (t *BoltTransport) Dispatch(update *Update) error {
166166 return nil
167167}
168168
169- // persist stores update in the database.
170- func (t * BoltTransport ) persist (updateID string , updateJSON []byte ) error {
171- if err := t .db .Update (func (tx * bolt.Tx ) error {
172- bucket , err := tx .CreateBucketIfNotExists ([]byte (t .bucketName ))
173- if err != nil {
174- return fmt .Errorf ("error when creating Bolt DB bucket: %w" , err )
175- }
176-
177- seq , err := bucket .NextSequence ()
178- if err != nil {
179- return fmt .Errorf ("error when generating Bolt DB sequence: %w" , err )
180- }
181- prefix := make ([]byte , 8 )
182- binary .BigEndian .PutUint64 (prefix , seq )
183-
184- // The sequence value is prepended to the update id to create an ordered list
185- key := bytes .Join ([][]byte {prefix , []byte (updateID )}, []byte {})
186-
187- // The DB is append-only
188- bucket .FillPercent = 1
189-
190- t .lastSeq = seq
191- t .lastEventID = updateID
192- if err := bucket .Put (key , updateJSON ); err != nil {
193- return fmt .Errorf ("unable to put value in Bolt DB: %w" , err )
194- }
195-
196- return t .cleanup (bucket , seq )
197- }); err != nil {
198- return fmt .Errorf ("bolt error: %w" , err )
199- }
200-
201- return nil
202- }
203-
204169// AddSubscriber adds a new subscriber to the transport.
205170func (t * BoltTransport ) AddSubscriber (s * LocalSubscriber ) error {
206171 select {
@@ -211,7 +176,7 @@ func (t *BoltTransport) AddSubscriber(s *LocalSubscriber) error {
211176
212177 t .Lock ()
213178 t .subscribers .Add (s )
214- toSeq := t .lastSeq //nolint:ifshort
179+ toSeq := t .lastSeq
215180 t .Unlock ()
216181
217182 if s .RequestLastEventID != "" {
@@ -248,6 +213,29 @@ func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error) {
248213 return t .lastEventID , getSubscribers (t .subscribers ), nil
249214}
250215
216+ // Close closes the Transport.
217+ func (t * BoltTransport ) Close () (err error ) {
218+ t .closedOnce .Do (func () {
219+ close (t .closed )
220+
221+ t .Lock ()
222+ defer t .Unlock ()
223+
224+ t .subscribers .Walk (0 , func (s * LocalSubscriber ) bool {
225+ s .Disconnect ()
226+
227+ return true
228+ })
229+ err = t .db .Close ()
230+ })
231+
232+ if err == nil {
233+ return nil
234+ }
235+
236+ return fmt .Errorf ("unable to close Bolt DB: %w" , err )
237+ }
238+
251239//nolint:gocognit
252240func (t * BoltTransport ) dispatchHistory (s * LocalSubscriber , toSeq uint64 ) error {
253241 err := t .db .View (func (tx * bolt.Tx ) error {
@@ -303,27 +291,39 @@ func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error
303291 return nil
304292}
305293
306- // Close closes the Transport.
307- func (t * BoltTransport ) Close () (err error ) {
308- t .closedOnce .Do (func () {
309- close (t .closed )
294+ // persist stores update in the database.
295+ func (t * BoltTransport ) persist (updateID string , updateJSON []byte ) error {
296+ if err := t .db .Update (func (tx * bolt.Tx ) error {
297+ bucket , err := tx .CreateBucketIfNotExists ([]byte (t .bucketName ))
298+ if err != nil {
299+ return fmt .Errorf ("error when creating Bolt DB bucket: %w" , err )
300+ }
310301
311- t .Lock ()
312- defer t .Unlock ()
302+ seq , err := bucket .NextSequence ()
303+ if err != nil {
304+ return fmt .Errorf ("error when generating Bolt DB sequence: %w" , err )
305+ }
306+ prefix := make ([]byte , 8 )
307+ binary .BigEndian .PutUint64 (prefix , seq )
313308
314- t . subscribers . Walk ( 0 , func ( s * LocalSubscriber ) bool {
315- s . Disconnect ( )
309+ // The sequence value is prepended to the update id to create an ordered list
310+ key := bytes . Join ([][] byte { prefix , [] byte ( updateID )}, [] byte {} )
316311
317- return true
318- })
319- err = t .db .Close ()
320- })
312+ // The DB is append-only
313+ bucket .FillPercent = 1
321314
322- if err == nil {
323- return nil
315+ t .lastSeq = seq
316+ t .lastEventID = updateID
317+ if err := bucket .Put (key , updateJSON ); err != nil {
318+ return fmt .Errorf ("unable to put value in Bolt DB: %w" , err )
319+ }
320+
321+ return t .cleanup (bucket , seq )
322+ }); err != nil {
323+ return fmt .Errorf ("bolt error: %w" , err )
324324 }
325325
326- return fmt . Errorf ( "unable to close Bolt DB: %w" , err )
326+ return nil
327327}
328328
329329// cleanup removes entries in the history above the size limit, triggered probabilistically.
0 commit comments