Skip to content
This repository was archived by the owner on Sep 2, 2024. It is now read-only.

Commit 892e4a5

Browse files
committed
add publish doc events in bulkUpdate func
1 parent ac24cc3 commit 892e4a5

File tree

3 files changed

+67
-0
lines changed

3 files changed

+67
-0
lines changed

database/mongo/base.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,26 @@ func (mg *Mongo) UpdateDocuments(auth model.Auth, dbName, col string, filters ma
377377
secureWrite(acctID, userID, auth.Role, col, filters)
378378
removeNotEditableFields(updateFields)
379379

380+
var ids []string
381+
findOpts := options.Find().SetProjection(bson.D{{"id", 1}})
382+
cur, err := db.Collection(internal.CleanCollectionName(col)).Find(mg.Ctx, filters, findOpts)
383+
if err != nil {
384+
return 0, err
385+
}
386+
for cur.Next(mg.Ctx) {
387+
var v map[string]interface{}
388+
if err := cur.Decode(&v); err != nil {
389+
mg.log.Error().Err(err).Msg("")
390+
}
391+
id, ok := v[FieldID].(primitive.ObjectID)
392+
if ok {
393+
ids = append(ids, id.Hex())
394+
}
395+
}
396+
if len(ids) == 0 {
397+
return 0, nil
398+
}
399+
380400
newProps := bson.M{}
381401
for k, v := range updateFields {
382402
newProps[k] = v
@@ -388,6 +408,17 @@ func (mg *Mongo) UpdateDocuments(auth model.Auth, dbName, col string, filters ma
388408
if err != nil {
389409
return 0, err
390410
}
411+
412+
go func() {
413+
for _, id := range ids {
414+
doc, err := mg.GetDocumentByID(auth, dbName, col, id)
415+
if err != nil {
416+
mg.log.Error().Err(err).Msgf("the document with id=%s is not received for publishDocument event", id)
417+
continue
418+
}
419+
mg.PublishDocument("db-"+col, internal.MsgTypeDBUpdated, doc)
420+
}
421+
}()
391422
return res.ModifiedCount, err
392423
}
393424

database/mongo/mongo_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package mongo
33
import (
44
"context"
55
"errors"
6+
"github.com/staticbackendhq/core/logger"
67
"log"
78
"os"
89
"testing"
@@ -51,6 +52,7 @@ func TestMain(m *testing.M) {
5152
Client: cl,
5253
Ctx: context.Background(),
5354
PublishDocument: fakePubDocEvent,
55+
log: logger.Get(config.Current),
5456
}
5557

5658
if err := datastore.Ping(); err != nil {

database/postgresql/base.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,30 @@ func (pg *PostgreSQL) UpdateDocuments(auth model.Auth, dbName, col string, filte
265265
where := secureWrite(auth, col)
266266
where = applyFilter(where, filters)
267267

268+
var idsForUpdate []string
268269
qry := fmt.Sprintf(`
270+
SELECT id
271+
FROM %s.%s
272+
%s
273+
`, dbName, internal.CleanCollectionName(col), where)
274+
275+
rows, err := pg.DB.Query(qry, auth.AccountID, auth.UserID)
276+
if err != nil {
277+
return
278+
}
279+
for rows.Next() {
280+
var id string
281+
if err := rows.Scan(&id); err != nil {
282+
pg.log.Error().Err(err).Msg("error occurred during scanning id for UpdateDocument event")
283+
continue
284+
}
285+
idsForUpdate = append(idsForUpdate, id)
286+
}
287+
if len(idsForUpdate) == 0 {
288+
return 0, nil
289+
}
290+
291+
qry = fmt.Sprintf(`
269292
UPDATE %s.%s SET
270293
data = data || $3
271294
%s
@@ -283,6 +306,17 @@ func (pg *PostgreSQL) UpdateDocuments(auth model.Auth, dbName, col string, filte
283306
if err != nil {
284307
return 0, err
285308
}
309+
310+
go func() {
311+
for _, id := range idsForUpdate {
312+
doc, err := pg.GetDocumentByID(auth, dbName, col, id)
313+
if err != nil {
314+
pg.log.Error().Err(err).Msgf("the document with id=%s is not received for publishDocument event", id)
315+
continue
316+
}
317+
pg.PublishDocument("db-"+col, internal.MsgTypeDBUpdated, doc)
318+
}
319+
}()
286320
return
287321
}
288322

0 commit comments

Comments
 (0)