Skip to content

Commit 0028619

Browse files
csum112Evergreen Agent
authored andcommitted
SERVER-82128 Add deferred queue behaviour to $queue agg stage
1 parent 79e8901 commit 0028619

14 files changed

+352
-192
lines changed

jstests/core/query/query_settings/query_settings_debug_query_shape.js

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,13 @@ runTest({
127127
},
128128
});
129129

130-
// TODO SERVER-82128 Uncomment this test case once the $querySettings recursion issue has been
131-
// addressed.
132-
133130
// Test the inception case: setting query settings on '$querySettings'.
134-
// runTest({
135-
// queryInstance: qsutils.makeAggregateQueryInstance(
136-
// /* pipeline */[{$querySettings: {showDebugQueryShape: true}}], /* collName */ 1),
137-
// expectedDebugQueryShape: {
138-
// cmdNs: {db: db.getName(), coll: "$cmd.aggregate"},
139-
// command: "aggregate",
140-
// pipeline: [{"$queue": "[]"}],
141-
// },
142-
// });
131+
runTest({
132+
queryInstance: qsutils.makeAggregateQueryInstance(
133+
/* pipeline */[{$querySettings: {showDebugQueryShape: true}}], /* collName */ 1),
134+
expectedDebugQueryShape: {
135+
cmdNs: {db: db.getName(), coll: "$cmd.aggregate"},
136+
command: "aggregate",
137+
pipeline: [{$querySettings: {showDebugQueryShape: true}}]
138+
},
139+
});

src/mongo/db/pipeline/document_source_index_stats.cpp

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
#include "mongo/db/pipeline/document_source_index_stats.h"
3131

3232
#include <boost/smart_ptr/intrusive_ptr.hpp>
33+
#include <iterator>
3334

3435
#include "mongo/bson/bsonobj.h"
3536
#include "mongo/bson/bsontypes.h"
3637
#include "mongo/db/cluster_role.h"
38+
#include "mongo/db/pipeline/document_source_queue.h"
3739
#include "mongo/db/pipeline/process_interface/mongo_process_interface.h"
3840
#include "mongo/db/query/allowed_contexts.h"
3941
#include "mongo/db/server_options.h"
@@ -50,41 +52,37 @@ REGISTER_DOCUMENT_SOURCE(indexStats,
5052
DocumentSourceIndexStats::createFromBson,
5153
AllowedWithApiStrict::kNeverInVersion1);
5254

53-
const char* DocumentSourceIndexStats::getSourceName() const {
54-
return kStageName.rawData();
55-
}
56-
57-
DocumentSource::GetNextResult DocumentSourceIndexStats::doGetNext() {
58-
if (_indexStats.empty()) {
59-
_indexStats = pExpCtx->mongoProcessInterface->getIndexStats(
60-
pExpCtx->opCtx,
61-
pExpCtx->ns,
62-
_processName,
63-
!serverGlobalParams.clusterRole.has(ClusterRole::None));
64-
_indexStatsIter = _indexStats.cbegin();
65-
}
66-
67-
if (_indexStatsIter != _indexStats.cend()) {
68-
Document doc{*_indexStatsIter};
69-
++_indexStatsIter;
70-
return doc;
71-
}
72-
73-
return GetNextResult::makeEOF();
74-
}
75-
76-
DocumentSourceIndexStats::DocumentSourceIndexStats(const intrusive_ptr<ExpressionContext>& pExpCtx)
77-
: DocumentSource(kStageName, pExpCtx), _processName(getHostNameCachedAndPort()) {}
7855

56+
// Implements 'DocumentSourceIndexStats' based on a shard-only 'DocumentSourceQueue' stage.
7957
intrusive_ptr<DocumentSource> DocumentSourceIndexStats::createFromBson(
8058
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
8159
uassert(28803,
8260
"The $indexStats stage specification must be an empty object",
8361
elem.type() == Object && elem.Obj().isEmpty());
84-
return new DocumentSourceIndexStats(pExpCtx);
85-
}
8662

