diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 378e18e9..2c9e8bc4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: caddy/go.sum - name: golangci-lint - uses: golangci/golangci-lint-action@v6 + uses: golangci/golangci-lint-action@v7 with: version: latest args: --timeout=30m diff --git a/.golangci.yml b/.golangci.yml index f0a8de04..1c94ef20 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,41 +1,56 @@ ---- +version: "2" run: tests: true - linters: - # inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint - enable-all: true + default: all disable: + - canonicalheader + - cyclop + - depguard - errcheck - - lll - - wsl - - testpackage - exhaustruct - - paralleltest - - cyclop - forcetypeassert - - tagliatelle - - varnamelen - - nonamedreturns - - testableexamples - - musttag - - depguard - - mnd - - # Go 1.22+ - intrange - - # weird issues + - lll + - mnd + - musttag - nolintlint - - canonicalheader - -issues: - exclude-rules: - - path: _test\.go - linters: - - gochecknoglobals - - funlen - - godox - - noctx - - wrapcheck - - goconst + - nonamedreturns + - paralleltest + - tagliatelle + - testableexamples + - testpackage + - varnamelen + - wsl + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + rules: + - linters: + - funlen + - gochecknoglobals + - goconst + - godox + - noctx + - wrapcheck + path: _test\.go + paths: + - third_party$ + - builtin$ + - examples$ +formatters: + enable: + - gci + - gofmt + - gofumpt + - goimports + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/bolt.go b/bolt.go index 03f871da..7b1d5098 100644 --- a/bolt.go +++ b/bolt.go @@ -166,41 +166,6 @@ func (t *BoltTransport) Dispatch(update *Update) error { return nil } -// persist stores update in the database. -func (t *BoltTransport) persist(updateID string, updateJSON []byte) error { - if err := t.db.Update(func(tx *bolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists([]byte(t.bucketName)) - if err != nil { - return fmt.Errorf("error when creating Bolt DB bucket: %w", err) - } - - seq, err := bucket.NextSequence() - if err != nil { - return fmt.Errorf("error when generating Bolt DB sequence: %w", err) - } - prefix := make([]byte, 8) - binary.BigEndian.PutUint64(prefix, seq) - - // The sequence value is prepended to the update id to create an ordered list - key := bytes.Join([][]byte{prefix, []byte(updateID)}, []byte{}) - - // The DB is append-only - bucket.FillPercent = 1 - - t.lastSeq = seq - t.lastEventID = updateID - if err := bucket.Put(key, updateJSON); err != nil { - return fmt.Errorf("unable to put value in Bolt DB: %w", err) - } - - return t.cleanup(bucket, seq) - }); err != nil { - return fmt.Errorf("bolt error: %w", err) - } - - return nil -} - // AddSubscriber adds a new subscriber to the transport. func (t *BoltTransport) AddSubscriber(s *LocalSubscriber) error { select { @@ -211,7 +176,7 @@ func (t *BoltTransport) AddSubscriber(s *LocalSubscriber) error { t.Lock() t.subscribers.Add(s) - toSeq := t.lastSeq //nolint:ifshort + toSeq := t.lastSeq t.Unlock() if s.RequestLastEventID != "" { @@ -248,6 +213,29 @@ func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error) { return t.lastEventID, getSubscribers(t.subscribers), nil } +// Close closes the Transport. +func (t *BoltTransport) Close() (err error) { + t.closedOnce.Do(func() { + close(t.closed) + + t.Lock() + defer t.Unlock() + + t.subscribers.Walk(0, func(s *LocalSubscriber) bool { + s.Disconnect() + + return true + }) + err = t.db.Close() + }) + + if err == nil { + return nil + } + + return fmt.Errorf("unable to close Bolt DB: %w", err) +} + //nolint:gocognit func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error { err := t.db.View(func(tx *bolt.Tx) error { @@ -303,27 +291,39 @@ func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error return nil } -// Close closes the Transport. -func (t *BoltTransport) Close() (err error) { - t.closedOnce.Do(func() { - close(t.closed) +// persist stores update in the database. +func (t *BoltTransport) persist(updateID string, updateJSON []byte) error { + if err := t.db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists([]byte(t.bucketName)) + if err != nil { + return fmt.Errorf("error when creating Bolt DB bucket: %w", err) + } - t.Lock() - defer t.Unlock() + seq, err := bucket.NextSequence() + if err != nil { + return fmt.Errorf("error when generating Bolt DB sequence: %w", err) + } + prefix := make([]byte, 8) + binary.BigEndian.PutUint64(prefix, seq) - t.subscribers.Walk(0, func(s *LocalSubscriber) bool { - s.Disconnect() + // The sequence value is prepended to the update id to create an ordered list + key := bytes.Join([][]byte{prefix, []byte(updateID)}, []byte{}) - return true - }) - err = t.db.Close() - }) + // The DB is append-only + bucket.FillPercent = 1 - if err == nil { - return nil + t.lastSeq = seq + t.lastEventID = updateID + if err := bucket.Put(key, updateJSON); err != nil { + return fmt.Errorf("unable to put value in Bolt DB: %w", err) + } + + return t.cleanup(bucket, seq) + }); err != nil { + return fmt.Errorf("bolt error: %w", err) } - return fmt.Errorf("unable to close Bolt DB: %w", err) + return nil } // cleanup removes entries in the history above the size limit, triggered probabilistically. diff --git a/demo_test.go b/demo_test.go index 19ae52ee..c1d56ec2 100644 --- a/demo_test.go +++ b/demo_test.go @@ -30,7 +30,7 @@ func TestEmptyBodyAndJWT(t *testing.T) { defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) - assert.Equal(t, "", string(body)) + assert.Empty(t, string(body)) } func TestBodyAndJWT(t *testing.T) { diff --git a/hub_test.go b/hub_test.go index 9f10983a..8dab050c 100644 --- a/hub_test.go +++ b/hub_test.go @@ -29,11 +29,11 @@ func TestNewHub(t *testing.T) { assert.IsType(t, &viper.Viper{}, h.config) - assert.False(t, h.opt.anonymous) - assert.Equal(t, defaultCookieName, h.opt.cookieName) - assert.Equal(t, 40*time.Second, h.opt.heartbeat) - assert.Equal(t, 5*time.Second, h.opt.dispatchTimeout) - assert.Equal(t, 600*time.Second, h.opt.writeTimeout) + assert.False(t, h.anonymous) + assert.Equal(t, defaultCookieName, h.cookieName) + assert.Equal(t, 40*time.Second, h.heartbeat) + assert.Equal(t, 5*time.Second, h.dispatchTimeout) + assert.Equal(t, 600*time.Second, h.writeTimeout) } func TestNewHubWithConfig(t *testing.T) { diff --git a/publish_test.go b/publish_test.go index 1a8836dc..dd8c48c5 100644 --- a/publish_test.go +++ b/publish_test.go @@ -377,7 +377,7 @@ func FuzzPublish(f *testing.F) { assert.Equal(t, http.StatusOK, resp.StatusCode) if id == "" { - assert.NotEqual(t, "", string(body)) + assert.NotEmpty(t, string(body)) return } diff --git a/subscribe.go b/subscribe.go index a8f45b7a..33518834 100644 --- a/subscribe.go +++ b/subscribe.go @@ -84,7 +84,7 @@ func (h *Hub) getWriteDeadline(s *LocalSubscriber) (deadline time.Time) { deadline = time.Now().Add(h.writeTimeout) } - if s.Claims != nil && s.Claims.ExpiresAt != nil && (deadline == time.Time{} || s.Claims.ExpiresAt.Time.Before(deadline)) { + if s.Claims != nil && s.Claims.ExpiresAt != nil && (deadline.Equal(time.Time{}) || s.Claims.ExpiresAt.Before(deadline)) { deadline = s.Claims.ExpiresAt.Time } diff --git a/subscribe_test.go b/subscribe_test.go index 75ab7d5f..2990beb6 100644 --- a/subscribe_test.go +++ b/subscribe_test.go @@ -112,7 +112,7 @@ func (r *subscribeRecorder) WriteString(str string) (int, error) { return 0, os.ErrDeadlineExceeded } - return r.ResponseRecorder.WriteString(str) + return r.WriteString(str) } func (r *subscribeRecorder) FlushError() error { @@ -120,7 +120,7 @@ func (r *subscribeRecorder) FlushError() error { return os.ErrDeadlineExceeded } - r.ResponseRecorder.Flush() + r.Flush() return nil } @@ -554,7 +554,7 @@ func TestSubscriptionEvents(t *testing.T) { body, _ := io.ReadAll(resp.Body) assert.Equal(t, http.StatusOK, resp.StatusCode) - assert.Equal(t, "", string(body)) + assert.Empty(t, string(body)) }() go func() { diff --git a/subscriber.go b/subscriber.go index 44212e15..3f94abce 100644 --- a/subscriber.go +++ b/subscriber.go @@ -85,6 +85,26 @@ func (s *Subscriber) Match(u *Update) bool { return s.MatchTopics(u.Topics, u.Private) } +func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("id", s.ID) + enc.AddString("last_event_id", s.RequestLastEventID) + if s.RemoteAddr != "" { + enc.AddString("remote_addr", s.RemoteAddr) + } + if s.AllowedPrivateTopics != nil { + if err := enc.AddArray("topic_selectors", stringArray(s.AllowedPrivateTopics)); err != nil { + return fmt.Errorf("log error: %w", err) + } + } + if s.SubscribedTopics != nil { + if err := enc.AddArray("topics", stringArray(s.SubscribedTopics)); err != nil { + return fmt.Errorf("log error: %w", err) + } + } + + return nil +} + // getSubscriptions return the list of subscriptions associated to this subscriber. func (s *Subscriber) getSubscriptions(topic, context string, active bool) []subscription { var subscriptions []subscription //nolint:prealloc @@ -110,23 +130,3 @@ func (s *Subscriber) getSubscriptions(topic, context string, active bool) []subs return subscriptions } - -func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error { - enc.AddString("id", s.ID) - enc.AddString("last_event_id", s.RequestLastEventID) - if s.RemoteAddr != "" { - enc.AddString("remote_addr", s.RemoteAddr) - } - if s.AllowedPrivateTopics != nil { - if err := enc.AddArray("topic_selectors", stringArray(s.AllowedPrivateTopics)); err != nil { - return fmt.Errorf("log error: %w", err) - } - } - if s.SubscribedTopics != nil { - if err := enc.AddArray("topics", stringArray(s.SubscribedTopics)); err != nil { - return fmt.Errorf("log error: %w", err) - } - } - - return nil -} diff --git a/transport.go b/transport.go index 41c3d4ca..8475c184 100644 --- a/transport.go +++ b/transport.go @@ -26,19 +26,6 @@ func RegisterTransportFactory(scheme string, factory TransportFactory) { transportFactoriesMu.Unlock() } -// Deprecated: directly instantiate the transport or use transports Caddy modules. -func NewTransport(u *url.URL, l Logger) (Transport, error) { //nolint:ireturn - transportFactoriesMu.RLock() - f, ok := transportFactories[u.Scheme] - transportFactoriesMu.RUnlock() - - if !ok { - return nil, &TransportError{dsn: u.Redacted(), msg: "no such transport available"} - } - - return f(u, l) -} - // Transport provides methods to dispatch and persist updates. type Transport interface { // Dispatch dispatches an update to all subscribers. @@ -54,6 +41,19 @@ type Transport interface { Close() error } +// Deprecated: directly instantiate the transport or use transports Caddy modules. +func NewTransport(u *url.URL, l Logger) (Transport, error) { //nolint:ireturn + transportFactoriesMu.RLock() + f, ok := transportFactories[u.Scheme] + transportFactoriesMu.RUnlock() + + if !ok { + return nil, &TransportError{dsn: u.Redacted(), msg: "no such transport available"} + } + + return f(u, l) +} + // TransportSubscribers provides a method to retrieve the list of active subscribers. type TransportSubscribers interface { // GetSubscribers gets the last event ID and the list of active subscribers at this time.