
Commit 00df2da

wolfee001 authored and MongoDB Bot committed
SERVER-99640 Implement add shard coordinator shard verification phase (#31690)
GitOrigin-RevId: 972b0cf
1 parent 1eae239 commit 00df2da

9 files changed (+815, -405 lines)

jstests/sharding/add_shard_coordinator.js

Lines changed: 103 additions & 38 deletions
@@ -7,45 +7,110 @@
  * @tags: [
  *   does_not_support_stepdowns,
  *   assumes_balancer_off,
+ *   config_shard_incompatible,
  * ]
  */
+
+// TODO(SERVER-96770): this test should not exist after enabling the
+// featureFlagUseTopologyChangeCoordinators - the functionality should be tested through the public
+// API
+
+import {ReplSetTest} from "jstests/libs/replsettest.js";
 import {ShardingTest} from "jstests/libs/shardingtest.js";
 
-const st = new ShardingTest({shards: 2});
-
-jsTest.log("Adding an already existing shard should return OK");
-assert.commandWorked(st.configRS.getPrimary().adminCommand(
-    {_configsvrAddShardCoordinator: st.shard1.host, "writeConcern": {"w": "majority"}}));
-
-jsTest.log("Adding an already existing shard with the existing name should return OK");
-assert.commandWorked(st.configRS.getPrimary().adminCommand({
-    _configsvrAddShardCoordinator: st.shard1.host,
-    name: st.shard1.shardName,
-    "writeConcern": {"w": "majority"}
-}));
-
-jsTest.log("Adding an already existing shard with different name should fail");
-assert.commandFailedWithCode(st.configRS.getPrimary().adminCommand({
-    _configsvrAddShardCoordinator: st.shard1.host,
-    name: "wolfee_is_smart",
-    "writeConcern": {"w": "majority"}
-}),
-                             ErrorCodes.IllegalOperation);
-
-jsTest.log("Adding a new shard should fail");
-assert.commandFailedWithCode(st.configRS.getPrimary().adminCommand({
-    _configsvrAddShardCoordinator: "wolfee_is_smart",
-    name: "200_IQ",
-    "writeConcern": {"w": "majority"}
-}),
-                             ErrorCodes.NotImplemented);
-
-jsTest.log("Empty name should fail");
-assert.commandFailedWithCode(st.configRS.getPrimary().adminCommand({
-    _configsvrAddShardCoordinator: st.shard1.host,
-    name: "",
-    "writeConcern": {"w": "majority"}
-}),
-                             ErrorCodes.BadValue);
-
-st.stop();
+{
+    const st = new ShardingTest({shards: 2, nodes: 1});
+
+    jsTest.log("Adding an already existing shard should return OK");
+    assert.commandWorked(st.configRS.getPrimary().adminCommand(
+        {_configsvrAddShardCoordinator: st.shard1.host, "writeConcern": {"w": "majority"}}));
+
+    jsTest.log("Adding an already existing shard with the existing name should return OK");
+    assert.commandWorked(st.configRS.getPrimary().adminCommand({
+        _configsvrAddShardCoordinator: st.shard1.host,
+        name: st.shard1.shardName,
+        "writeConcern": {"w": "majority"}
+    }));
+
+    jsTest.log("Adding an already existing shard with different name should fail");
+    assert.commandFailedWithCode(st.configRS.getPrimary().adminCommand({
+        _configsvrAddShardCoordinator: st.shard1.host,
+        name: "wolfee_is_smart",
+        "writeConcern": {"w": "majority"}
+    }),
+                                 ErrorCodes.IllegalOperation);
+
+    jsTest.log("Empty name should fail");
+    assert.commandFailedWithCode(st.configRS.getPrimary().adminCommand({
+        _configsvrAddShardCoordinator: st.shard1.host,
+        name: "",
+        "writeConcern": {"w": "majority"}
+    }),
+                                 ErrorCodes.BadValue);
+
+    jsTest.log("Invalid host should fail");
+    assert.commandFailedWithCode(st.configRS.getPrimary().adminCommand({
+        _configsvrAddShardCoordinator: "some.nonexistent.host",
+        "writeConcern": {"w": "majority"}
+    }),
+                                 ErrorCodes.HostUnreachable);
+
+    st.stop();
+}
+
+{
+    const st = new ShardingTest({shards: 0});
+    const rs = new ReplSetTest({nodes: 1});
+    rs.startSet({shardsvr: ""});
+    rs.initiate();
+
+    const foo = rs.getPrimary().getDB("foo");
+    assert.commandWorked(foo.foo.insertOne({a: 1}));
+
+    jsTest.log("Non-empty RS can be added if this is the first shard");
+    assert.commandFailedWithCode(
+        st.configRS.getPrimary().adminCommand(
+            {_configsvrAddShardCoordinator: rs.getURL(), "writeConcern": {"w": "majority"}}),
+        ErrorCodes.NotImplemented);
+
+    jsTest.log("First RS is never locked for write");
+    assert.commandWorked(foo.foo.insertOne({b: 1}));
+
+    rs.stopSet();
+    st.stop();
+}
+
+{
+    const st = new ShardingTest({shards: 1});
+    const rs = new ReplSetTest({nodes: 1});
+    rs.startSet({shardsvr: ""});
+    rs.initiate();
+
+    assert.commandWorked(rs.getPrimary().getDB("foo").foo.insertOne({a: 1}));
+
+    jsTest.log("Non-empty RS can't be added if it's not the first shard");
+    assert.commandFailedWithCode(
+        st.configRS.getPrimary().adminCommand(
+            {_configsvrAddShardCoordinator: rs.getURL(), "writeConcern": {"w": "majority"}}),
+        ErrorCodes.IllegalOperation);
+
+    jsTest.log("RS user writes is unlocked on fail");
+    assert.commandWorked(rs.getPrimary().getDB("foo").foo.insertOne({b: 1}));
+    assert.commandWorked(rs.getPrimary().getDB("foo").dropDatabase({w: "majority"}));
+
+    jsTest.log("Empty non-first RS can be added");
+    assert.commandFailedWithCode(
+        st.configRS.getPrimary().adminCommand(
+            {_configsvrAddShardCoordinator: rs.getURL(), "writeConcern": {"w": "majority"}}),
+        ErrorCodes.NotImplemented);
+
+    jsTest.log("Non-first RS is locked for write");
+    try {
+        rs.getPrimary().getDB("bar").foo.insertOne({b: 1});
+    } catch (error) {
+        assert.commandFailedWithCode(error, ErrorCodes.UserWritesBlocked);
+    }
+
+    rs.stopSet();
+    st.stop();
+}
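For context, the internal command exercised by this test is always issued against the config server primary. Below is a minimal sketch of that invocation pattern, assuming (as in the test) a running ShardingTest `st` and an empty shardsvr ReplSetTest `rs`; the proposed name "myNewShard" is illustrative and not taken from the commit.

// Minimal sketch of the invocation pattern used throughout the test above.
// `st` is a running ShardingTest, `rs` an empty shardsvr ReplSetTest;
// the proposed name is illustrative only.
const res = st.configRS.getPrimary().adminCommand({
    _configsvrAddShardCoordinator: rs.getURL(),  // connection string of the candidate shard
    name: "myNewShard",                          // optional proposed shard name
    "writeConcern": {"w": "majority"}
});
// While the coordinator's final phase is still unimplemented, the test expects
// the command to reach it and fail with NotImplemented once the shard
// verification phase added in this commit has passed.
assert.commandFailedWithCode(res, ErrorCodes.NotImplemented);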

src/mongo/db/s/add_shard_coordinator.cpp

Lines changed: 122 additions & 7 deletions
@@ -34,6 +34,11 @@
 
 namespace mongo {
 
+namespace {
+static constexpr size_t kMaxFailedRetryCount = 3;
+static const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
+}  // namespace
+
 ExecutorFuture<void> AddShardCoordinator::_runImpl(
     std::shared_ptr<executor::ScopedTaskExecutor> executor,
     const CancellationToken& token) noexcept {
@@ -46,17 +51,57 @@ ExecutorFuture<void> AddShardCoordinator::_runImpl(
 
                 _verifyInput();
 
-                const auto existingShard =
-                    uassertStatusOK(topology_change_helpers::checkIfShardExists(
-                        opCtx,
-                        _doc.getConnectionString(),
-                        _doc.getProposedName(),
-                        *ShardingCatalogManager::get(opCtx)->localCatalogClient()));
+                const auto existingShard = topology_change_helpers::getExistingShard(
+                    opCtx,
+                    _doc.getConnectionString(),
+                    _doc.getProposedName(),
+                    *ShardingCatalogManager::get(opCtx)->localCatalogClient());
                 if (existingShard.has_value()) {
                     _doc.setChosenName(existingShard.value().getName());
                     _enterPhase(AddShardCoordinatorPhaseEnum::kFinal);
                 }
             }))
+        .then(_buildPhaseHandler(
+            Phase::kCheckShardPreconditions,
+            [this, &token, _ = shared_from_this(), executor]() {
+                auto opCtxHolder = cc().makeOperationContext();
+                auto* opCtx = opCtxHolder.get();
+
+                auto& targeter = _getTargeter(opCtx);
+
+                auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+                shardRegistry->reload(opCtx);
+                const bool isFirstShard = shardRegistry->getNumShards(opCtx) == 0;
+
+                _runWithRetries(
+                    [&]() {
+                        topology_change_helpers::validateHostAsShard(opCtx,
+                                                                     targeter,
+                                                                     _doc.getConnectionString(),
+                                                                     _doc.getIsConfigShard(),
+                                                                     **executor);
+                    },
+                    executor,
+                    token);
+
+                // TODO(SERVER-97997) Remove the check after promoting to sharded cluster is
+                // implemented correctly
+                if (!isFirstShard) {
+                    topology_change_helpers::setUserWriteBlockingState(
+                        opCtx,
+                        targeter,
+                        topology_change_helpers::UserWriteBlockingLevel::All,
+                        true, /* block writes */
+                        boost::make_optional<
+                            std::function<OperationSessionInfo(OperationContext*)>>(
+                            [this](OperationContext* opCtx) -> OperationSessionInfo {
+                                return getNewSession(opCtx);
+                            }),
+                        **executor);
+
+                    _checkExistingDataOnShard(opCtx, targeter, **executor);
+                }
+            }))
         .then(_buildPhaseHandler(Phase::kFinal,
                                  [this, _ = shared_from_this()]() {
                                      auto opCtxHolder = cc().makeOperationContext();
@@ -77,7 +122,26 @@ ExecutorFuture<void> AddShardCoordinator::_runImpl(
 
                                      _result = _doc.getChosenName().value().toString();
                                  }))
-        .onError([this, _ = shared_from_this()](const Status& status) { return status; });
+        .onError([this, _ = shared_from_this(), executor](const Status& status) {
+            auto opCtxHolder = cc().makeOperationContext();
+            auto* opCtx = opCtxHolder.get();
+            auto& targeter = _getTargeter(opCtx);
+
+            topology_change_helpers::setUserWriteBlockingState(
+                opCtx,
+                targeter,
+                topology_change_helpers::UserWriteBlockingLevel::All,
+                false, /* unblock writes */
+                boost::make_optional<std::function<OperationSessionInfo(OperationContext*)>>(
+                    [this](OperationContext* opCtx) -> OperationSessionInfo {
+                        return getNewSession(opCtx);
+                    }),
+                **executor);
+
+            topology_change_helpers::removeReplicaSetMonitor(opCtx, _doc.getConnectionString());
+
+            return status;
+        });
 }
 
 const std::string& AddShardCoordinator::getResult(OperationContext* opCtx) const {
@@ -102,4 +166,55 @@ void AddShardCoordinator::_verifyInput() const {
             !_doc.getProposedName() || !_doc.getProposedName()->empty());
 }
 
+void AddShardCoordinator::_checkExistingDataOnShard(
+    OperationContext* opCtx,
+    RemoteCommandTargeter& targeter,
+    std::shared_ptr<executor::TaskExecutor> executor) const {
+    const auto dbNames =
+        topology_change_helpers::getDBNamesListFromShard(opCtx, targeter, executor);
+
+    uassert(ErrorCodes::IllegalOperation,
+            str::stream() << "can't add shard '" << _doc.getConnectionString().toString()
+                          << "' because it's not empty.",
+            dbNames.empty());
+}
+
+RemoteCommandTargeter& AddShardCoordinator::_getTargeter(OperationContext* opCtx) {
+    if (!_shardConnection) {
+        const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+        _shardConnection = shardRegistry->createConnection(_doc.getConnectionString());
+    }
+
+    return *(_shardConnection->getTargeter());
+}
+
+void AddShardCoordinator::_runWithRetries(std::function<void()>&& function,
+                                          std::shared_ptr<executor::ScopedTaskExecutor> executor,
+                                          const CancellationToken& token) {
+    size_t failCounter = 0;
+
+    AsyncTry([&]() {
+        try {
+            function();
+        } catch (const DBException& ex) {
+            return ex.toStatus();
+        }
+        return Status::OK();
+    })
+        .until([&](const Status& status) {
+            if (status.isOK()) {
+                return true;
+            }
+            failCounter++;
+            if (failCounter > kMaxFailedRetryCount) {
+                _completeOnError = true;
+                return true;
+            }
+            return false;
+        })
+        .withBackoffBetweenIterations(kExponentialBackoff)
+        .on(**executor, token)
+        .get();
+}
+
 } // namespace mongo

src/mongo/db/s/add_shard_coordinator.h

Lines changed: 13 additions & 0 deletions
@@ -29,6 +29,7 @@
 
 #pragma once
 
+#include "mongo/client/remote_command_targeter.h"
 #include "mongo/db/s/add_shard_coordinator_document_gen.h"
 #include "mongo/db/s/sharding_ddl_coordinator.h"
 #include "mongo/db/s/sharding_ddl_coordinator_service.h"
@@ -60,8 +61,20 @@ class AddShardCoordinator final
 
     void _verifyInput() const;
 
+    void _checkExistingDataOnShard(OperationContext* opCtx,
+                                   RemoteCommandTargeter& targeter,
+                                   std::shared_ptr<executor::TaskExecutor> executor) const;
+
+    RemoteCommandTargeter& _getTargeter(OperationContext* opCtx);
+
+    void _runWithRetries(std::function<void()>&& function,
+                         std::shared_ptr<executor::ScopedTaskExecutor> executor,
+                         const CancellationToken& token);
+
     // Set on successful completion of the coordinator.
     boost::optional<std::string> _result;
+
+    std::unique_ptr<Shard> _shardConnection;
 };
 
 } // namespace mongo

src/mongo/db/s/add_shard_coordinator_document.idl

Lines changed: 4 additions & 0 deletions
@@ -40,6 +40,7 @@ enums:
         values:
             kUnset: "unset"
            kCheckLocalPreconditions: "checkLocalPreconditions"
+            kCheckShardPreconditions: "checkShardPreconditions"
             kFinal: "final"
 
 structs:
@@ -57,6 +58,9 @@
                 type: string
                 description: "A proposed name for the new shard"
                 optional: true
+            isConfigShard:
+                type: bool
+                description: "Specifies if the new shard is supposed to be a config shard"
             phase:
                 type: AddShardCoordinatorPhase
                 description: "Coordinator phase."

src/mongo/db/s/config/configsvr_add_shard_coordinator_command.cpp

Lines changed: 2 additions & 1 deletion
@@ -103,9 +103,10 @@ class ConfigSvrAddShardCoordinatorCommand
         audit::logAddShard(Client::getCurrent(), name ? name.value() : "", target.toString());
 
         const auto addShardCoordinator =
-            checked_pointer_cast<AddShardCoordinator>(std::invoke([&target, &name, opCtx]() {
+            checked_pointer_cast<AddShardCoordinator>(std::invoke([&]() {
                 auto coordinatorDoc = AddShardCoordinatorDocument();
                 coordinatorDoc.setConnectionString(target);
+                coordinatorDoc.setIsConfigShard(false);
                 coordinatorDoc.setProposedName(name);
                 coordinatorDoc.setShardingDDLCoordinatorMetadata(
                     {{NamespaceString::kConfigsvrShardsNamespace,