87-
Value DocumentSourceIndexStats::serialize(const SerializationOptions& opts) const {
88-
return Value(DOC(getSourceName() << Document()));
63+
// Get the index stats for the current shard and map them over a deferred queue. The queue won't
64+
// be populated until reaching the shards due to the host type requirement.
65+
DocumentSourceQueue::DeferredQueue deferredQueue{[pExpCtx]() {
66+
auto indexStats = pExpCtx->mongoProcessInterface->getIndexStats(
67+
pExpCtx->opCtx,
68+
pExpCtx->ns,
69+
getHostNameCachedAndPort(),
70+
!serverGlobalParams.clusterRole.has(ClusterRole::None));
71+
std::deque<DocumentSource::GetNextResult> queue;
72+
std::copy(std::make_move_iterator(indexStats.begin()),
73+
std::make_move_iterator(indexStats.end()),
74+
std::back_inserter(queue));
75+
return queue;
76+
}};
77+
78+
// Since the deferred queue needs to be initialized only on shards, the default
79+
// 'DocumentSourceQueue::serialize()' method needs to be avoided, so a 'serializeOverride' is
80+
// provided. Without this, 'DocumentSourceQueue::serialize()' will trigger the deferred queue
81+
// initialization on 'mongos' instances, leading to 'MONGO_UNREACHEABLE'.
82+
return make_intrusive<DocumentSourceQueue>(std::move(deferredQueue),
83+
pExpCtx,
84+
/* stageNameOverride */ kStageName,
85+
/* serializeOverride*/ Value{elem.wrap()},
86+
/* constraintsOverride */ constraints());
8987
}
9088
} // namespace mongo

