Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/mongodb-runner/src/mongocluster.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,10 @@ describe('MongoCluster', function () {
secondaries: 0,
});
cluster = await MongoCluster.deserialize(cluster.serialize());
const { ok } = await cluster.withClient(async (client) => {
return await client.db('admin').command({ ping: 1 });
const doc = await cluster.withClient(async (client) => {
return await client.db('config').collection('mongodbrunner').findOne();
});
expect(ok).to.equal(1);
expect(doc?._id).to.be.a('string');
await cluster.close();
});
});
116 changes: 99 additions & 17 deletions packages/mongodb-runner/src/mongoserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import {
isFailureToSetupListener,
} from './mongologreader';
import { Readable } from 'stream';
import type { Document } from 'mongodb';
import type { Document, MongoClientOptions } from 'mongodb';
import { MongoClient } from 'mongodb';
import path from 'path';
import { once } from 'events';
import { uuid, debug } from './util';
import { uuid, debug, pick } from './util';

export interface MongoServerOptions {
binDir?: string;
Expand All @@ -24,32 +24,52 @@ export interface MongoServerOptions {
docker?: string | string[]; // Image or docker options
}

interface SerializedServerProperties {
_id: string;
pid?: number;
port?: number;
dbPath?: string;
startTime: string;
hasInsertedMetadataCollEntry: boolean;
}

export class MongoServer {
private uuid: string = uuid();
private buildInfo?: Document;
private childProcess?: ChildProcess;
private pid?: number;
private port?: number;
private dbPath?: string;
private closing = false;
private startTime = new Date().toISOString();
private hasInsertedMetadataCollEntry = false;

private constructor() {
/* see .start() */
}

serialize(): unknown /* JSON-serializable */ {
serialize(): SerializedServerProperties {
return {
_id: this.uuid,
pid: this.pid,
port: this.port,
dbPath: this.dbPath,
startTime: this.startTime,
hasInsertedMetadataCollEntry: this.hasInsertedMetadataCollEntry,
};
}

static async deserialize(serialized: any): Promise<MongoServer> {
static async deserialize(
serialized: SerializedServerProperties,
): Promise<MongoServer> {
const srv = new MongoServer();
srv.pid = serialized.pid;
srv.uuid = serialized._id;
srv.port = serialized.port;
srv.dbPath = serialized.dbPath;
await srv._populateBuildInfo();
srv.closing = !!(await srv._populateBuildInfo('restore-check'));
if (!srv.closing) {
srv.pid = serialized.pid;
srv.dbPath = serialized.dbPath;
}
return srv;
}

Expand Down Expand Up @@ -116,7 +136,7 @@ export class MongoServer {
const srv = new MongoServer();

if (!options.docker) {
const dbPath = path.join(options.tmpDir, `db-${uuid()}`);
const dbPath = path.join(options.tmpDir, `db-${srv.uuid}`);
await fs.mkdir(dbPath, { recursive: true });
srv.dbPath = dbPath;
}
Expand Down Expand Up @@ -234,7 +254,10 @@ export class MongoServer {
logEntryStream.resume();

srv.port = port;
await srv._populateBuildInfo();
const buildInfoError = await srv._populateBuildInfo('insert-new');
if (buildInfoError) {
debug('failed to get buildInfo', buildInfoError);
}
} catch (err) {
await srv.close();
throw err;
Expand Down Expand Up @@ -270,24 +293,83 @@ export class MongoServer {
this.dbPath = undefined;
}

private async _populateBuildInfo(): Promise<void> {
if (this.buildInfo?.version) return;
this.buildInfo = await this.withClient(
async (client) => await client.db('admin').command({ buildInfo: 1 }),
);
private async _ensureMatchingMetadataColl(
client: MongoClient,
mode: 'insert-new' | 'restore-check',
): Promise<void> {
const hello = await client.db('admin').command({ hello: 1 });
const isMongoS = hello.msg === 'isdbgrid';
const insertedInfo = pick(this.serialize(), [
'_id',
'pid',
'port',
'dbPath',
'startTime',
]);
const runnerColl = client
.db(isMongoS ? 'config' : 'local')
.collection<
Omit<SerializedServerProperties, 'hasInsertedMetadataCollEntry'>
>('mongodbrunner');
debug('ensuring metadata collection entry', insertedInfo, { isMongoS });
if (mode === 'insert-new') {
await runnerColl.insertOne(insertedInfo);
debug('inserted metadata collection entry', insertedInfo);
this.hasInsertedMetadataCollEntry = true;
} else {
if (!this.hasInsertedMetadataCollEntry) {
debug(
'skipping metadata collection match check as we never inserted metadata',
);
return;
}
const match = await runnerColl.findOne();
debug('read metadata collection entry', insertedInfo, match);
if (!match) {
throw new Error(
'Cannot find mongodbrunner entry, assuming that this instance was not started by mongodb-runner',
);
}
if (match._id !== insertedInfo._id) {
throw new Error(
`Mismatched mongodbrunner entry: ${JSON.stringify(match)} !== ${JSON.stringify(insertedInfo)}`,
);
}
}
}

private async _populateBuildInfo(
mode: 'insert-new' | 'restore-check',
): Promise<Error | null> {
try {
// directConnection + retryWrites let us write to `local` db on secondaries
const clientOpts = { retryWrites: false };
this.buildInfo = await this.withClient(async (client) => {
// Insert the metadata entry, except if we're a freshly started mongos
// (which does not have its own storage to persist)
await this._ensureMatchingMetadataColl(client, mode);
return await client.db('admin').command({ buildInfo: 1 });
}, clientOpts);
} catch (err) {
debug('failed to get buildInfo, treating as closed server', err);
return err as Error;
}
debug(
'got server build info through client',
this.serverVersion,
this.serverVariant,
);
return null;
}

async withClient<Fn extends (client: MongoClient) => any>(
fn: Fn,
clientOptions: MongoClientOptions = {},
): Promise<ReturnType<Fn>> {
const client = await MongoClient.connect(
`mongodb://${this.hostport}/?directConnection=true`,
);
const client = await MongoClient.connect(`mongodb://${this.hostport}/`, {
directConnection: true,
...clientOptions,
});
try {
return await fn(client);
} finally {
Expand Down
15 changes: 5 additions & 10 deletions packages/mongodb-runner/src/runner-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,9 @@ export async function stop(argv: {
id?: string;
all?: boolean;
}) {
const toStop: Array<StoredInstance> = [];
for await (const instance of instances(argv)) {
if (instance.id === argv.id || argv.all) toStop.push(instance);
}
await Promise.all(
toStop.map(async ({ filepath, serialized }) => {
await (await MongoCluster.deserialize(serialized)).close();
await fs.rm(filepath);
}),
);
await parallelForEach(instances(argv), async (instance: StoredInstance) => {
if (instance.id !== argv.id && !argv.all) return;
await (await MongoCluster.deserialize(instance.serialized)).close();
await fs.rm(instance.filepath);
});
}
13 changes: 13 additions & 0 deletions packages/mongodb-runner/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,16 @@ export async function parallelForEach<T>(

return await Promise.allSettled(result);
}

export function pick<T extends object, K extends keyof T>(
obj: T,
keys: K[],
): Pick<T, K> {
const ret: Partial<Pick<T, K>> = {};
for (const key of Object.keys(obj) as K[]) {
if (keys.includes(key)) {
ret[key] = obj[key];
}
}
return ret as Pick<T, K>;
}