Skip to content
This repository was archived by the owner on Sep 2, 2024. It is now read-only.

Commit 0cb870b

Browse files
authored
Merge pull request #61 from rostikts/add_cache_events_in_bulk_update_func
Add cache events in bulk update func
2 parents 7c2eb20 + 9732985 commit 0cb870b

File tree

10 files changed

+331
-0
lines changed

10 files changed

+331
-0
lines changed

database/memory/base.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,20 @@ func (m *Memory) GetDocumentByID(auth model.Auth, dbName, col, id string) (doc m
117117
return
118118
}
119119

120+
func (m *Memory) GetDocumentsByIDs(auth model.Auth, dbName, col string, ids []string) (docs []map[string]interface{}, err error) {
121+
122+
for _, id := range ids {
123+
var doc map[string]interface{}
124+
if err := getByID(m, dbName, col, id, &doc); err != nil {
125+
return []map[string]interface{}{}, err
126+
}
127+
docs = append(docs, doc)
128+
}
129+
130+
docs = secureRead(auth, col, docs)
131+
return docs, nil
132+
}
133+
120134
func (m *Memory) UpdateDocument(auth model.Auth, dbName, col, id string, doc map[string]any) (exists map[string]any, err error) {
121135
exists, err = m.GetDocumentByID(auth, dbName, col, id)
122136
if err != nil {

database/memory/base_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,40 @@ func TestGetDocumentByID(t *testing.T) {
169169
}
170170
}
171171

172+
func TestGetDocumentsByIDs(t *testing.T) {
173+
input := []map[string]interface{}{newTask("getbyids123", false), newTask("getbyids123", false)}
174+
175+
var assertionTasks []map[string]interface{}
176+
var ids []string
177+
178+
for _, v := range input {
179+
res, err := datastore.CreateDocument(adminAuth, confDBName, colName, v)
180+
if err != nil {
181+
t.Fatal(err)
182+
}
183+
ids = append(ids, res["id"].(string))
184+
}
185+
186+
for _, v := range ids {
187+
m, err := datastore.GetDocumentByID(adminAuth, confDBName, colName, v)
188+
if err != nil {
189+
t.Fatal(err)
190+
}
191+
assertionTasks = append(assertionTasks, m)
192+
}
193+
194+
res, err := datastore.GetDocumentsByIDs(adminAuth, confDBName, colName, ids)
195+
if err != nil {
196+
t.Fatal(err)
197+
}
198+
if len(assertionTasks) != len(res) {
199+
t.Fatal("received incorrect number of documents")
200+
}
201+
if !reflect.DeepEqual(assertionTasks, res) {
202+
t.Errorf("Does not received expected tasks\nE: %v\nA: %v", assertionTasks, res)
203+
}
204+
}
205+
172206
func TestUpdateDocument(t *testing.T) {
173207
task1 := newTask("inserted", false)
174208

database/mongo/base.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,43 @@ func (mg *Mongo) GetDocumentByID(auth model.Auth, dbName, col, id string) (map[s
320320
return result, nil
321321
}
322322

323+
func (mg *Mongo) GetDocumentsByIDs(auth model.Auth, dbName, col string, ids []string) (docs []map[string]interface{}, err error) {
324+
db := mg.Client.Database(dbName)
325+
326+
var oids []primitive.ObjectID
327+
328+
for _, id := range ids {
329+
oid, err := primitive.ObjectIDFromHex(id)
330+
if err != nil {
331+
return []map[string]interface{}{}, err
332+
}
333+
oids = append(oids, oid)
334+
}
335+
336+
acctID, userID, err := parseObjectID(auth)
337+
if err != nil {
338+
return []map[string]interface{}{}, err
339+
}
340+
341+
filter := bson.M{FieldID: bson.D{{"$in", oids}}}
342+
343+
secureRead(acctID, userID, auth.Role, col, filter)
344+
345+
cur, err := db.Collection(model.CleanCollectionName(col)).Find(mg.Ctx, filter)
346+
if err != nil {
347+
return docs, err
348+
}
349+
for cur.Next(mg.Ctx) {
350+
var v map[string]interface{}
351+
if err := cur.Decode(&v); err != nil {
352+
return []map[string]interface{}{}, err
353+
}
354+
cleanMap(v)
355+
docs = append(docs, v)
356+
}
357+
return
358+
}
359+
323360
func (mg *Mongo) UpdateDocument(auth model.Auth, dbName, col, id string, doc map[string]interface{}) (map[string]interface{}, error) {
324361
db := mg.Client.Database(dbName)
325362

@@ -377,6 +414,26 @@ func (mg *Mongo) UpdateDocuments(auth model.Auth, dbName, col string, filters ma
377414
secureWrite(acctID, userID, auth.Role, col, filters)
378415
removeNotEditableFields(updateFields)
379416

417+
var ids []string
418+
findOpts := options.Find().SetProjection(bson.D{{"id", 1}})
419+
cur, err := db.Collection(model.CleanCollectionName(col)).Find(mg.Ctx, filters, findOpts)
420+
if err != nil {
421+
return 0, err
422+
}
423+
for cur.Next(mg.Ctx) {
424+
var v map[string]interface{}
425+
if err := cur.Decode(&v); err != nil {
426+
mg.log.Error().Err(err).Msg("")
427+
}
428+
id, ok := v[FieldID].(primitive.ObjectID)
429+
if ok {
430+
ids = append(ids, id.Hex())
431+
}
432+
}
433+
if len(ids) == 0 {
434+
return 0, nil
435+
}
436+
380437
newProps := bson.M{}
381438
for k, v := range updateFields {
382439
newProps[k] = v
@@ -388,6 +445,16 @@ func (mg *Mongo) UpdateDocuments(auth model.Auth, dbName, col string, filters ma
388445
if err != nil {
389446
return 0, err
390447
}
448+
449+
go func() {
450+
docs, err := mg.GetDocumentsByIDs(auth, dbName, col, ids)
451+
if err != nil {
452+
mg.log.Error().Err(err).Msgf("the documents with ids=%#s are not received for publishDocument event", ids)
453+
}
454+
for _, doc := range docs {
455+
mg.PublishDocument("db-"+col, model.MsgTypeDBUpdated, doc)
456+
}
457+
}()
391458
return res.ModifiedCount, err
392459
}
393460

database/mongo/base_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,40 @@ func TestGetDocumentByID(t *testing.T) {
169169
}
170170
}
171171

172+
func TestGetDocumentsByIDs(t *testing.T) {
173+
input := []map[string]interface{}{newTask("getbyids1", false), newTask("getbyids1", false)}
174+
175+
var assertionTasks []map[string]interface{}
176+
var ids []string
177+
178+
for _, v := range input {
179+
res, err := datastore.CreateDocument(adminAuth, confDBName, colName, v)
180+
if err != nil {
181+
t.Fatal(err)
182+
}
183+
ids = append(ids, res["id"].(string))
184+
}
185+
186+
for _, v := range ids {
187+
m, err := datastore.GetDocumentByID(adminAuth, confDBName, colName, v)
188+
if err != nil {
189+
t.Fatal(err)
190+
}
191+
assertionTasks = append(assertionTasks, m)
192+
}
193+
194+
res, err := datastore.GetDocumentsByIDs(adminAuth, confDBName, colName, ids)
195+
if err != nil {
196+
t.Fatal(err)
197+
}
198+
if len(assertionTasks) != len(res) {
199+
t.Fatal("received incorrect number of documents")
200+
}
201+
if !reflect.DeepEqual(assertionTasks, res) {
202+
t.Errorf("Does not received expected tasks\nE: %v\nA: %v", assertionTasks, res)
203+
}
204+
}
205+
172206
func TestUpdateDocument(t *testing.T) {
173207
task1 := newTask("inserted", false)
174208

database/mongo/mongo_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package mongo
33
import (
44
"context"
55
"errors"
6+
"github.com/staticbackendhq/core/logger"
67
"log"
78
"os"
89
"testing"
@@ -51,6 +52,7 @@ func TestMain(m *testing.M) {
5152
Client: cl,
5253
Ctx: context.Background(),
5354
PublishDocument: fakePubDocEvent,
55+
log: logger.Get(config.Current),
5456
}
5557

5658
if err := datastore.Ping(); err != nil {

database/persister.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ type Persister interface {
8787
QueryDocuments(auth model.Auth, dbName, col string, filter map[string]interface{}, params model.ListParams) (model.PagedResult, error)
8888
// GetDocumentByID returns a record by its ID
8989
GetDocumentByID(auth model.Auth, dbName, col, id string) (map[string]interface{}, error)
90+
// GetDocumentsByIDs returns a list of records by multiple ids
91+
GetDocumentsByIDs(auth model.Auth, dbName, col string, ids []string) ([]map[string]interface{}, error)
9092
// UpdateDocument updates a full or partial record
9193
UpdateDocument(auth model.Auth, dbName, col, id string, doc map[string]interface{}) (map[string]interface{}, error)
9294
// UpdateDocuments updates multiple records matching filters

database/postgresql/base.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,35 @@ func (pg *PostgreSQL) GetDocumentByID(auth model.Auth, dbName, col, id string) (
233233
return doc.Data, nil
234234
}
235235

236+
func (pg *PostgreSQL) GetDocumentsByIDs(auth model.Auth, dbName, col string, ids []string) (docs []map[string]interface{}, err error) {
237+
where := secureRead(auth, col)
238+
239+
qry := fmt.Sprintf(`
240+
SELECT *
241+
FROM %s.%s
242+
%s AND id in ('%s'::uuid)
243+
`, dbName, model.CleanCollectionName(col), where, strings.Join(ids, "'::uuid,'"))
244+
245+
rows, err := pg.DB.Query(qry, auth.AccountID, auth.UserID)
246+
if err != nil {
247+
return []map[string]interface{}{}, err
248+
}
249+
defer rows.Close()
250+
251+
for rows.Next() {
252+
var doc Document
253+
if err = scanDocument(rows, &doc); err != nil {
254+
return []map[string]interface{}{}, err
255+
}
256+
257+
doc.Data[FieldID] = doc.ID
258+
doc.Data[FieldAccountID] = doc.AccountID
259+
docs = append(docs, doc.Data)
260+
}
261+
262+
return docs, nil
263+
}
264+
236265
func (pg *PostgreSQL) UpdateDocument(auth model.Auth, dbName, col, id string, doc map[string]interface{}) (map[string]interface{}, error) {
237266
where := secureWrite(auth, col)
238267

@@ -265,7 +294,30 @@ func (pg *PostgreSQL) UpdateDocuments(auth model.Auth, dbName, col string, filte
265294
where := secureWrite(auth, col)
266295
where = applyFilter(where, filters)
267296

297+
var ids []string
268298
qry := fmt.Sprintf(`
299+
SELECT id
300+
FROM %s.%s
301+
%s
302+
`, dbName, model.CleanCollectionName(col), where)
303+
304+
rows, err := pg.DB.Query(qry, auth.AccountID, auth.UserID)
305+
if err != nil {
306+
return
307+
}
308+
for rows.Next() {
309+
var id string
310+
if err := rows.Scan(&id); err != nil {
311+
pg.log.Error().Err(err).Msg("error occurred during scanning id for UpdateDocument event")
312+
continue
313+
}
314+
ids = append(ids, id)
315+
}
316+
if len(ids) == 0 {
317+
return 0, nil
318+
}
319+
320+
qry = fmt.Sprintf(`
269321
UPDATE %s.%s SET
270322
data = data || $3
271323
%s
@@ -283,6 +335,16 @@ func (pg *PostgreSQL) UpdateDocuments(auth model.Auth, dbName, col string, filte
283335
if err != nil {
284336
return 0, err
285337
}
338+
339+
go func() {
340+
docs, err := pg.GetDocumentsByIDs(auth, dbName, col, ids)
341+
if err != nil {
342+
pg.log.Error().Err(err).Msgf("the documents with ids=%#s are not received for publishDocument event", ids)
343+
}
344+
for _, doc := range docs {
345+
pg.PublishDocument("db-"+col, model.MsgTypeDBUpdated, doc)
346+
}
347+
}()
286348
return
287349
}
288350

database/postgresql/base_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,40 @@ func TestUpdateDocument(t *testing.T) {
209209
}
210210
}
211211

212+
func TestGetDocumentsByIDs(t *testing.T) {
213+
input := []map[string]interface{}{newTask("getbyids1", false), newTask("getbyids1", false)}
214+
215+
var assertionTasks []map[string]interface{}
216+
var ids []string
217+
218+
for _, v := range input {
219+
res, err := datastore.CreateDocument(adminAuth, confDBName, colName, v)
220+
if err != nil {
221+
t.Fatal(err)
222+
}
223+
ids = append(ids, res["id"].(string))
224+
}
225+
226+
for _, v := range ids {
227+
m, err := datastore.GetDocumentByID(adminAuth, confDBName, colName, v)
228+
if err != nil {
229+
t.Fatal(err)
230+
}
231+
assertionTasks = append(assertionTasks, m)
232+
}
233+
234+
res, err := datastore.GetDocumentsByIDs(adminAuth, confDBName, colName, ids)
235+
if err != nil {
236+
t.Fatal(err)
237+
}
238+
if len(assertionTasks) != len(res) {
239+
t.Fatal("received incorrect number of documents")
240+
}
241+
if !reflect.DeepEqual(assertionTasks, res) {
242+
t.Errorf("Does not received expected tasks\nE: %v\nA: %v", assertionTasks, res)
243+
}
244+
}
245+
212246
func TestUpdateDocuments(t *testing.T) {
213247
task1 := newTask("should be completed", false)
214248
task2 := newTask("should be completed", false)

db.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ func (database *Database) dbreq(w http.ResponseWriter, r *http.Request) {
2323
if r.Method == http.MethodPost {
2424
if len(r.URL.Query().Get("bulk")) > 0 {
2525
database.bulkAdd(w, r)
26+
} else if r.URL.Query().Has("ids") {
27+
database.getByIds(w, r)
2628
} else {
2729
database.add(w, r)
2830
}
@@ -203,6 +205,38 @@ func (database *Database) query(w http.ResponseWriter, r *http.Request) {
203205
respond(w, http.StatusOK, result)
204206
}
205207

208+
func (database *Database) getByIds(w http.ResponseWriter, r *http.Request) {
209+
conf, auth, err := middleware.Extract(r, true)
210+
if err != nil {
211+
http.Error(w, err.Error(), http.StatusBadRequest)
212+
return
213+
}
214+
215+
col := ""
216+
217+
_, r.URL.Path = ShiftPath(r.URL.Path)
218+
col, r.URL.Path = ShiftPath(r.URL.Path)
219+
220+
var ids []string
221+
if err := parseBody(r.Body, &ids); err != nil {
222+
http.Error(w, err.Error(), http.StatusBadRequest)
223+
return
224+
}
225+
226+
if len(ids) < 1 {
227+
http.Error(w, "ids list can not be empty", http.StatusBadRequest)
228+
return
229+
}
230+
231+
result, err := backend.DB.GetDocumentsByIDs(auth, conf.Name, col, ids)
232+
if err != nil {
233+
http.Error(w, err.Error(), http.StatusInternalServerError)
234+
return
235+
}
236+
237+
respond(w, http.StatusOK, result)
238+
}
239+
206240
func (database *Database) update(w http.ResponseWriter, r *http.Request) {
207241
conf, auth, err := middleware.Extract(r, true)
208242
if err != nil {

0 commit comments

Comments
 (0)