Skip to content

Commit 8749786

Browse files
m4nti5Evergreen Agent
authored andcommitted
SERVER-81229 Add replay protection to clone command of move primary
1 parent 8b8ea96 commit 8749786

File tree

3 files changed

+117
-64
lines changed

3 files changed

+117
-64
lines changed

src/mongo/db/s/clone_catalog_data_command.cpp

Lines changed: 84 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,22 @@
3939
#include "mongo/db/auth/action_type.h"
4040
#include "mongo/db/auth/authorization_session.h"
4141
#include "mongo/db/auth/resource_pattern.h"
42+
#include "mongo/db/cancelable_operation_context.h"
4243
#include "mongo/db/catalog/document_validation.h"
4344
#include "mongo/db/cloner.h"
4445
#include "mongo/db/cluster_role.h"
4546
#include "mongo/db/commands.h"
4647
#include "mongo/db/commands/feature_compatibility_version.h"
4748
#include "mongo/db/database_name.h"
49+
#include "mongo/db/dbdirectclient.h"
4850
#include "mongo/db/namespace_string.h"
4951
#include "mongo/db/operation_context.h"
5052
#include "mongo/db/repl/read_concern_level.h"
5153
#include "mongo/db/s/sharding_state.h"
5254
#include "mongo/db/server_options.h"
5355
#include "mongo/db/service_context.h"
56+
#include "mongo/db/transaction/transaction_participant.h"
57+
#include "mongo/db/write_block_bypass.h"
5458
#include "mongo/idl/idl_parser.h"
5559
#include "mongo/s/catalog/sharding_catalog_client.h"
5660
#include "mongo/s/grid.h"
@@ -66,6 +70,50 @@
6670
namespace mongo {
6771
namespace {
6872

73+
void cloneDatabase(OperationContext* opCtx,
74+
const DatabaseName& dbName,
75+
StringData from,
76+
BSONObjBuilder& result) {
77+
auto const catalogClient = Grid::get(opCtx)->catalogClient();
78+
auto shardedOrUntrackedColls = catalogClient->getShardedCollectionNamespacesForDb(
79+
opCtx, dbName, repl::ReadConcernLevel::kMajorityReadConcern, {});
80+
const auto databasePrimary =
81+
catalogClient->getDatabase(opCtx, dbName, repl::ReadConcernLevel::kMajorityReadConcern)
82+
.getPrimary()
83+
.toString();
84+
auto unsplittableCollsOutsideDbPrimary =
85+
catalogClient->getUnsplittableCollectionNamespacesForDbOutsideOfShards(
86+
opCtx, dbName, {databasePrimary}, repl::ReadConcernLevel::kMajorityReadConcern);
87+
88+
std::move(unsplittableCollsOutsideDbPrimary.begin(),
89+
unsplittableCollsOutsideDbPrimary.end(),
90+
std::back_inserter(shardedOrUntrackedColls));
91+
92+
DisableDocumentValidation disableValidation(opCtx);
93+
94+
// Clone the non-ignored collections.
95+
std::set<std::string> clonedColls;
96+
bool forceSameUUIDAsSource = false;
97+
{
98+
FixedFCVRegion fcvRegion{opCtx};
99+
forceSameUUIDAsSource =
100+
feature_flags::gTrackUnshardedCollectionsOnShardingCatalog.isEnabled(
101+
(*fcvRegion).acquireFCVSnapshot());
102+
}
103+
104+
Cloner cloner;
105+
uassertStatusOK(cloner.copyDb(opCtx,
106+
dbName,
107+
from.toString(),
108+
shardedOrUntrackedColls,
109+
forceSameUUIDAsSource,
110+
&clonedColls));
111+
{
112+
BSONArrayBuilder cloneBarr = result.subarrayStart("clonedColls");
113+
cloneBarr.append(clonedColls);
114+
}
115+
}
116+
69117
/**
70118
* Currently, _shardsvrCloneCatalogData will clone all data (including metadata). In the second part
71119
* of
@@ -93,6 +141,10 @@ class CloneCatalogDataCommand : public BasicCommand {
93141
return true;
94142
}
95143

144+
virtual bool supportsRetryableWrite() const override {
145+
return true;
146+
}
147+
96148
Status checkAuthForOperation(OperationContext* opCtx,
97149
const DatabaseName& dbName,
98150
const BSONObj&) const override {
@@ -110,7 +162,6 @@ class CloneCatalogDataCommand : public BasicCommand {
110162
const DatabaseName&,
111163
const BSONObj& cmdObj,
112164
BSONObjBuilder& result) override {
113-
114165
auto shardingState = ShardingState::get(opCtx);
115166
uassertStatusOK(shardingState->canAcceptShardedCommands());
116167

@@ -139,45 +190,39 @@ class CloneCatalogDataCommand : public BasicCommand {
139190
str::stream() << "Can't run _shardsvrCloneCatalogData without a source",
140191
!from.empty());
141192

142-
auto const catalogClient = Grid::get(opCtx)->catalogClient();
143-
auto shardedOrUntrackedColls = catalogClient->getShardedCollectionNamespacesForDb(
144-
opCtx, dbName, repl::ReadConcernLevel::kMajorityReadConcern, {});
145-
const auto databasePrimary =
146-
catalogClient->getDatabase(opCtx, dbName, repl::ReadConcernLevel::kMajorityReadConcern)
147-
.getPrimary()
148-
.toString();
149-
auto unsplittableCollsOutsideDbPrimary =
150-
catalogClient->getUnsplittableCollectionNamespacesForDbOutsideOfShards(
151-
opCtx, dbName, {databasePrimary}, repl::ReadConcernLevel::kMajorityReadConcern);
152-
153-
std::move(unsplittableCollsOutsideDbPrimary.begin(),
154-
unsplittableCollsOutsideDbPrimary.end(),
155-
std::back_inserter(shardedOrUntrackedColls));
156-
157-
DisableDocumentValidation disableValidation(opCtx);
158-
159-
// Clone the non-ignored collections.
160-
std::set<std::string> clonedColls;
161-
bool forceSameUUIDAsSource = false;
162-
{
163-
FixedFCVRegion fcvRegion{opCtx};
164-
forceSameUUIDAsSource =
165-
feature_flags::gTrackUnshardedCollectionsOnShardingCatalog.isEnabled(
166-
(*fcvRegion).acquireFCVSnapshot());
193+
// For newer versions, execute the operation in another operation context with local write
194+
// concern to prevent doing waits while we're holding resources (we have a session checked
195+
// out).
196+
if (TransactionParticipant::get(opCtx)) {
197+
{
198+
// Use ACR to have a thread holding the session while we do the cloning.
199+
auto newClient = opCtx->getServiceContext()
200+
->getService(ClusterRole::ShardServer)
201+
->makeClient("SetAllowMigrations");
202+
AlternativeClientRegion acr(newClient);
203+
auto executor =
204+
Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
205+
auto newOpCtxPtr = CancelableOperationContext(
206+
cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
207+
208+
AuthorizationSession::get(newOpCtxPtr.get()->getClient())
209+
->grantInternalAuthorization(newOpCtxPtr.get()->getClient());
210+
newOpCtxPtr->setWriteConcern(ShardingCatalogClient::kLocalWriteConcern);
211+
WriteBlockBypass::get(newOpCtxPtr.get()).set(true);
212+
cloneDatabase(newOpCtxPtr.get(), dbName, from, result);
213+
}
214+
// Since no write happened on this txnNumber, we need to make a dummy write to protect
215+
// against older requests with old txnNumbers.
216+
DBDirectClient client(opCtx);
217+
client.update(NamespaceString::kServerConfigurationNamespace,
218+
BSON("_id"
219+
<< "CloneCatalogDataStats"),
220+
BSON("$inc" << BSON("count" << 1)),
221+
true /* upsert */,
222+
false /* multi */);
223+
} else {
224+
cloneDatabase(opCtx, dbName, from, result);
167225
}
168-
169-
Cloner cloner;
170-
uassertStatusOK(cloner.copyDb(opCtx,
171-
dbName,
172-
from.toString(),
173-
shardedOrUntrackedColls,
174-
forceSameUUIDAsSource,
175-
&clonedColls));
176-
{
177-
BSONArrayBuilder cloneBarr = result.subarrayStart("clonedColls");
178-
cloneBarr.append(clonedColls);
179-
}
180-
181226
return true;
182227
}
183228
};

