Skip to content

Commit 13c2e61

Browse files
nzolnierzmdb authored and evergreen committed
SERVER-42942 M/R Agg: Implement translation for cluster mapReduce command
1 parent 40fb24a commit 13c2e61

27 files changed: +1520 / -1333 lines changed

buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ selector:
4949
- jstests/sharding/track_unsharded_collections_rename_collection.js
5050
- jstests/sharding/banned_txn_databases_sharded.js
5151
- jstests/sharding/split_large_key.js
52+
- jstests/sharding/change_streams_establishment_finds_new_shards.js
5253
# Enable if SERVER-41813 is backported or 4.4 becomes last-stable
5354
- jstests/sharding/invalid_system_views_sharded_collection.js
5455

buildscripts/resmokeconfig/suites/sharding_map_reduce_agg.yaml

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,16 @@ test_kind: js_test
44

55
selector:
66
roots:
7-
# Placeholder test to avoid failing in resmoke.
8-
- jstests/sharding/accurate_count_with_predicate.js
7+
- jstests/sharding/auth.js
8+
- jstests/sharding/authCommands.js
9+
- jstests/sharding/authmr.js
10+
- jstests/sharding/causal_consistency_shell_support.js
11+
- jstests/sharding/localhostAuthBypass.js
12+
- jstests/sharding/max_time_ms_sharded.js
13+
- jstests/sharding/mr_and_agg_versioning.js
14+
- jstests/sharding/mr_shard_version.js
15+
- jstests/sharding/query_config.js
16+
- jstests/sharding/shard_targeting.js
917

1018
executor:
1119
config:

jstests/aggregation/sharded_agg_cleanup_on_error.js

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -88,7 +88,7 @@ try {
8888
try {
8989
// Enable the failpoint to fail on establishing a merging shard cursor.
9090
assert.commandWorked(mongosDB.adminCommand({
91-
configureFailPoint: "clusterAggregateFailToEstablishMergingShardCursor",
91+
configureFailPoint: "shardedAggregateFailToEstablishMergingShardCursor",
9292
mode: "alwaysOn"
9393
}));
9494

@@ -103,13 +103,13 @@ try {
103103

104104
} finally {
105105
assert.commandWorked(mongosDB.adminCommand(
106-
{configureFailPoint: "clusterAggregateFailToEstablishMergingShardCursor", mode: "off"}));
106+
{configureFailPoint: "shardedAggregateFailToEstablishMergingShardCursor", mode: "off"}));
107107
}
108108

