Skip to content
This repository was archived by the owner on Sep 2, 2024. It is now read-only.

Commit ae53aac

Browse files
committed
add local cache observer
1 parent fc2446d commit ae53aac

File tree

5 files changed

+453
-10
lines changed

5 files changed

+453
-10
lines changed

cache/cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,13 @@ func (c *Cache) Publish(msg internal.Command) error {
148148
func (c *Cache) PublishDocument(channel, typ string, v interface{}) {
149149
subs, err := c.Rdb.PubSubNumSub(c.Ctx, channel).Result()
150150
if err != nil {
151-
c.log.Error().Err(err).Msgf("error getting db subscribers for ", channel)
151+
c.log.Error().Err(err).Msgf("error getting db subscribers for %s", channel)
152152
return
153153
}
154154

155155
count, ok := subs[channel]
156156
if !ok {
157-
c.log.Warn().Msgf("cannot find channel in subs: %d", channel)
157+
c.log.Warn().Msgf("cannot find channel in subs: %s", channel)
158158
return
159159
} else if count == 0 {
160160
return

cache/cache_test.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
package cache
2+
3+
import (
4+
"encoding/json"
5+
"github.com/staticbackendhq/core/config"
6+
"github.com/staticbackendhq/core/internal"
7+
"github.com/staticbackendhq/core/logger"
8+
"os"
9+
"testing"
10+
"time"
11+
)
12+
13+
var (
14+
redisCache *Cache
15+
devCache *CacheDev
16+
adminToken internal.Token
17+
adminAuth internal.Auth
18+
document string
19+
)
20+
21+
type suite struct {
22+
name string
23+
cache internal.Volatilizer
24+
}
25+
26+
func TestMain(m *testing.M) {
27+
config.Current = config.LoadConfig()
28+
logz := logger.Get(config.Current)
29+
redisCache = NewCache(logz)
30+
devCache = NewDevCache(logz)
31+
32+
adminAuth = internal.Auth{
33+
AccountID: "047cfe5b-b91d-4ec6-9bc2-8f68309d8532",
34+
UserID: "5dc37900-2a2e-46d9-8a5d-6699376975ad",
35+
36+
Role: 100,
37+
Token: adminToken.Token,
38+
}
39+
document = `{"accountId":"047cfe5b-b91d-4ec6-9bc2-8f68309d8532","created":"2022-08-31T19:07:36.296787226+03:00","done":true,"id":"872c8192-c610-4fc5-bc8e-22c01f01c798","likes":0,"title":"updated","todos":[{"done":true,"title":"updated"},{"done":false,"title":"sub2"}]}`
40+
os.Exit(m.Run())
41+
}
42+
43+
func TestCacheSubscribe(t *testing.T) {
44+
tests := []suite{
45+
{name: "subscribe with redis cache", cache: redisCache},
46+
{name: "subscribe with dev mem cache", cache: devCache},
47+
}
48+
for _, tc := range tests {
49+
t.Run(tc.name, func(t *testing.T) {
50+
receiver := make(chan internal.Command)
51+
closeCn := make(chan bool)
52+
defer close(receiver)
53+
defer close(closeCn)
54+
55+
payload := internal.Command{Type: internal.MsgTypeError, Data: "invalid token", Channel: "random_cahn"}
56+
57+
go tc.cache.Subscribe(receiver, "", payload.Channel, closeCn)
58+
time.Sleep(10 * time.Millisecond) // need to wait for proper subscriber startup
59+
60+
err := tc.cache.Publish(payload)
61+
if err != nil {
62+
t.Fatal(err.Error())
63+
}
64+
timer := time.NewTimer(5 * time.Second)
65+
select {
66+
case res := <-receiver:
67+
if res != payload {
68+
t.Error("Incorrect message is received")
69+
}
70+
break
71+
case <-timer.C:
72+
t.Fatal("The channel does not received a message")
73+
74+
}
75+
closeCn <- true
76+
})
77+
}
78+
}
79+
80+
func TestCacheSubscribeOnDBEvent(t *testing.T) {
81+
tests := []suite{
82+
{name: "receive db event with redis cache", cache: redisCache},
83+
{name: "receive db event with dev mem cache", cache: devCache},
84+
}
85+
for _, tc := range tests {
86+
t.Run(tc.name, func(t *testing.T) {
87+
receiver := make(chan internal.Command)
88+
closeCn := make(chan bool)
89+
defer close(receiver)
90+
defer close(closeCn)
91+
92+
err := tc.cache.SetTyped("token", adminAuth)
93+
if err != nil {
94+
t.Fatal(err.Error())
95+
}
96+
97+
payload := internal.Command{Type: internal.MsgTypeDBUpdated, Data: document, Channel: "random_cahn"}
98+
99+
go tc.cache.Subscribe(receiver, "token", payload.Channel, closeCn)
100+
101+
time.Sleep(10 * time.Millisecond) // need to wait for proper subscriber startup
102+
103+
err = tc.cache.Publish(payload)
104+
if err != nil {
105+
t.Fatal(err.Error())
106+
}
107+
timer := time.NewTimer(5 * time.Second)
108+
defer timer.Stop()
109+
select {
110+
case res := <-receiver:
111+
if res != payload {
112+
t.Error("Incorrect message is received")
113+
}
114+
break
115+
case <-timer.C:
116+
t.Fatal("The channel does not received a message")
117+
118+
}
119+
closeCn <- true
120+
})
121+
}
122+
}
123+
124+
func TestCacheDontReceiveDBEvent(t *testing.T) {
125+
tests := []suite{
126+
{
127+
name: "DB event with incorrect token is not send to subscriber with redis cache",
128+
cache: redisCache,
129+
},
130+
{
131+
name: "DB event with incorrect token is not send to subscriber with dev cache",
132+
cache: devCache,
133+
},
134+
}
135+
for _, tc := range tests {
136+
t.Run(tc.name, func(t *testing.T) {
137+
receiver := make(chan internal.Command)
138+
closeCn := make(chan bool)
139+
defer close(receiver)
140+
defer close(closeCn)
141+
142+
wrongAuth := adminAuth
143+
wrongAuth.AccountID = "wrongId"
144+
err := tc.cache.SetTyped("token", wrongAuth)
145+
if err != nil {
146+
t.Fatal(err.Error())
147+
}
148+
event := internal.Command{Type: internal.MsgTypeDBUpdated, Data: document, Channel: "chan"}
149+
go tc.cache.Subscribe(receiver, "token", event.Channel, closeCn)
150+
151+
time.Sleep(10 * time.Millisecond) // need to wait for proper subscriber startup
152+
153+
err = tc.cache.Publish(event)
154+
if err != nil {
155+
t.Fatal(err.Error())
156+
}
157+
timer := time.NewTimer(2 * time.Second)
158+
defer timer.Stop()
159+
select {
160+
case res := <-receiver:
161+
closeCn <- true
162+
t.Fatalf("The message should not be received\nReceived: %#v", res)
163+
case <-timer.C:
164+
closeCn <- true
165+
166+
}
167+
})
168+
}
169+
}
170+
171+
func TestCachePublishDocument(t *testing.T) {
172+
tests := []suite{
173+
{name: "receive db event with redis cache", cache: redisCache},
174+
{name: "receive db event with dev mem cache", cache: devCache},
175+
}
176+
for _, tc := range tests {
177+
t.Run(tc.name, func(t *testing.T) {
178+
receiver := make(chan internal.Command)
179+
closeCn := make(chan bool)
180+
defer close(receiver)
181+
defer close(closeCn)
182+
183+
err := tc.cache.SetTyped("token", adminAuth)
184+
if err != nil {
185+
t.Fatal(err.Error())
186+
}
187+
188+
payload := internal.Command{Type: internal.MsgTypeDBUpdated, Data: document, Channel: "random_cahn"}
189+
190+
// convert to map for simulation of real usage
191+
var documentMap map[string]interface{}
192+
if err := json.Unmarshal([]byte(document), &documentMap); err != nil {
193+
t.Fatal(err.Error())
194+
}
195+
196+
go tc.cache.Subscribe(receiver, "token", payload.Channel, closeCn)
197+
198+
time.Sleep(10 * time.Millisecond) // need to wait for proper subscriber startup
199+
200+
tc.cache.PublishDocument(payload.Channel, payload.Type, documentMap)
201+
timer := time.NewTimer(5 * time.Second)
202+
defer timer.Stop()
203+
select {
204+
case res := <-receiver:
205+
if res != payload {
206+
t.Error("Incorrect message is received")
207+
}
208+
break
209+
case <-timer.C:
210+
t.Fatal("The channel does not received a message")
211+
212+
}
213+
closeCn <- true
214+
})
215+
}
216+
}

cache/dev.go

Lines changed: 120 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,26 @@ package cache
33
import (
44
"encoding/json"
55
"errors"
6-
6+
"fmt"
7+
"github.com/staticbackendhq/core/cache/observer"
78
"github.com/staticbackendhq/core/internal"
9+
"github.com/staticbackendhq/core/logger"
810
)
911

1012
type CacheDev struct {
11-
data map[string]string
13+
data map[string]string
14+
log *logger.Logger
15+
observer observer.Observer
1216
}
1317

14-
func NewDevCache() *CacheDev {
15-
return &CacheDev{data: make(map[string]string)}
18+
func NewDevCache(log *logger.Logger) *CacheDev {
19+
return &CacheDev{
20+
data: make(map[string]string),
21+
observer: observer.NewObserver(log),
22+
log: log,
23+
}
1624
}
25+
1726
func (d *CacheDev) Get(key string) (val string, err error) {
1827
val, ok := d.data[key]
1928
if !ok {
@@ -61,15 +70,119 @@ func (d *CacheDev) Dec(key string, by int64) (int64, error) {
6170
}
6271

6372
func (d *CacheDev) Subscribe(send chan internal.Command, token, channel string, close chan bool) {
64-
//TODO: implement this
73+
pubsub := d.observer.Subscribe(channel)
74+
75+
ch := pubsub.Channel()
76+
77+
for {
78+
select {
79+
case m := <-ch:
80+
var msg internal.Command
81+
if err := json.Unmarshal([]byte(m.(string)), &msg); err != nil {
82+
d.log.Error().Err(err).Msg("error parsing JSON message")
83+
_ = pubsub.Close()
84+
_ = d.observer.Unsubscribe(channel, pubsub)
85+
return
86+
}
87+
88+
// TODO: this will need more thinking
89+
if msg.Type == internal.MsgTypeChanIn {
90+
msg.Type = internal.MsgTypeChanOut
91+
} else if msg.IsSystemEvent {
92+
93+
} else if msg.IsDBEvent() && d.HasPermission(token, channel, msg.Data) == false {
94+
continue
95+
}
96+
send <- msg
97+
case <-close:
98+
_ = pubsub.Close()
99+
_ = d.observer.Unsubscribe(channel, pubsub)
100+
return
101+
}
102+
}
65103
}
66104

67105
func (d *CacheDev) Publish(msg internal.Command) error {
68-
return errors.New("not implemented")
106+
b, err := json.Marshal(msg)
107+
if err != nil {
108+
return err
109+
}
110+
111+
// Publish the event to system so server-side function can trigger
112+
go func(sysmsg internal.Command) {
113+
sysmsg.IsSystemEvent = true
114+
b, err := json.Marshal(sysmsg)
115+
if err != nil {
116+
d.log.Error().Err(err).Msg("error marshaling the system msg")
117+
return
118+
}
119+
if err := d.observer.Publish("sbsys", string(b)); err != nil {
120+
d.log.Error().Err(err).Msg("error occurred during publishing to 'sbsys' channel")
121+
}
122+
}(msg)
123+
124+
return d.observer.Publish(msg.Channel, string(b))
69125
}
70126

71127
func (d *CacheDev) PublishDocument(channel, typ string, v any) {
72-
//TODO: Implement this
128+
subs := d.observer.PubNumSub(channel)
129+
130+
count, ok := subs[channel]
131+
if !ok {
132+
d.log.Warn().Msgf("cannot find channel in subs: %s", channel)
133+
return
134+
} else if count == 0 {
135+
return
136+
}
137+
138+
b, err := json.Marshal(v)
139+
if err != nil {
140+
d.log.Error().Err(err).Msg("error publishing db doc")
141+
return
142+
}
143+
144+
msg := internal.Command{
145+
Channel: channel,
146+
Data: string(b),
147+
Type: typ,
148+
}
149+
150+
if err := d.Publish(msg); err != nil {
151+
d.log.Error().Err(err).Msg("unable to publish db doc events")
152+
}
153+
}
154+
155+
func (d *CacheDev) HasPermission(token, repo, payload string) bool {
156+
var me internal.Auth
157+
if err := d.GetTyped(token, &me); err != nil {
158+
return false
159+
}
160+
161+
docs := make(map[string]interface{})
162+
if err := json.Unmarshal([]byte(payload), &docs); err != nil {
163+
d.log.Error().Err(err).Msg("error decoding docs for permissions check")
164+
165+
return false
166+
}
167+
168+
switch internal.ReadPermission(repo) {
169+
case internal.PermGroup:
170+
acctID, ok := docs["accountId"]
171+
if !ok {
172+
return false
173+
}
174+
175+
return fmt.Sprintf("%v", acctID) == me.AccountID
176+
case internal.PermOwner:
177+
owner, ok := docs["ownerId"]
178+
if !ok {
179+
return false
180+
}
181+
182+
return fmt.Sprintf("%v", owner) == me.UserID
183+
default:
184+
return true
185+
}
73186
}
74187

75188
func (d *CacheDev) QueueWork(key, value string) error {

0 commit comments

Comments
 (0)