src/mongo/db/s/move_primary_coordinator.cpp

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -518,8 +518,7 @@ void MovePrimaryCoordinator::assertNoOrphanedDataOnRecipient(
518518
};
519519
}
520520

521-
std::vector<NamespaceString> MovePrimaryCoordinator::cloneDataToRecipient(
522-
OperationContext* opCtx) const {
521+
std::vector<NamespaceString> MovePrimaryCoordinator::cloneDataToRecipient(OperationContext* opCtx) {
523522
// Enable write blocking bypass to allow cloning of catalog data even if writes are disallowed.
524523
WriteBlockBypass::get(opCtx).set(true);
525524

@@ -530,28 +529,31 @@ std::vector<NamespaceString> MovePrimaryCoordinator::cloneDataToRecipient(
530529
uassertStatusOK(shardRegistry->getShard(opCtx, ShardingState::get(opCtx)->shardId()));
531530
const auto toShard = uassertStatusOK(shardRegistry->getShard(opCtx, toShardId));
532531

533-
const auto cloneCommand = [&] {
532+
auto cloneCommand = [&](boost::optional<OperationSessionInfo> osi) {
534533
BSONObjBuilder commandBuilder;
535534
commandBuilder.append(
536535
"_shardsvrCloneCatalogData",
537536
DatabaseNameUtil::serialize(_dbName, SerializationContext::stateDefault()));
538537
commandBuilder.append("from", fromShard->getConnString().toString());
538+
if (osi.is_initialized()) {
539+
commandBuilder.appendElements(osi->toBSON());
540+
}
539541
return CommandHelpers::appendMajorityWriteConcern(commandBuilder.obj());
540-
}();
542+
};
541543

542-
const auto cloneResponse =
543-
toShard->runCommand(opCtx,
544-
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
545-
DatabaseName::kAdmin,
546-
cloneCommand,
547-
Shard::RetryPolicy::kNoRetry);
544+
auto clonedCollections = [&](const BSONObj& command) {
545+
const auto cloneResponse =
546+
toShard->runCommand(opCtx,
547+
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
548+
DatabaseName::kAdmin,
549+
command,
550+
Shard::RetryPolicy::kNoRetry);
548551

549-
uassertStatusOKWithContext(
550-
Shard::CommandResponse::getEffectiveStatus(cloneResponse),
551-
"movePrimary operation on database {} failed to clone data to recipient {}"_format(
552-
_dbName.toStringForErrorMsg(), toShardId.toString()));
552+
uassertStatusOKWithContext(
553+
Shard::CommandResponse::getEffectiveStatus(cloneResponse),
554+
"movePrimary operation on database {} failed to clone data to recipient {}"_format(
555+
_dbName.toStringForErrorMsg(), toShardId.toString()));
553556

554-
auto clonedCollections = [&] {
555557
std::vector<NamespaceString> colls;
556558
for (const auto& bsonElem : cloneResponse.getValue().response["clonedColls"].Obj()) {
557559
if (bsonElem.type() == String) {
@@ -562,8 +564,15 @@ std::vector<NamespaceString> MovePrimaryCoordinator::cloneDataToRecipient(
562564

563565
std::sort(colls.begin(), colls.end());
564566
return colls;
565-
}();
566-
return clonedCollections;
567+
};
568+
569+
try {
570+
return clonedCollections(cloneCommand(getNewSession(opCtx)));
571+
} catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) {
572+
// TODO SERVER-83213: we're dealing with an older binary version, retry without the OSI
573+
// protection. Remove once 8.0 is last-lts.
574+
return clonedCollections(cloneCommand(boost::none));
575+
}
567576
}
568577

569578
void MovePrimaryCoordinator::assertClonedData(

src/mongo/db/s/move_primary_coordinator.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ class MovePrimaryCoordinator final
8787
*/
8888
void cloneData(OperationContext* opCtx);
8989

90+
/**
91+
* Requests to the recipient to clone all the collections of the given database currently owned
92+
* by this shard. Once the cloning is complete, the recipient returns the list of the actually
93+
* cloned collections.
94+
*/
95+
std::vector<NamespaceString> cloneDataToRecipient(OperationContext* opCtx);
96+
9097
/**
9198
* Logs in the `config.changelog` collection a specific event for `movePrimary` operations.
9299
*/
@@ -107,14 +114,6 @@ class MovePrimaryCoordinator final
107114
void assertNoOrphanedDataOnRecipient(
108115
OperationContext* opCtx, const std::vector<NamespaceString>& collectionsToClone) const;
109116

110-
111-
/**
112-
* Requests to the recipient to clone all the collections of the given database currently owned
113-
* by this shard. Once the cloning is complete, the recipient returns the list of the actually
114-
* cloned collections.
115-
*/
116-
std::vector<NamespaceString> cloneDataToRecipient(OperationContext* opCtx) const;
117-
118117
/**
119118
* Ensures that the list of actually cloned collections (returned by the cloning command)
120119
* matches the list of collections to clone (persisted in the coordinator document).

0 commit comments

Comments
 (0)