Skip to content

Commit 4c756a1

Browse files
committed
fixup: clean up replset and shard arg handling
1 parent 47de6fd commit 4c756a1

File tree

1 file changed

+171
-90
lines changed

1 file changed

+171
-90
lines changed

packages/mongodb-runner/src/mongocluster.ts

Lines changed: 171 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { MongoClient } from 'mongodb';
88
import { sleep, range, uuid, debug, jsonClone } from './util';
99
import { OIDCMockProviderProcess } from './oidc';
1010
import { EventEmitter } from 'events';
11+
import assert from 'assert';
1112

1213
export interface MongoDBUserDoc {
1314
username: string;
@@ -20,33 +21,163 @@ export interface RSMemberOptions {
2021
tags?: TagSet;
2122
priority?: number;
2223
args?: string[];
24+
arbiterOnly?: boolean;
2325
}
24-
export interface MongoClusterOptions
25-
extends Pick<
26-
MongoServerOptions,
27-
'logDir' | 'tmpDir' | 'args' | 'binDir' | 'docker'
28-
> {
29-
topology: 'standalone' | 'replset' | 'sharded';
30-
arbiters?: number;
31-
secondaries?: number;
32-
shards?: number;
33-
version?: string;
26+
27+
export interface CommonOptions {
3428
downloadDir?: string;
3529
downloadOptions?: DownloadOptions;
30+
3631
oidc?: string;
37-
rsMemberOptions?: RSMemberOptions[];
38-
shardArgs?: string[][];
39-
mongosArgs?: string[][];
32+
33+
version?: string;
4034
users?: MongoDBUserDoc[];
35+
36+
topology: 'standalone' | 'replset' | 'sharded';
4137
}
4238

39+
export type RSOptions =
40+
| {
41+
arbiters?: number;
42+
secondaries?: number;
43+
rsMembers?: never;
44+
}
45+
| {
46+
arbiters?: never;
47+
secondaries?: never;
48+
rsMembers: RSMemberOptions[];
49+
};
50+
51+
export type ShardedOptions = {
52+
mongosArgs?: string[][];
53+
} & (
54+
| {
55+
shards?: number;
56+
shardArgs?: never;
57+
}
58+
| {
59+
shards?: never;
60+
shardArgs?: string[][];
61+
}
62+
);
63+
64+
export type MongoClusterOptions = Pick<
65+
MongoServerOptions,
66+
'logDir' | 'tmpDir' | 'args' | 'binDir' | 'docker'
67+
> &
68+
CommonOptions &
69+
RSOptions &
70+
ShardedOptions;
71+
4372
export type MongoClusterEvents = {
4473
[k in keyof MongoServerEvents]: [serverUUID: string, ...MongoServerEvents[k]];
4574
} & {
4675
newListener: [keyof MongoClusterEvents];
4776
removeListener: [keyof MongoClusterEvents];
4877
};
4978

79+
function removePortArg([...args]: string[]): string[] {
80+
let portArgIndex = -1;
81+
if ((portArgIndex = args.indexOf('--port')) !== -1) {
82+
args.splice(portArgIndex + 1, 1);
83+
} else if (
84+
(portArgIndex = args.findIndex((arg) => arg.startsWith('--port='))) !== -1
85+
) {
86+
args.splice(portArgIndex, 1);
87+
}
88+
return args;
89+
}
90+
91+
function hasPortArg(args: string[] | undefined): boolean {
92+
if (!args) return false;
93+
return (
94+
args.includes('--port') || args.some((arg) => arg.startsWith('--port='))
95+
);
96+
}
97+
98+
function processRSMembers(options: MongoClusterOptions): {
99+
rsMembers: RSMemberOptions[];
100+
replSetName: string;
101+
} {
102+
const {
103+
secondaries = 2,
104+
arbiters = 0,
105+
args: [...args] = [],
106+
rsMembers,
107+
} = options;
108+
109+
let replSetName: string;
110+
if (!args.includes('--replSet')) {
111+
replSetName = `replSet-${uuid()}`;
112+
args.push('--replSet', replSetName);
113+
} else {
114+
replSetName = args[args.indexOf('--replSet') + 1];
115+
}
116+
117+
const primaryArgs: string[] = [...args];
118+
const secondaryArgs = [...removePortArg(args), '--port', '0'];
119+
120+
if (rsMembers) {
121+
const primary = rsMembers.find((m) =>
122+
rsMembers.every((m2) => m.priority ?? 0 >= (m2.priority ?? 0)),
123+
);
124+
return {
125+
rsMembers: rsMembers.map((m) => ({
126+
...m,
127+
args: [
128+
...(m.args ?? []),
129+
...(hasPortArg(m.args)
130+
? args
131+
: m === primary
132+
? primaryArgs
133+
: secondaryArgs),
134+
],
135+
})),
136+
replSetName,
137+
};
138+
}
139+
140+
return {
141+
rsMembers: [
142+
{ priority: 1, args: primaryArgs },
143+
...range(secondaries).map(() => ({ priority: 0, args: secondaryArgs })),
144+
...range(arbiters).map(() => ({
145+
priority: 0,
146+
arbiterOnly: true,
147+
args: secondaryArgs,
148+
})),
149+
],
150+
replSetName,
151+
};
152+
}
153+
154+
function processShardOptions(options: MongoClusterOptions): {
155+
shardArgs: string[][];
156+
mongosArgs: string[][];
157+
} {
158+
const {
159+
shardArgs = range(options.shards ?? 1).map(() => []),
160+
mongosArgs = [[]],
161+
args = [],
162+
} = options;
163+
return {
164+
shardArgs: shardArgs.map((perShardArgs, i) => [
165+
...removePortArg(args),
166+
...perShardArgs,
167+
...(perShardArgs.includes('--configsvr') ||
168+
perShardArgs.includes('--shardsvr')
169+
? []
170+
: i === 0
171+
? ['--configsvr']
172+
: ['--shardsvr']),
173+
]),
174+
mongosArgs: mongosArgs.map((perMongosArgs, i) => [
175+
...(i === 0 && !hasPortArg(perMongosArgs) ? args : removePortArg(args)),
176+
...perMongosArgs,
177+
]),
178+
};
179+
}
180+
50181
export class MongoCluster extends EventEmitter<MongoClusterEvents> {
51182
private topology: MongoClusterOptions['topology'] = 'standalone';
52183
private replSetName?: string;
@@ -199,76 +330,45 @@ export class MongoCluster extends EventEmitter<MongoClusterEvents> {
199330
}),
200331
);
201332
} else if (options.topology === 'replset') {
202-
const { secondaries = 2, arbiters = 0 } = options;
203-
204-
const args = [...(options.args ?? [])];
205-
let replSetName: string;
206-
if (!args.includes('--replSet')) {
207-
replSetName = `replSet-${uuid()}`;
208-
args.push('--replSet', replSetName);
209-
} else {
210-
replSetName = args[args.indexOf('--replSet') + 1];
211-
}
333+
const { rsMembers, replSetName } = processRSMembers(options);
212334

213-
const primaryArgs = [...args];
214-
const rsMemberOptions = options.rsMemberOptions || [{}];
215-
if (rsMemberOptions.length > 0) {
216-
primaryArgs.push(...(rsMemberOptions[0].args || []));
217-
}
218-
debug('Starting primary', primaryArgs);
219-
const primary = await MongoServer.start({
220-
...options,
221-
args: primaryArgs,
222-
binary: 'mongod',
335+
debug('Starting replica set nodes', {
336+
replSetName,
337+
secondaries: rsMembers.filter((m) => !m.arbiterOnly).length - 1,
338+
arbiters: rsMembers.filter((m) => m.arbiterOnly).length,
223339
});
224-
cluster.servers.push(primary);
225-
226-
if (args.includes('--port')) {
227-
args.splice(args.indexOf('--port') + 1, 1, '0');
228-
}
340+
const primaryIndex = rsMembers.findIndex((m) =>
341+
rsMembers.every((m2) => m.priority ?? 0 >= (m2.priority ?? 0)),
342+
);
343+
assert.notStrictEqual(primaryIndex, -1);
229344

230-
debug('Starting secondaries and arbiters', {
231-
secondaries,
232-
arbiters,
233-
args,
234-
});
235-
cluster.servers.push(
236-
...(await Promise.all(
237-
range(secondaries + arbiters).map((i) => {
238-
const secondaryArgs = [...args];
239-
if (i + 1 < rsMemberOptions.length) {
240-
secondaryArgs.push(...(rsMemberOptions[i + 1].args || []));
241-
debug('Adding secondary args', rsMemberOptions[i + 1].args || []);
242-
}
243-
return MongoServer.start({
345+
const nodes = await Promise.all(
346+
rsMembers.map(async (member) => {
347+
return [
348+
await MongoServer.start({
244349
...options,
245-
args: secondaryArgs,
350+
args: member.args,
246351
binary: 'mongod',
247-
});
248-
}),
249-
)),
352+
}),
353+
member,
354+
] as const;
355+
}),
250356
);
357+
cluster.servers.push(...nodes.map(([srv]) => srv));
358+
const primary = cluster.servers[primaryIndex];
251359

252360
await primary.withClient(async (client) => {
253361
debug('Running rs.initiate');
254362
const rsConf = {
255363
_id: replSetName,
256-
configsvr: args.includes('--configsvr'),
257-
members: cluster.servers.map((srv, i) => {
258-
let options: RSMemberOptions = {};
259-
if (i < rsMemberOptions.length) {
260-
options = rsMemberOptions[i];
261-
}
262-
let priority = i === 0 ? 1 : 0;
263-
if (options.priority !== undefined) {
264-
priority = options.priority;
265-
}
364+
configsvr: rsMembers.some((m) => m.args?.includes('--configsvr')),
365+
members: nodes.map(([srv, member], i) => {
266366
return {
267367
_id: i,
268368
host: srv.hostport,
269-
arbiterOnly: i > secondaries,
270-
priority,
271-
tags: options.tags || {},
369+
arbiterOnly: member.arbiterOnly ?? false,
370+
priority: member.priority ?? 1,
371+
tags: member.tags || {},
272372
};
273373
}),
274374
};
@@ -296,28 +396,10 @@ export class MongoCluster extends EventEmitter<MongoClusterEvents> {
296396
}
297397
});
298398
} else if (options.topology === 'sharded') {
299-
const { shards = 3 } = options;
300-
const shardArgs = [...(options.args ?? [])];
301-
if (shardArgs.includes('--port')) {
302-
shardArgs.splice(shardArgs.indexOf('--port') + 1, 1, '0');
303-
}
304-
const perShardArgs = options.shardArgs || [[]];
305-
399+
const { shardArgs, mongosArgs } = processShardOptions(options);
306400
debug('starting config server and shard servers', shardArgs);
307401
const [configsvr, ...shardsvrs] = await Promise.all(
308-
range(shards + 1).map((i) => {
309-
const args: string[] = [...shardArgs];
310-
if (i === 0) {
311-
args.push('--configsvr');
312-
} else {
313-
if (i - 1 < perShardArgs.length) {
314-
args.push(...perShardArgs[i - 1]);
315-
debug('Adding shard args', perShardArgs[i - 1]);
316-
}
317-
if (!args.includes('--shardsvr')) {
318-
args.push('--shardsvr');
319-
}
320-
}
402+
shardArgs.map((args) => {
321403
return MongoCluster.start({
322404
...options,
323405
args,
@@ -327,7 +409,6 @@ export class MongoCluster extends EventEmitter<MongoClusterEvents> {
327409
);
328410
cluster.shards.push(configsvr, ...shardsvrs);
329411

330-
const mongosArgs = options.mongosArgs ?? [[]];
331412
for (let i = 0; i < mongosArgs.length; i++) {
332413
debug('starting mongos');
333414
const mongos = await MongoServer.start({

0 commit comments

Comments
 (0)