Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
caddy/go.sum

- name: golangci-lint
uses: golangci/golangci-lint-action@v6
uses: golangci/golangci-lint-action@v7
with:
version: latest
args: --timeout=30m
Expand Down
79 changes: 47 additions & 32 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,41 +1,56 @@
---
version: "2"
run:
tests: true

linters:
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
enable-all: true
default: all
disable:
- canonicalheader
- cyclop
- depguard
- errcheck
- lll
- wsl
- testpackage
- exhaustruct
- paralleltest
- cyclop
- forcetypeassert
- tagliatelle
- varnamelen
- nonamedreturns
- testableexamples
- musttag
- depguard
- mnd

# Go 1.22+
- intrange

# weird issues
- lll
- mnd
- musttag
- nolintlint
- canonicalheader

issues:
exclude-rules:
- path: _test\.go
linters:
- gochecknoglobals
- funlen
- godox
- noctx
- wrapcheck
- goconst
- nonamedreturns
- paralleltest
- tagliatelle
- testableexamples
- testpackage
- varnamelen
- wsl
exclusions:
generated: lax
presets:
- comments
- common-false-positives
- legacy
- std-error-handling
rules:
- linters:
- funlen
- gochecknoglobals
- goconst
- godox
- noctx
- wrapcheck
path: _test\.go
paths:
- third_party$
- builtin$
- examples$
formatters:
enable:
- gci
- gofmt
- gofumpt
- goimports
exclusions:
generated: lax
paths:
- third_party$
- builtin$
- examples$
102 changes: 51 additions & 51 deletions bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,41 +166,6 @@ func (t *BoltTransport) Dispatch(update *Update) error {
return nil
}

// persist stores update in the database.
func (t *BoltTransport) persist(updateID string, updateJSON []byte) error {
if err := t.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(t.bucketName))
if err != nil {
return fmt.Errorf("error when creating Bolt DB bucket: %w", err)
}

seq, err := bucket.NextSequence()
if err != nil {
return fmt.Errorf("error when generating Bolt DB sequence: %w", err)
}
prefix := make([]byte, 8)
binary.BigEndian.PutUint64(prefix, seq)

// The sequence value is prepended to the update id to create an ordered list
key := bytes.Join([][]byte{prefix, []byte(updateID)}, []byte{})

// The DB is append-only
bucket.FillPercent = 1

t.lastSeq = seq
t.lastEventID = updateID
if err := bucket.Put(key, updateJSON); err != nil {
return fmt.Errorf("unable to put value in Bolt DB: %w", err)
}

return t.cleanup(bucket, seq)
}); err != nil {
return fmt.Errorf("bolt error: %w", err)
}

return nil
}