109109
// Test that aggregations involving $exchange correctly clean up the producer cursors.
110110
try {
111111
assert.commandWorked(mongosDB.adminCommand({
112-
configureFailPoint: "clusterAggregateFailToDispatchExchangeConsumerPipeline",
112+
configureFailPoint: "shardedAggregateFailToDispatchExchangeConsumerPipeline",
113113
mode: "alwaysOn"
114114
}));
115115

@@ -133,7 +133,7 @@ try {
133133

134134
} finally {
135135
assert.commandWorked(mongosDB.adminCommand({
136-
configureFailPoint: "clusterAggregateFailToDispatchExchangeConsumerPipeline",
136+
configureFailPoint: "shardedAggregateFailToDispatchExchangeConsumerPipeline",
137137
mode: "off"
138138
}));
139139
}

jstests/sharding/change_streams_establishment_finds_new_shards.js

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -40,13 +40,13 @@ assert.commandWorked(mongos.adminCommand({split: mongosColl.getFullName(), middl
4040

4141
// Enable the failpoint.
4242
assert.commandWorked(mongos.adminCommand(
43-
{configureFailPoint: "clusterAggregateHangBeforeEstablishingShardCursors", mode: "alwaysOn"}));
43+
{configureFailPoint: "shardedAggregateHangBeforeEstablishingShardCursors", mode: "alwaysOn"}));
4444

4545
// While opening the cursor, wait for the failpoint and add the new shard.
4646
const awaitNewShard = startParallelShell(`
4747
load("jstests/libs/check_log.js");
4848
checkLog.contains(db,
49-
"clusterAggregateHangBeforeEstablishingShardCursors fail point enabled");
49+
"shardedAggregateHangBeforeEstablishingShardCursors fail point enabled");
5050
assert.commandWorked(
5151
db.adminCommand({addShard: "${newShard.getURL()}", name: "${newShard.name}"}));
5252
// Migrate the [10, MaxKey] chunk to "newShard".
@@ -56,7 +56,7 @@ const awaitNewShard = startParallelShell(`
5656
_waitForDelete: true}));
5757
assert.commandWorked(
5858
db.adminCommand(
59-
{configureFailPoint: "clusterAggregateHangBeforeEstablishingShardCursors",
59+
{configureFailPoint: "shardedAggregateHangBeforeEstablishingShardCursors",
6060
mode: "off"}));`,
6161
mongos.port);
6262

src/mongo/db/commands/SConscript

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -157,6 +157,7 @@ env.Library(
157157
'$BUILD_DIR/mongo/db/logical_session_cache',
158158
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
159159
'$BUILD_DIR/mongo/db/logical_session_id',
160+
'$BUILD_DIR/mongo/db/pipeline/pipeline',
160161
'$BUILD_DIR/mongo/db/repl/isself',
161162
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
162163
'$BUILD_DIR/mongo/db/session_catalog',
@@ -508,10 +509,10 @@ env.Library(
508509
'map_reduce_agg.cpp',
509510
],
510511
LIBDEPS=[
511-
'$BUILD_DIR/mongo/db/db_raii',
512512
'$BUILD_DIR/mongo/idl/idl_parser',
513+
'$BUILD_DIR/mongo/db/commands/servers',
514+
'$BUILD_DIR/mongo/db/db_raii',
513515
'$BUILD_DIR/mongo/db/pipeline/mongo_process_interface',
514-
'$BUILD_DIR/mongo/db/pipeline/pipeline',
515516
'$BUILD_DIR/mongo/db/query/map_reduce_output_format',
516517
'map_reduce_parser'
517518
]
@@ -541,6 +542,7 @@ env.CppUnitTest(
541542
"map_reduce_parse_test.cpp",
542543
],
543544
LIBDEPS=[
545+
'$BUILD_DIR/mongo/db/auth/authmocks',
544546
'$BUILD_DIR/mongo/db/query/query_test_service_context',
545547
'map_reduce_agg',
546548
]

src/mongo/db/commands/map_reduce_agg.cpp

Lines changed: 2 additions & 167 deletions
Original file line number | Diff line number | Diff line change
@@ -41,155 +41,18 @@
4141
#include "mongo/db/commands.h"
4242
#include "mongo/db/commands/map_reduce_agg.h"
4343
#include "mongo/db/commands/map_reduce_javascript_code.h"
44+
#include "mongo/db/commands/mr_common.h"
4445
#include "mongo/db/db_raii.h"
4546
#include "mongo/db/exec/document_value/value.h"
4647
#include "mongo/db/namespace_string.h"
47-
#include "mongo/db/pipeline/document_source.h"
48-
#include "mongo/db/pipeline/document_source_group.h"
49-
#include "mongo/db/pipeline/document_source_limit.h"
50-
#include "mongo/db/pipeline/document_source_match.h"
51-
#include "mongo/db/pipeline/document_source_merge.h"
52-
#include "mongo/db/pipeline/document_source_out.h"
53-
#include "mongo/db/pipeline/document_source_project.h"
54-
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
55-
#include "mongo/db/pipeline/document_source_sort.h"
56-
#include "mongo/db/pipeline/document_source_unwind.h"
5748
#include "mongo/db/pipeline/expression.h"
58-
#include "mongo/db/pipeline/expression_javascript.h"
59-
#include "mongo/db/pipeline/parsed_aggregation_projection_node.h"
60-
#include "mongo/db/pipeline/parsed_inclusion_projection.h"
6149
#include "mongo/db/pipeline/pipeline_d.h"
6250
#include "mongo/db/query/map_reduce_output_format.h"
63-
#include "mongo/db/query/util/make_data_structure.h"
64-
#include "mongo/util/intrusive_counter.h"
6551

6652
namespace mongo::map_reduce_agg {
6753

6854
namespace {
6955

70-
using namespace std::string_literals;
71-
72-
auto translateSort(boost::intrusive_ptr<ExpressionContext> expCtx,
73-
const BSONObj& sort,
74-
const boost::optional<std::int64_t>& limit) {
75-
return DocumentSourceSort::create(expCtx, sort, limit.get_value_or(-1));
76-
}
77-
78-
auto translateMap(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
79-
auto emitExpression = ExpressionInternalJsEmit::create(
80-
expCtx, ExpressionFieldPath::parse(expCtx, "$$ROOT", expCtx->variablesParseState), code);
81-
auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>(
82-
ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId});
83-
node->addExpressionForPath(FieldPath{"emits"s}, std::move(emitExpression));
84-
auto inclusion = std::unique_ptr<TransformerInterface>{
85-
std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>(
86-
expCtx,
87-
ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId},
88-
std::move(node))};
89-
return make_intrusive<DocumentSourceSingleDocumentTransformation>(
90-
expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false);
91-
}
92-
93-
auto translateReduce(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
94-
auto accumulatorArguments = ExpressionObject::create(
95-
expCtx,
96-
make_vector<std::pair<std::string, boost::intrusive_ptr<Expression>>>(
97-
std::pair{"data"s,
98-
ExpressionFieldPath::parse(expCtx, "$emits", expCtx->variablesParseState)},
99-
std::pair{"eval"s, ExpressionConstant::create(expCtx, Value{code})}));
100-
auto jsReduce = AccumulationStatement{
101-
"value",
102-
std::move(accumulatorArguments),
103-
AccumulationStatement::getFactory(AccumulatorInternalJsReduce::kAccumulatorName)};
104-
auto groupExpr = ExpressionFieldPath::parse(expCtx, "$emits.k", expCtx->variablesParseState);
105-
return DocumentSourceGroup::create(expCtx,
106-
std::move(groupExpr),
107-
make_vector<AccumulationStatement>(std::move(jsReduce)),
108-
boost::none);
109-
}
110-
111-
auto translateFinalize(boost::intrusive_ptr<ExpressionContext> expCtx, std::string code) {
112-
auto jsExpression = ExpressionInternalJs::create(
113-
expCtx,
114-
ExpressionArray::create(
115-
expCtx,
116-
make_vector<boost::intrusive_ptr<Expression>>(
117-
ExpressionFieldPath::parse(expCtx, "$_id", expCtx->variablesParseState),
118-
ExpressionFieldPath::parse(expCtx, "$value", expCtx->variablesParseState))),
119-
code);
120-
auto node = std::make_unique<parsed_aggregation_projection::InclusionNode>(
121-
ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId});
122-
node->addExpressionForPath(FieldPath{"value"s}, std::move(jsExpression));
123-
auto inclusion = std::unique_ptr<TransformerInterface>{
124-
std::make_unique<parsed_aggregation_projection::ParsedInclusionProjection>(
125-
expCtx,
126-
ProjectionPolicies{ProjectionPolicies::DefaultIdPolicy::kExcludeId},
127-
std::move(node))};
128-
return make_intrusive<DocumentSourceSingleDocumentTransformation>(
129-
expCtx, std::move(inclusion), DocumentSourceProject::kStageName, false);
130-
}
131-
132-
auto translateOutReplace(boost::intrusive_ptr<ExpressionContext> expCtx,
133-
const StringData inputDatabase,
134-
NamespaceString targetNss) {
135-
uassert(31278,
136-
"MapReduce must output to the database belonging to its input collection - Input: "s +
137-
inputDatabase + "Output: " + targetNss.db(),
138-
inputDatabase == targetNss.db());
139-
return DocumentSourceOut::create(std::move(targetNss), expCtx);
140-
}
141-
142-
auto translateOutMerge(boost::intrusive_ptr<ExpressionContext> expCtx, NamespaceString targetNss) {
143-
return DocumentSourceMerge::create(targetNss,
144-
expCtx,
145-
MergeWhenMatchedModeEnum::kReplace,
146-
MergeWhenNotMatchedModeEnum::kInsert,
147-
boost::none, // Let variables
148-
boost::none, // pipeline
149-
std::set<FieldPath>{FieldPath("_id"s)},
150-
boost::none); // targetCollectionVersion
151-
}
152-
153-
auto translateOutReduce(boost::intrusive_ptr<ExpressionContext> expCtx,
154-
NamespaceString targetNss,
155-
std::string code) {
156-
// Because of communication for sharding, $merge must hold on to a serializable BSON object
157-
// at the moment so we reparse here.
158-
auto reduceObj = BSON("args" << BSON_ARRAY("$value"
159-
<< "$$new.value")
160-
<< "eval" << code);
161-
162-
auto finalProjectSpec =
163-
BSON(DocumentSourceProject::kStageName
164-
<< BSON("value" << BSON(ExpressionInternalJs::kExpressionName << reduceObj)));
165-
auto pipelineSpec = boost::make_optional(std::vector<BSONObj>{finalProjectSpec});
166-
return DocumentSourceMerge::create(targetNss,
167-
expCtx,
168-
MergeWhenMatchedModeEnum::kPipeline,
169-
MergeWhenNotMatchedModeEnum::kInsert,
170-
boost::none, // Let variables
171-
pipelineSpec,
172-
std::set<FieldPath>{FieldPath("_id"s)},
173-
boost::none); // targetCollectionVersion
174-
}
175-
176-
auto translateOut(boost::intrusive_ptr<ExpressionContext> expCtx,
177-
const OutputType outputType,
178-
const StringData inputDatabase,
179-
NamespaceString targetNss,
180-
std::string reduceCode) {
181-
switch (outputType) {
182-
case OutputType::Replace:
183-
return boost::make_optional(translateOutReplace(expCtx, inputDatabase, targetNss));
184-
case OutputType::Merge:
185-
return boost::make_optional(translateOutMerge(expCtx, targetNss));
186-
case OutputType::Reduce:
187-
return boost::make_optional(translateOutReduce(expCtx, targetNss, reduceCode));
188-
case OutputType::InMemory:;
189-
}
190-
return boost::optional<boost::intrusive_ptr<mongo::DocumentSource>>{};
191-
}
192-
19356
auto makeExpressionContext(OperationContext* opCtx, const MapReduce& parsedMr) {
19457
// AutoGetCollectionForReadCommand will throw if the sharding version for this connection is
19558
// out of date.
@@ -247,7 +110,7 @@ bool runAggregationMapReduce(OperationContext* opCtx,
247110
auto parsedMr = MapReduce::parse(IDLParserErrorContext("MapReduce"), cmd);
248111
auto expCtx = makeExpressionContext(opCtx, parsedMr);
249112
auto runnablePipeline = [&]() {
250-
auto pipeline = translateFromMR(parsedMr, expCtx);
113+
auto pipeline = map_reduce_common::translateFromMR(parsedMr, expCtx);
251114
return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
252115
expCtx, pipeline.release());
253116
}();
@@ -274,32 +137,4 @@ bool runAggregationMapReduce(OperationContext* opCtx,
274137
return true;
275138
}
276139

277-
std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
278-
MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx) {
279-
280-
// TODO: It would be good to figure out what kind of errors this would produce in the Status.
281-
// It would be better not to produce something incomprehensible out of an internal translation.
282-
return uassertStatusOK(Pipeline::create(
283-
makeFlattenedList<boost::intrusive_ptr<DocumentSource>>(
284-
parsedMr.getQuery().map(
285-
[&](auto&& query) { return DocumentSourceMatch::create(query, expCtx); }),
286-
parsedMr.getSort().map(
287-
[&](auto&& sort) { return translateSort(expCtx, sort, parsedMr.getLimit()); }),
288-
translateMap(expCtx, parsedMr.getMap().getCode()),
289-
DocumentSourceUnwind::create(expCtx, "emits", false, boost::none),
290-
translateReduce(expCtx, parsedMr.getReduce().getCode()),
291-
parsedMr.getFinalize().map([&](auto&& finalize) {
292-
return translateFinalize(expCtx, parsedMr.getFinalize()->getCode());
293-
}),
294-
translateOut(expCtx,
295-
parsedMr.getOutOptions().getOutputType(),
296-
parsedMr.getNamespace().db(),
297-
NamespaceString{parsedMr.getOutOptions().getDatabaseName()
298-
? *parsedMr.getOutOptions().getDatabaseName()
299-
: parsedMr.getNamespace().db(),
300-
parsedMr.getOutOptions().getCollectionName()},
301-
parsedMr.getReduce().getCode())),
302-
expCtx));
303-
}
304-
305140
} // namespace mongo::map_reduce_agg

src/mongo/db/commands/map_reduce_agg.h

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,4 @@ bool runAggregationMapReduce(OperationContext* opCtx,
4747
std::string& errmsg,
4848
BSONObjBuilder& result);
4949

50-
std::unique_ptr<Pipeline, PipelineDeleter> translateFromMR(
51-
MapReduce parsedMr, boost::intrusive_ptr<ExpressionContext> expCtx);
52-
5350
} // namespace mongo::map_reduce_agg

0 commit comments

Comments
 (0)