Skip to content

Commit fa7f710

Browse files
author
iwysiu
committed
GODRIVER-1149 migrate countDocuments to new aggregate
Change-Id: Ib378c7ec182993975cf769996c9b8865ce2a6f17
1 parent 5a9f2ce commit fa7f710

File tree

2 files changed

+70
-37
lines changed

2 files changed

+70
-37
lines changed

mongo/collection.go

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
2525
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
2626
"go.mongodb.org/mongo-driver/x/mongo/driver/session"
27-
"go.mongodb.org/mongo-driver/x/mongo/driverlegacy"
2827
"go.mongodb.org/mongo-driver/x/network/command"
2928
)
3029

@@ -747,9 +746,14 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
747746
}
748747

749748
sess := sessionFromContext(ctx)
750-
751-
err = coll.client.validSession(sess)
752-
if err != nil {
749+
if sess == nil && coll.client.topology.SessionPool != nil {
750+
sess, err = session.NewClientSession(coll.client.topology.SessionPool, coll.client.id, session.Implicit)
751+
if err != nil {
752+
return 0, err
753+
}
754+
defer sess.EndSession()
755+
}
756+
if err = coll.client.validSession(sess); err != nil {
753757
return 0, err
754758
}
755759

@@ -758,27 +762,46 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
758762
rc = nil
759763
}
760764

761-
oldns := coll.namespace()
762-
cmd := command.CountDocuments{
763-
NS: command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
764-
Pipeline: pipelineArr,
765-
ReadPref: coll.readPreference,
766-
ReadConcern: rc,
767-
Session: sess,
768-
Clock: coll.client.clock,
765+
selector := makePinnedSelector(sess, coll.readSelector)
766+
767+
op := operation.NewAggregate(pipelineArr).Session(sess).ReadConcern(rc).ReadPreference(coll.readPreference).
768+
CommandMonitor(coll.client.monitor).ServerSelector(selector).ClusterClock(coll.client.clock).Database(coll.db.name).
769+
Collection(coll.name).Deployment(coll.client.topology)
770+
if countOpts.Collation != nil {
771+
op.Collation(bsoncore.Document(countOpts.Collation.ToDocument()))
772+
}
773+
if countOpts.MaxTime != nil {
774+
op.MaxTimeMS(int64(*countOpts.MaxTime / time.Millisecond))
775+
}
776+
if countOpts.Hint != nil {
777+
hintVal, err := transformValue(coll.registry, countOpts.Hint)
778+
if err != nil {
779+
return 0, err
780+
}
781+
op.Hint(hintVal)
782+
}
783+
784+
err = op.Execute(ctx)
785+
if err != nil {
786+
return 0, replaceErrors(err)
787+
}
788+
789+
batch := op.ResultCursorResponse().FirstBatch
790+
if batch == nil {
791+
return 0, errors.New("invalid response from server, no 'firstBatch' field")
769792
}
770793

771-
count, err := driverlegacy.CountDocuments(
772-
ctx, cmd,
773-
coll.client.topology,
774-
coll.readSelector,
775-
coll.client.id,
776-
coll.client.topology.SessionPool,
777-
coll.registry,
778-
countOpts,
779-
)
794+
docs, err := batch.Documents()
795+
if err != nil || len(docs) == 0 {
796+
return 0, nil
797+
}
798+
799+
val, ok := docs[0].Lookup("n").AsInt64OK()
800+
if !ok {
801+
return 0, errors.New("invalid response from server, no 'n' field")
802+
}
780803

781-
return count, replaceErrors(err)
804+
return val, nil
782805
}
783806

784807
// EstimatedDocumentCount gets an estimate of the count of documents in a collection using collection metadata.

mongo/mongo.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -348,31 +348,41 @@ func transformValue(registry *bsoncodec.Registry, val interface{}) (bsoncore.Val
348348
}
349349

350350
// Build the aggregation pipeline for the CountDocument command.
351-
func countDocumentsAggregatePipeline(registry *bsoncodec.Registry, filter interface{}, opts *options.CountOptions) (bsonx.Arr, error) {
352-
pipeline := bsonx.Arr{}
353-
filterDoc, err := transformDocument(registry, filter)
354-
351+
func countDocumentsAggregatePipeline(registry *bsoncodec.Registry, filter interface{}, opts *options.CountOptions) (bsoncore.Document, error) {
352+
filterDoc, err := transformBsoncoreDocument(registry, filter)
355353
if err != nil {
356354
return nil, err
357355
}
358-
pipeline = append(pipeline, bsonx.Document(bsonx.Doc{{"$match", bsonx.Document(filterDoc)}}))
359356

357+
aidx, arr := bsoncore.AppendArrayStart(nil)
358+
didx, arr := bsoncore.AppendDocumentElementStart(arr, strconv.Itoa(0))
359+
arr = bsoncore.AppendDocumentElement(arr, "$match", filterDoc)
360+
arr, _ = bsoncore.AppendDocumentEnd(arr, didx)
361+
362+
index := 1
360363
if opts != nil {
361364
if opts.Skip != nil {
362-
pipeline = append(pipeline, bsonx.Document(bsonx.Doc{{"$skip", bsonx.Int64(*opts.Skip)}}))
365+
didx, arr = bsoncore.AppendDocumentElementStart(arr, strconv.Itoa(index))
366+
arr = bsoncore.AppendInt64Element(arr, "$skip", *opts.Skip)
367+
arr, _ = bsoncore.AppendDocumentEnd(arr, didx)
368+
index++
363369
}
364370
if opts.Limit != nil {
365-
pipeline = append(pipeline, bsonx.Document(bsonx.Doc{{"$limit", bsonx.Int64(*opts.Limit)}}))
371+
didx, arr = bsoncore.AppendDocumentElementStart(arr, strconv.Itoa(index))
372+
arr = bsoncore.AppendInt64Element(arr, "$limit", *opts.Limit)
373+
arr, _ = bsoncore.AppendDocumentEnd(arr, didx)
374+
index++
366375
}
367376
}
368377

369-
pipeline = append(pipeline, bsonx.Document(bsonx.Doc{
370-
{"$group", bsonx.Document(bsonx.Doc{
371-
{"_id", bsonx.Int32(1)},
372-
{"n", bsonx.Document(bsonx.Doc{{"$sum", bsonx.Int32(1)}})},
373-
})},
374-
},
375-
))
378+
didx, arr = bsoncore.AppendDocumentElementStart(arr, strconv.Itoa(index))
379+
iidx, arr := bsoncore.AppendDocumentElementStart(arr, "$group")
380+
arr = bsoncore.AppendInt32Element(arr, "_id", 1)
381+
iiidx, arr := bsoncore.AppendDocumentElementStart(arr, "n")
382+
arr = bsoncore.AppendInt32Element(arr, "$sum", 1)
383+
arr, _ = bsoncore.AppendDocumentEnd(arr, iiidx)
384+
arr, _ = bsoncore.AppendDocumentEnd(arr, iidx)
385+
arr, _ = bsoncore.AppendDocumentEnd(arr, didx)
376386

377-
return pipeline, nil
387+
return bsoncore.AppendArrayEnd(arr, aidx)
378388
}

0 commit comments

Comments
 (0)