// AddSubscriber adds a new subscriber to the transport.
func (t *BoltTransport) AddSubscriber(s *LocalSubscriber) error {
select {
Expand All @@ -211,7 +176,7 @@ func (t *BoltTransport) AddSubscriber(s *LocalSubscriber) error {

t.Lock()
t.subscribers.Add(s)
toSeq := t.lastSeq //nolint:ifshort
toSeq := t.lastSeq
t.Unlock()

if s.RequestLastEventID != "" {
Expand Down Expand Up @@ -248,6 +213,29 @@ func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error) {
return t.lastEventID, getSubscribers(t.subscribers), nil
}

// Close closes the Transport.
func (t *BoltTransport) Close() (err error) {
t.closedOnce.Do(func() {
close(t.closed)

t.Lock()
defer t.Unlock()

t.subscribers.Walk(0, func(s *LocalSubscriber) bool {
s.Disconnect()

return true
})
err = t.db.Close()
})

if err == nil {
return nil
}

return fmt.Errorf("unable to close Bolt DB: %w", err)
}

//nolint:gocognit
func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error {
err := t.db.View(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -303,27 +291,39 @@ func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error
return nil
}

// Close closes the Transport.
func (t *BoltTransport) Close() (err error) {
t.closedOnce.Do(func() {
close(t.closed)
// persist stores update in the database.
func (t *BoltTransport) persist(updateID string, updateJSON []byte) error {
if err := t.db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(t.bucketName))
if err != nil {
return fmt.Errorf("error when creating Bolt DB bucket: %w", err)
}

t.Lock()
defer t.Unlock()
seq, err := bucket.NextSequence()
if err != nil {
return fmt.Errorf("error when generating Bolt DB sequence: %w", err)
}
prefix := make([]byte, 8)
binary.BigEndian.PutUint64(prefix, seq)

t.subscribers.Walk(0, func(s *LocalSubscriber) bool {
s.Disconnect()
// The sequence value is prepended to the update id to create an ordered list
key := bytes.Join([][]byte{prefix, []byte(updateID)}, []byte{})

return true
})
err = t.db.Close()
})
// The DB is append-only
bucket.FillPercent = 1

if err == nil {
return nil
t.lastSeq = seq
t.lastEventID = updateID
if err := bucket.Put(key, updateJSON); err != nil {
return fmt.Errorf("unable to put value in Bolt DB: %w", err)
}

return t.cleanup(bucket, seq)
}); err != nil {
return fmt.Errorf("bolt error: %w", err)
}

return fmt.Errorf("unable to close Bolt DB: %w", err)
return nil
}

// cleanup removes entries in the history above the size limit, triggered probabilistically.
Expand Down
2 changes: 1 addition & 1 deletion demo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestEmptyBodyAndJWT(t *testing.T) {

defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
assert.Equal(t, "", string(body))
assert.Empty(t, string(body))
}

func TestBodyAndJWT(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func TestNewHub(t *testing.T) {

assert.IsType(t, &viper.Viper{}, h.config)

assert.False(t, h.opt.anonymous)
assert.Equal(t, defaultCookieName, h.opt.cookieName)
assert.Equal(t, 40*time.Second, h.opt.heartbeat)
assert.Equal(t, 5*time.Second, h.opt.dispatchTimeout)
assert.Equal(t, 600*time.Second, h.opt.writeTimeout)
assert.False(t, h.anonymous)
assert.Equal(t, defaultCookieName, h.cookieName)
assert.Equal(t, 40*time.Second, h.heartbeat)
assert.Equal(t, 5*time.Second, h.dispatchTimeout)
assert.Equal(t, 600*time.Second, h.writeTimeout)
}

func TestNewHubWithConfig(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func FuzzPublish(f *testing.F) {

assert.Equal(t, http.StatusOK, resp.StatusCode)
if id == "" {
assert.NotEqual(t, "", string(body))
assert.NotEmpty(t, string(body))

return
}
Expand Down
2 changes: 1 addition & 1 deletion subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (h *Hub) getWriteDeadline(s *LocalSubscriber) (deadline time.Time) {
deadline = time.Now().Add(h.writeTimeout)
}

if s.Claims != nil && s.Claims.ExpiresAt != nil && (deadline == time.Time{} || s.Claims.ExpiresAt.Time.Before(deadline)) {
if s.Claims != nil && s.Claims.ExpiresAt != nil && (deadline.Equal(time.Time{}) || s.Claims.ExpiresAt.Before(deadline)) {
deadline = s.Claims.ExpiresAt.Time
}

Expand Down
6 changes: 3 additions & 3 deletions subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,15 @@ func (r *subscribeRecorder) WriteString(str string) (int, error) {
return 0, os.ErrDeadlineExceeded
}

return r.ResponseRecorder.WriteString(str)
return r.WriteString(str)
}

func (r *subscribeRecorder) FlushError() error {
if time.Now().After(r.writeDeadline) {
return os.ErrDeadlineExceeded
}

r.ResponseRecorder.Flush()
r.Flush()

return nil
}
Expand Down Expand Up @@ -554,7 +554,7 @@ func TestSubscriptionEvents(t *testing.T) {
body, _ := io.ReadAll(resp.Body)

assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "", string(body))
assert.Empty(t, string(body))
}()

go func() {
Expand Down
40 changes: 20 additions & 20 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ func (s *Subscriber) Match(u *Update) bool {
return s.MatchTopics(u.Topics, u.Private)
}

func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("id", s.ID)
enc.AddString("last_event_id", s.RequestLastEventID)
if s.RemoteAddr != "" {
enc.AddString("remote_addr", s.RemoteAddr)
}
if s.AllowedPrivateTopics != nil {
if err := enc.AddArray("topic_selectors", stringArray(s.AllowedPrivateTopics)); err != nil {
return fmt.Errorf("log error: %w", err)
}
}
if s.SubscribedTopics != nil {
if err := enc.AddArray("topics", stringArray(s.SubscribedTopics)); err != nil {
return fmt.Errorf("log error: %w", err)
}
}

return nil
}

// getSubscriptions return the list of subscriptions associated to this subscriber.
func (s *Subscriber) getSubscriptions(topic, context string, active bool) []subscription {
var subscriptions []subscription //nolint:prealloc
Expand All @@ -110,23 +130,3 @@ func (s *Subscriber) getSubscriptions(topic, context string, active bool) []subs

return subscriptions
}

func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("id", s.ID)
enc.AddString("last_event_id", s.RequestLastEventID)
if s.RemoteAddr != "" {
enc.AddString("remote_addr", s.RemoteAddr)
}
if s.AllowedPrivateTopics != nil {
if err := enc.AddArray("topic_selectors", stringArray(s.AllowedPrivateTopics)); err != nil {
return fmt.Errorf("log error: %w", err)
}
}
if s.SubscribedTopics != nil {
if err := enc.AddArray("topics", stringArray(s.SubscribedTopics)); err != nil {
return fmt.Errorf("log error: %w", err)
}
}

return nil
}
26 changes: 13 additions & 13 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,6 @@ func RegisterTransportFactory(scheme string, factory TransportFactory) {
transportFactoriesMu.Unlock()
}

// Deprecated: directly instantiate the transport or use transports Caddy modules.
func NewTransport(u *url.URL, l Logger) (Transport, error) { //nolint:ireturn
transportFactoriesMu.RLock()
f, ok := transportFactories[u.Scheme]
transportFactoriesMu.RUnlock()

if !ok {
return nil, &TransportError{dsn: u.Redacted(), msg: "no such transport available"}
}

return f(u, l)
}

// Transport provides methods to dispatch and persist updates.
type Transport interface {
// Dispatch dispatches an update to all subscribers.
Expand All @@ -54,6 +41,19 @@ type Transport interface {
Close() error
}

// Deprecated: directly instantiate the transport or use transports Caddy modules.
func NewTransport(u *url.URL, l Logger) (Transport, error) { //nolint:ireturn
transportFactoriesMu.RLock()
f, ok := transportFactories[u.Scheme]
transportFactoriesMu.RUnlock()

if !ok {
return nil, &TransportError{dsn: u.Redacted(), msg: "no such transport available"}
}

return f(u, l)
}

// TransportSubscribers provides a method to retrieve the list of active subscribers.
type TransportSubscribers interface {
// GetSubscribers gets the last event ID and the list of active subscribers at this time.
Expand Down
Loading