src/mongo/db/pipeline/document_source_index_stats.h

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ namespace mongo {
6262
* Provides a document source interface to retrieve index statistics for a given namespace.
6363
* Each document returned represents a single index and mongod instance.
6464
*/
65-
class DocumentSourceIndexStats final : public DocumentSource {
65+
class DocumentSourceIndexStats final {
6666
public:
6767
static constexpr StringData kStageName = "$indexStats"_sd;
6868

@@ -97,40 +97,28 @@ class DocumentSourceIndexStats final : public DocumentSource {
9797
const NamespaceString _nss;
9898
};
9999

100-
// virtuals from DocumentSource
101-
const char* getSourceName() const final;
102-
Value serialize(const SerializationOptions& opts = SerializationOptions{}) const final override;
103-
104-
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
105-
StageConstraints constraints(StreamType::kStreaming,
106-
PositionRequirement::kFirst,
107-
HostTypeRequirement::kAnyShard,
108-
DiskUseRequirement::kNoDiskUse,
109-
FacetRequirement::kNotAllowed,
110-
TransactionRequirement::kNotAllowed,
111-
LookupRequirement::kAllowed,
112-
UnionRequirement::kAllowed);
113-
100+
/**
101+
* Returns the stage constraints used to override 'DocumentSourceQueue'.
102+
*
103+
* This stage must be executed on each and every shard. Trying to call
104+
* 'MongoProcessInterface::getIndexStats()' on a 'mongos' instance will result in
105+
* 'MONGO_UNREACHEABLE'.
106+
*/
107+
static StageConstraints constraints() {
108+
StageConstraints constraints(DocumentSource::StreamType::kStreaming,
109+
DocumentSource::PositionRequirement::kFirst,
110+
DocumentSource::HostTypeRequirement::kAnyShard,
111+
DocumentSource::DiskUseRequirement::kNoDiskUse,
112+
DocumentSource::FacetRequirement::kNotAllowed,
113+
DocumentSource::TransactionRequirement::kNotAllowed,
114+
DocumentSource::LookupRequirement::kAllowed,
115+
DocumentSource::UnionRequirement::kAllowed);
114116
constraints.requiresInputDocSource = false;
115117
return constraints;
116118
}
117119

118-
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
119-
return boost::none;
120-
}
121-
122-
void addVariableRefs(std::set<Variables::Id>* refs) const final {}
123-
124120
static boost::intrusive_ptr<DocumentSource> createFromBson(
125121
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
126-
127-
private:
128-
DocumentSourceIndexStats(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
129-
GetNextResult doGetNext() final;
130-
131-
std::vector<Document> _indexStats;
132-
std::vector<Document>::const_iterator _indexStatsIter;
133-
std::string _processName;
134122
};
135123

136124
} // namespace mongo

src/mongo/db/pipeline/document_source_list_cached_and_active_users.cpp

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,55 +30,42 @@
3030
#include "mongo/db/pipeline/document_source_list_cached_and_active_users.h"
3131

3232
#include <boost/smart_ptr/intrusive_ptr.hpp>
33+
#include <iterator>
3334

3435
#include "mongo/base/error_codes.h"
3536
#include "mongo/bson/bsonmisc.h"
3637
#include "mongo/bson/bsonobj.h"
3738
#include "mongo/bson/bsonobjbuilder.h"
3839
#include "mongo/db/auth/user_name.h"
3940
#include "mongo/db/operation_context.h"
41+
#include "mongo/db/pipeline/document_source_queue.h"
4042
#include "mongo/util/assert_util.h"
4143
#include "mongo/util/intrusive_counter.h"
4244
#include "mongo/util/str.h"
4345

4446
namespace mongo {
47+
namespace {
48+
Document makeDocumentFromCachedUserInfo(const AuthorizationManager::CachedUserInfo& user) {
49+
return Document(BSON("username" << user.userName.getUser() << "db" << user.userName.getDB()
50+
<< "active" << user.active));
51+
}
52+
} // namespace
4553

4654
REGISTER_TEST_DOCUMENT_SOURCE(listCachedAndActiveUsers,
4755
DocumentSourceListCachedAndActiveUsers::LiteParsed::parse,
4856
DocumentSourceListCachedAndActiveUsers::createFromBson);
4957

50-
DocumentSource::GetNextResult DocumentSourceListCachedAndActiveUsers::doGetNext() {
51-
if (!_users.empty()) {
52-
const auto info = std::move(_users.back());
53-
_users.pop_back();
54-
return Document(BSON("username" << info.userName.getUser() << "db" << info.userName.getDB()
55-
<< "active" << info.active));
56-
}
57-
58-
return GetNextResult::makeEOF();
59-
}
60-
6158
boost::intrusive_ptr<DocumentSource> DocumentSourceListCachedAndActiveUsers::createFromBson(
6259
BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
63-
64-
uassert(
65-
ErrorCodes::InvalidNamespace,
66-
str::stream() << kStageName
67-
<< " must be run against the database with {aggregate: 1}, not a collection",
68-
pExpCtx->ns.isCollectionlessAggregateNS());
69-
7060
uassert(ErrorCodes::BadValue,
7161
str::stream() << kStageName << " must be run as { " << kStageName << ": {}}",
7262
spec.isABSONObj() && spec.Obj().isEmpty());
63+
auto users = AuthorizationManager::get(pExpCtx->opCtx->getServiceContext())->getUserCacheInfo();
64+
std::deque<DocumentSource::GetNextResult> queue;
65+
std::transform(
66+
users.begin(), users.end(), std::back_inserter(queue), makeDocumentFromCachedUserInfo);
7367

74-
return new DocumentSourceListCachedAndActiveUsers(pExpCtx);
75-
}
76-
77-
DocumentSourceListCachedAndActiveUsers::DocumentSourceListCachedAndActiveUsers(
78-
const boost::intrusive_ptr<ExpressionContext>& pExpCtx)
79-
: DocumentSource(kStageName, pExpCtx), _users() {
80-
auto authMgr = AuthorizationManager::get(pExpCtx->opCtx->getServiceContext());
81-
_users = authMgr->getUserCacheInfo();
68+
return make_intrusive<DocumentSourceQueue>(std::move(queue), pExpCtx, kStageName);
8269
}
8370

8471
} // namespace mongo

src/mongo/db/pipeline/document_source_list_cached_and_active_users.h

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ namespace mongo {
6767
* This implements an aggregation document source that lists the active/cached users in the
6868
* authorization manager. It is intended for diagnostic and reporting purposes.
6969
*/
70-
class DocumentSourceListCachedAndActiveUsers final : public DocumentSource {
70+
class DocumentSourceListCachedAndActiveUsers final {
7171
public:
7272
static constexpr StringData kStageName = "$listCachedAndActiveUsers"_sd;
7373

@@ -109,45 +109,8 @@ class DocumentSourceListCachedAndActiveUsers final : public DocumentSource {
109109
const Privilege _requiredPrivilege;
110110
};
111111

112-
const char* getSourceName() const final {
113-
return kStageName.rawData();
114-
}
115-
116-
Value serialize(
117-
const SerializationOptions& opts = SerializationOptions{}) const final override {
118-
return Value(Document{{getSourceName(), Document{}}});
119-
}
120-
121-
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
122-
StageConstraints constraints(StreamType::kStreaming,
123-
PositionRequirement::kFirst,
124-
HostTypeRequirement::kLocalOnly,
125-
DiskUseRequirement::kNoDiskUse,
126-
FacetRequirement::kNotAllowed,
127-
TransactionRequirement::kNotAllowed,
128-
LookupRequirement::kAllowed,
129-
UnionRequirement::kNotAllowed);
130-
131-
constraints.isIndependentOfAnyCollection = true;
132-
constraints.requiresInputDocSource = false;
133-
return constraints;
134-
}
135-
136-
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
137-
return boost::none;
138-
}
139-
140-
void addVariableRefs(std::set<Variables::Id>* refs) const final {}
141-
142112
static boost::intrusive_ptr<DocumentSource> createFromBson(
143113
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
144-
145-
private:
146-
DocumentSourceListCachedAndActiveUsers(const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
147-
148-
GetNextResult doGetNext() final;
149-
150-
std::vector<AuthorizationManager::CachedUserInfo> _users;
151114
};
152115

153116
} // namespace mongo

0 commit comments

Comments
 (0)