Skip to content

Commit 577997d

Browse files
committed
Auto reconnect if nats credential expires
Signed-off-by: Tamal Saha <tamal@appscode.com>
1 parent a8e7b9e commit 577997d

File tree

12 files changed

+190
-229
lines changed

12 files changed

+190
-229
lines changed

go.mod

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@ require (
88
github.com/cloudevents/sdk-go/v2 v2.15.2
99
github.com/nats-io/nats.go v1.38.0
1010
github.com/pkg/errors v0.9.1
11-
go.bytebuilders.dev/license-verifier v0.14.4
12-
go.bytebuilders.dev/license-verifier/kubernetes v0.14.4
11+
go.bytebuilders.dev/license-verifier v0.14.6
12+
go.bytebuilders.dev/license-verifier/kubernetes v0.14.6
1313
gomodules.xyz/counter v0.0.1
14-
gomodules.xyz/sync v0.1.0
1514
k8s.io/api v0.30.2
1615
k8s.io/apimachinery v0.30.2
1716
k8s.io/client-go v0.30.2
@@ -91,7 +90,7 @@ require (
9190
github.com/yudai/gojsondiff v1.0.0 // indirect
9291
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
9392
github.com/zeebo/xxh3 v1.0.2 // indirect
94-
go.bytebuilders.dev/license-proxyserver v0.0.19 // indirect
93+
go.bytebuilders.dev/license-proxyserver v0.0.20 // indirect
9594
go.uber.org/multierr v1.11.0 // indirect
9695
go.uber.org/zap v1.27.0 // indirect
9796
golang.org/x/crypto v0.31.0 // indirect

go.sum

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,12 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
210210
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
211211
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
212212
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
213-
go.bytebuilders.dev/license-proxyserver v0.0.19 h1:mY7zPDN0JCw2a1UajOuQUQKQKjjm5KBx2CbkT/+a1N8=
214-
go.bytebuilders.dev/license-proxyserver v0.0.19/go.mod h1:B3Ig2Fo1qUollSV9GgfyFK8tXBI0RmUSpP1KFMZ2N7Q=
215-
go.bytebuilders.dev/license-verifier v0.14.4 h1:JwTGQFew4nudwv8Pk3BdfQRts8KfgUQ5xhu138w1wt4=
216-
go.bytebuilders.dev/license-verifier v0.14.4/go.mod h1:LqWXJKee5ofDcCYM6T5WilYlUc4NlKeZz58tHwO8GEs=
217-
go.bytebuilders.dev/license-verifier/kubernetes v0.14.4 h1:NeHq6SuVhRIVaMW2kSXdr8DcuUOg2jVL9rsODIQp9Fc=
218-
go.bytebuilders.dev/license-verifier/kubernetes v0.14.4/go.mod h1:1C7SaOJShC60mIXP1hXBaDWGpb0hrHQ4p4nUEvI6YQY=
213+
go.bytebuilders.dev/license-proxyserver v0.0.20 h1:gzRSwUmX/LSwPVE6T9oy5RLIutU1EeI7hmS+QGsYBY4=
214+
go.bytebuilders.dev/license-proxyserver v0.0.20/go.mod h1:2PJmjMCXncVyeP3fIVQ+hwZnuhmWSTmbcuEMBrFKIac=
215+
go.bytebuilders.dev/license-verifier v0.14.6 h1:0iHYGURUbx8toiXvFKftn/qMpeHzqHbAgEnEzOCNLvo=
216+
go.bytebuilders.dev/license-verifier v0.14.6/go.mod h1:LqWXJKee5ofDcCYM6T5WilYlUc4NlKeZz58tHwO8GEs=
217+
go.bytebuilders.dev/license-verifier/kubernetes v0.14.6 h1:NxmASX0A3lu+ABd4zuT5Ib+I63y3j5uJxmlUFEGxqWg=
218+
go.bytebuilders.dev/license-verifier/kubernetes v0.14.6/go.mod h1:N5QxsJF4EGLduOsTsW9gGfRuuMvN33T8pg5Y9NfKzuo=
219219
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
220220
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
221221
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@@ -283,8 +283,6 @@ gomodules.xyz/mergo v0.3.13 h1:q6cL/MMXZH/MrR2+yjSihFFq6UifXqjwaqI48B6cMEM=
283283
gomodules.xyz/mergo v0.3.13/go.mod h1:F/2rKC7j0URTnHUKDiTiLcGdLMhdv8jK2Za3cRTUVmc=
284284
gomodules.xyz/pointer v0.1.0 h1:sG2UKrYVSo6E3r4itAjXfPfe4fuXMi0KdyTHpR3vGCg=
285285
gomodules.xyz/pointer v0.1.0/go.mod h1:sPLsC0+yLTRecUiC5yVlyvXhZ6LAGojNCRWNNqoplvo=
286-
gomodules.xyz/sync v0.1.0 h1:Y7vHOtMrqN9FojRwkCQdC17dKL1fVx+6xb7WdfnXX58=
287-
gomodules.xyz/sync v0.1.0/go.mod h1:kv570yCdknyiZ8Y94uaRFGBC5E47TV/5A7PD9jlnJoQ=
288286
gomodules.xyz/testing v0.0.4 h1:XGKt4B64mBe7P9kPR0Rz1nCQpWoSpBEFdTGkfU1RLe4=
289287
gomodules.xyz/testing v0.0.4/go.mod h1:hD6aXtv9eVycPwS01zv+QTl5BrK2DXQgr6bHqnrW+44=
290288
gomodules.xyz/x v0.0.17 h1:Ik3wf0suCMiYPY0miFUh+q8BpjsUHc/7zvANbFViBQA=

lib/nats.go

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"io"
2525
"net/http"
2626
"os"
27+
"strings"
2728
"sync"
2829
"time"
2930

@@ -45,10 +46,22 @@ const (
4546
)
4647

4748
type NatsConfig struct {
48-
// LicenseID string `json:"licenseID"`
49-
Subject string `json:"natsSubject"`
50-
Server string `json:"natsServer"`
51-
Client *nats.Conn `json:"-"`
49+
Subject string `json:"natsSubject"`
50+
Server string `json:"natsServer"`
51+
}
52+
53+
type NatsClient struct {
54+
cfg *rest.Config
55+
clusterID string
56+
LicenseFile string
57+
58+
le *kubernetes.LicenseEnforcer
59+
l *v1alpha1.License
60+
61+
nc *nats.Conn
62+
Subject string
63+
Server string
64+
mu sync.Mutex
5265
}
5366

5467
// NatsCredential represents the api response of the register licensed user api
@@ -61,75 +74,118 @@ type LicenseIDGetter interface {
6174
GetLicenseID() string
6275
}
6376

64-
type LicenseUpdater struct {
65-
le *kubernetes.LicenseEnforcer
66-
License v1alpha1.License
67-
mu sync.Mutex
77+
func (c *NatsClient) Request(data []byte, timeout time.Duration) (*nats.Msg, error) {
78+
c.mu.Lock()
79+
defer c.mu.Unlock()
80+
81+
var justConnected bool
82+
if c.nc == nil {
83+
if err := c.connect(); err != nil {
84+
return nil, err
85+
}
86+
justConnected = true
87+
}
88+
msg, err := c.nc.Request(c.Subject, data, timeout)
89+
if err != nil && !justConnected && isNatsAuthError(err.Error()) {
90+
if err := c.connect(); err != nil {
91+
return nil, err
92+
}
93+
msg, err = c.nc.Request(c.Subject, data, timeout)
94+
}
95+
return msg, err
96+
}
97+
98+
// src: https://github.com/nats-io/nats.go/blob/main/nats.go#L3693-L3709
99+
func isNatsAuthError(e string) bool {
100+
if strings.HasPrefix(e, nats.AUTHORIZATION_ERR) {
101+
return true
102+
}
103+
if strings.HasPrefix(e, nats.AUTHENTICATION_EXPIRED_ERR) {
104+
return true
105+
}
106+
if strings.HasPrefix(e, nats.AUTHENTICATION_REVOKED_ERR) {
107+
return true
108+
}
109+
if strings.HasPrefix(e, nats.ACCOUNT_AUTHENTICATION_EXPIRED_ERR) {
110+
return true
111+
}
112+
return false
68113
}
69114

70-
func (lu *LicenseUpdater) GetLicenseID() string {
71-
lu.mu.Lock()
72-
defer lu.mu.Unlock()
115+
func (c *NatsClient) GetLicenseID() string {
116+
c.mu.Lock()
117+
defer c.mu.Unlock()
118+
119+
if c.l.Status == v1alpha1.LicenseActive && time.Now().After(c.l.NotAfter.Time) {
120+
license, _ := c.le.LoadLicense()
121+
c.l = &license
122+
}
123+
return c.l.ID
124+
}
73125

74-
l := lu.License
75-
if l.Status == v1alpha1.LicenseActive && time.Now().After(l.NotAfter.Time) {
76-
license, _ := lu.le.LoadLicense()
77-
lu.License = license
78-
l = license
126+
func NewNatsConfig(cfg *rest.Config, clusterID string, LicenseFile string) *NatsClient {
127+
return &NatsClient{
128+
cfg: cfg,
129+
clusterID: clusterID,
130+
LicenseFile: LicenseFile,
79131
}
80-
return l.ID
81132
}
82133

83-
func NewNatsConfig(cfg *rest.Config, clusterID string, LicenseFile string) (*NatsConfig, LicenseIDGetter, error) {
84-
le, err := kubernetes.NewLicenseEnforcer(cfg, LicenseFile)
134+
func (c *NatsClient) connect() error {
135+
le, err := kubernetes.NewLicenseEnforcer(c.cfg, c.LicenseFile)
85136
if err != nil {
86-
return nil, nil, err
137+
return err
87138
}
88139
license, licenseBytes := le.LoadLicense()
89140
if license.Status != v1alpha1.LicenseActive {
90-
return nil, nil, fmt.Errorf("license status is %s", license.Status)
141+
return fmt.Errorf("license status is %s", license.Status)
91142
}
92143

93144
opts := verifier.Options{
94-
ClusterUID: clusterID,
145+
ClusterUID: c.clusterID,
95146
Features: info.ProductName,
96147
CACert: []byte(info.LicenseCA),
97148
License: licenseBytes,
98149
}
99150
data, err := json.Marshal(opts)
100151
if err != nil {
101-
return nil, nil, err
152+
return err
102153
}
103154

104155
resp, err := http.Post(info.MustRegistrationAPIEndpoint(), "application/json", bytes.NewReader(data))
105156
if err != nil {
106-
return nil, nil, err
157+
return err
107158
}
108159
defer resp.Body.Close()
109160

110161
body, err := io.ReadAll(resp.Body)
111162
if err != nil {
112-
return nil, nil, err
163+
return err
113164
}
114165

115166
if resp.StatusCode != http.StatusOK {
116-
return nil, nil, errors.New(resp.Status + ", " + string(body))
167+
return errors.New(resp.Status + ", " + string(body))
117168
}
118169

119170
var natscred NatsCredential
120171
err = json.Unmarshal(body, &natscred)
121172
if err != nil {
122-
return nil, nil, err
173+
return err
123174
}
124175

125176
klog.V(5).InfoS("using event receiver", "address", natscred.Server, "subject", natscred.Subject, "licenseID", license.ID)
126177

127-
natscred.Client, err = NewConnection(license.ID, natscred)
178+
nc, err := NewConnection(license.ID, natscred)
128179
if err != nil {
129-
return nil, nil, err
180+
return err
130181
}
131182

132-
return &natscred.NatsConfig, &LicenseUpdater{le: le, License: license}, nil
183+
c.le = le
184+
c.l = &license
185+
c.nc = nc
186+
c.Subject = natscred.Subject
187+
c.Server = natscred.Server
188+
return nil
133189
}
134190

135191
// NewConnection creates a new NATS connection

lib/publisher.go

Lines changed: 11 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package lib
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
gosync "sync"
2324
"time"
@@ -27,9 +28,7 @@ import (
2728
cloudeventssdk "github.com/cloudevents/sdk-go/v2"
2829
"github.com/cloudevents/sdk-go/v2/binding/format"
2930
cloudevents "github.com/cloudevents/sdk-go/v2/event"
30-
"github.com/nats-io/nats.go"
3131
"go.bytebuilders.dev/license-verifier/info"
32-
"gomodules.xyz/sync"
3332
core "k8s.io/api/core/v1"
3433
"k8s.io/apimachinery/pkg/labels"
3534
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -62,11 +61,7 @@ type Informer interface {
6261
type EventCreator func(obj client.Object) (*api.Event, error)
6362

6463
type EventPublisher struct {
65-
once sync.Once
66-
connect func() error
67-
68-
nats *NatsConfig
69-
lu LicenseIDGetter
64+
c *NatsClient
7065
mapper discovery.ResourceMapper
7166
createEvent EventCreator
7267

@@ -75,47 +70,15 @@ type EventPublisher struct {
7570
}
7671

7772
func NewEventPublisher(
78-
nats *NatsConfig,
73+
c *NatsClient,
7974
mapper discovery.ResourceMapper,
8075
fn EventCreator,
8176
) *EventPublisher {
82-
p := &EventPublisher{
77+
return &EventPublisher{
78+
c: c,
8379
mapper: mapper,
8480
createEvent: fn,
8581
}
86-
p.connect = func() error {
87-
p.nats = nats
88-
return nil
89-
}
90-
return p
91-
}
92-
93-
func NewResilientEventPublisher(
94-
fnConnect func() (*NatsConfig, LicenseIDGetter, error),
95-
mapper discovery.ResourceMapper,
96-
fnCreateEvent EventCreator,
97-
) *EventPublisher {
98-
p := &EventPublisher{
99-
mapper: mapper,
100-
createEvent: fnCreateEvent,
101-
}
102-
p.connect = func() error {
103-
var err error
104-
p.nats, p.lu, err = fnConnect()
105-
if err != nil {
106-
klog.V(5).InfoS("failed to connect with event receiver", "error", err)
107-
}
108-
return err
109-
}
110-
return p
111-
}
112-
113-
func (p *EventPublisher) NatsClient() (*nats.Conn, error) {
114-
p.once.Do(p.connect)
115-
if p.nats == nil {
116-
return nil, fmt.Errorf("not connected to nats")
117-
}
118-
return p.nats.Client, nil
11982
}
12083

12184
func (p *EventPublisher) Publish(ev *api.Event, et api.EventType) error {
@@ -145,7 +108,7 @@ func (p *EventPublisher) Publish(ev *api.Event, et api.EventType) error {
145108
defer cancel()
146109

147110
for {
148-
_, err = p.nats.Client.Request(p.nats.Subject, data, natsRequestTimeout)
111+
_, err = p.c.Request(data, natsRequestTimeout)
149112
if err == nil {
150113
cancel()
151114
} else {
@@ -154,10 +117,10 @@ func (p *EventPublisher) Publish(ev *api.Event, et api.EventType) error {
154117

155118
select {
156119
case <-ctx.Done():
157-
if ctx.Err() == context.DeadlineExceeded {
120+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
158121
klog.V(5).Infof("failed to send event : %s", string(data))
159-
} else if ctx.Err() == context.Canceled {
160-
klog.V(5).Infof("Published event `%s` to channel `%s` and acknowledged", et, p.nats.Subject)
122+
} else if errors.Is(ctx.Err(), context.Canceled) {
123+
klog.V(5).Infof("Published event `%s` to channel `%s` and acknowledged", et, p.c.Subject)
161124
}
162125
return nil
163126
default:
@@ -183,13 +146,7 @@ func (p *EventPublisher) ForGVK(informer Informer, gvk schema.GroupVersionKind)
183146
if err != nil {
184147
return nil, err
185148
}
186-
187-
p.once.Do(p.connect)
188-
if p.nats == nil {
189-
return nil, fmt.Errorf("not connected to nats")
190-
}
191-
ev.LicenseID = p.lu.GetLicenseID()
192-
149+
ev.LicenseID = p.c.GetLicenseID()
193150
return ev, nil
194151
},
195152
}
@@ -255,12 +212,7 @@ func (p *EventPublisher) setupSiteInfoPublisher(cfg *rest.Config, kc kubernetes.
255212
identitylib.RefreshNodeStats(p.si, nodes)
256213
p.siMutex.Unlock()
257214

258-
p.once.Do(p.connect)
259-
if p.nats == nil {
260-
return nil, fmt.Errorf("not connected to nats")
261-
}
262-
263-
licenseID := p.lu.GetLicenseID()
215+
licenseID := p.c.GetLicenseID()
264216
p.si.Product.LicenseID = licenseID
265217
p.si.Name = fmt.Sprintf("%s.%s", licenseID, p.si.Product.ProductName)
266218
ev := &api.Event{

vendor/go.bytebuilders.dev/license-verifier/apis/licenses/v1alpha1/helper.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ limitations under the License.
1717
package v1alpha1
1818

1919
func (l License) DisableAnalytics() bool {
20-
return len(l.FeatureFlags) > 0 && l.FeatureFlags["DisableAnalytics"] == "true"
20+
return len(l.FeatureFlags) > 0 && l.FeatureFlags[FeatureDisableAnalytics] == "true"
21+
}
22+
23+
func (l License) EnableClientBilling() bool {
24+
return len(l.FeatureFlags) > 0 && l.FeatureFlags[FeatureEnableClientBilling] == "true"
2125
}
2226

2327
func (i *License) Less(j *License) bool {

0 commit comments

Comments
 (0)