Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 101 additions & 63 deletions src/cmap/command_monitoring_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class CommandStartedEvent {
serverConnectionId: bigint | null
) {
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
const commandName = extractCommandName(command);
const { address, connectionId, serviceId } = extractConnectionDetails(connection);

// TODO: remove in major revision, this is not spec behavior
Expand Down Expand Up @@ -116,7 +116,7 @@ export class CommandSucceededEvent {
serverConnectionId: bigint | null
) {
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
const commandName = extractCommandName(command);
const { address, connectionId, serviceId } = extractConnectionDetails(connection);

this.address = address;
Expand Down Expand Up @@ -174,7 +174,7 @@ export class CommandFailedEvent {
serverConnectionId: bigint | null
) {
const cmd = extractCommand(command);
const commandName = extractCommandName(cmd);
const commandName = extractCommandName(command);
const { address, connectionId, serviceId } = extractConnectionDetails(connection);

this.address = address;
Expand Down Expand Up @@ -213,7 +213,14 @@ export const SENSITIVE_COMMANDS = new Set([
const HELLO_COMMANDS = new Set(['hello', LEGACY_HELLO_COMMAND, LEGACY_HELLO_COMMAND_CAMEL_CASE]);

// helper methods
const extractCommandName = (commandDoc: Document) => Object.keys(commandDoc)[0];
const extractCommandName = (commandDoc: Document) => {
if (commandDoc.query != null) {
if (commandDoc.query.$query != null) return Object.keys(commandDoc.query.$query)[0];
return Object.keys(commandDoc.query)[0];
}
if (commandDoc.command != null) return Object.keys(commandDoc.command)[0];
return Object.keys(commandDoc)[0];
};
const namespace = (command: OpQueryRequest) => command.ns;
const collectionName = (command: OpQueryRequest) => command.ns.split('.')[1];
const maybeRedact = (commandName: string, commandDoc: Document, result: Error | Document) =>
Expand All @@ -236,12 +243,32 @@ const LEGACY_FIND_QUERY_MAP: { [key: string]: string } = {
$snapshot: 'snapshot'
};

const LEGACY_FIND_QUERY_MAP_rev: Record<any, string> = {
filter: '$query',
sort: '$orderby',
hint: '$hint',
comment: '$comment',
maxScan: '$maxScan',
max: '$max',
min: '$min',
returnKey: '$returnKey',
showRecordId: '$showDiskLoc',
maxTimeMS: '$maxTimeMS',
snapshot: '$snapshot'
};

const LEGACY_FIND_OPTIONS_MAP = {
numberToSkip: 'skip',
numberToReturn: 'batchSize',
returnFieldSelector: 'projection'
} as const;

const LEGACY_FIND_OPTIONS_MAP_rev: Record<any, string> = {
skip: 'numberToSkip',
batchSize: 'numberToReturn',
projection: 'returnFieldSelector'
} as const;

const OP_QUERY_KEYS = [
'tailable',
'oplogReplay',
Expand All @@ -254,70 +281,69 @@ const OP_QUERY_KEYS = [
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
function extractCommand(command: WriteProtocolMessageType): Document {
if (command instanceof OpMsgRequest) {
const cmd = deepCopy(command.command);
// For OP_MSG with payload type 1 we need to pull the documents
// array out of the document sequence for monitoring.
if (cmd.ops instanceof DocumentSequence) {
cmd.ops = cmd.ops.documents;
}
if (cmd.nsInfo instanceof DocumentSequence) {
cmd.nsInfo = cmd.nsInfo.documents;
}
const cmd = new Proxy(command.command, {
// For OP_MSG with payload type 1 we need to pull the documents
// array out of the document sequence for monitoring.
get(target, prop) {
if (prop === 'ops') {
if (target.ops instanceof DocumentSequence) {
return target.ops.documents;
} else {
return target.ops;
}
}

if (prop === 'nsInfo') {
if (target.nsInfo instanceof DocumentSequence) {
return target.nsInfo.documents;
} else {
return target.nsInfo;
}
}

return typeof prop === 'string' && prop in target ? target[prop] : undefined;
}
});
return cmd;
}

if (command.query?.$query) {
let result: Document;
if (command.ns === 'admin.$cmd') {
// up-convert legacy command
result = Object.assign({}, command.query.$query);
} else {
// up-convert legacy find command
result = { find: collectionName(command) };
Object.keys(LEGACY_FIND_QUERY_MAP).forEach(key => {
if (command.query[key] != null) {
result[LEGACY_FIND_QUERY_MAP[key]] = deepCopy(command.query[key]);
const cmd = new Proxy(command, {
get(target: Document, prop) {
if (target.ns === 'admin.$cmd') {
// up-convert legacy command
return target.query.$query[prop];
} else {
// up-convert legacy find command
if (prop === 'find') {
return collectionName(target as OpQueryRequest);
}
if (typeof prop !== 'string') return undefined;

if (LEGACY_FIND_QUERY_MAP_rev[prop] != null) {
return target.query[LEGACY_FIND_QUERY_MAP_rev[prop]];
}
}
});
}

Object.keys(LEGACY_FIND_OPTIONS_MAP).forEach(key => {
const legacyKey = key as keyof typeof LEGACY_FIND_OPTIONS_MAP;
if (command[legacyKey] != null) {
result[LEGACY_FIND_OPTIONS_MAP[legacyKey]] = deepCopy(command[legacyKey]);
}
});
if (LEGACY_FIND_OPTIONS_MAP_rev[prop] != null) {
return target[LEGACY_FIND_OPTIONS_MAP_rev[prop]];
}

if (prop === 'limit' && target.pre32Limit != null) return target.pre32Limit;

OP_QUERY_KEYS.forEach(key => {
if (command[key]) {
result[key] = command[key];
return target[prop];
}
});

if (command.pre32Limit != null) {
result.limit = command.pre32Limit;
}

if (command.query.$explain) {
return { explain: result };
}
return result;
if (command.query.$explain) return { explain: cmd };
return cmd;
}

const clonedQuery: Record<string, unknown> = {};
const clonedCommand: Record<string, unknown> = {};
if (command.query) {
for (const k in command.query) {
clonedQuery[k] = deepCopy(command.query[k]);
}
clonedCommand.query = clonedQuery;
}

for (const k in command) {
if (k === 'query') continue;
clonedCommand[k] = deepCopy((command as unknown as Record<string, unknown>)[k]);
return new Proxy(command.query, {});
} else {
return new Proxy(command, {});
}
return command.query ? clonedQuery : clonedCommand;
}

function extractReply(command: WriteProtocolMessageType, reply?: Document) {
Expand All @@ -326,22 +352,34 @@ function extractReply(command: WriteProtocolMessageType, reply?: Document) {
}

if (command instanceof OpMsgRequest) {
return deepCopy(reply.result ? reply.result : reply);
return new Proxy(reply, {});
}

// is this a legacy find command?
if (command.query && command.query.$query != null) {
return {
ok: 1,
cursor: {
id: deepCopy(reply.cursorId),
ns: namespace(command),
firstBatch: deepCopy(reply.documents)
return new Proxy(reply, {
get(target, prop) {
if (typeof prop !== 'string') return undefined;
if (prop === 'ok') {
return 1;
}
if (prop === 'cursor') {
return {
id: target.cursorId,
ns: namespace(command),
firstBatch: target.documents
};
}
return target[prop];
}
};
});
}

return deepCopy(reply.result ? reply.result : reply);
if (reply.result != null) {
return new Proxy(reply.result, {});
} else {
return new Proxy(reply, {});
}
}

function extractConnectionDetails(connection: Connection) {
Expand Down
Loading