Skip to content

Commit 890dc75

Browse files
mbroadstdaprahamian
authored andcommitted
refactor(list-collections): support retryable reads
1 parent e953ce2 commit 890dc75

File tree

4 files changed

+104
-70
lines changed

4 files changed

+104
-70
lines changed

lib/db.js

Lines changed: 6 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ const AggregationCursor = require('./aggregation_cursor');
2828
const createListener = require('./operations/db_ops').createListener;
2929
const ensureIndex = require('./operations/db_ops').ensureIndex;
3030
const evaluate = require('./operations/db_ops').evaluate;
31-
const listCollectionsTransforms = require('./operations/db_ops').listCollectionsTransforms;
3231
const profilingInfo = require('./operations/db_ops').profilingInfo;
3332
const validateDatabaseName = require('./operations/db_ops').validateDatabaseName;
3433

@@ -42,6 +41,7 @@ const DropCollectionOperation = require('./operations/drop').DropCollectionOpera
4241
const DropDatabaseOperation = require('./operations/drop').DropDatabaseOperation;
4342
const ExecuteDbAdminCommandOperation = require('./operations/execute_db_admin_command');
4443
const IndexInformationOperation = require('./operations/index_information');
44+
const ListCollectionsOperation = require('./operations/list_collections');
4545
const ProfilingLevelOperation = require('./operations/profiling_level');
4646
const RemoveUserOperation = require('./operations/remove_user');
4747
const RenameOperation = require('./operations/rename');
@@ -572,72 +572,11 @@ Db.prototype.listCollections = function(filter, options) {
572572
filter = filter || {};
573573
options = options || {};
574574

575-
// Shallow clone the object
576-
options = Object.assign({}, options);
577-
// Set the promise library
578-
options.promiseLibrary = this.s.promiseLibrary;
579-
580-
// Ensure valid readPreference
581-
options.readPreference = resolveReadPreference(this, options);
582-
583-
// Cursor options
584-
let cursor = options.batchSize ? { batchSize: options.batchSize } : {};
585-
586-
// We have a list collections command
587-
if (this.serverConfig.capabilities().hasListCollectionsCommand) {
588-
const nameOnly = typeof options.nameOnly === 'boolean' ? options.nameOnly : false;
589-
// Build the command
590-
const command = { listCollections: 1, filter, cursor, nameOnly };
591-
// Set the AggregationCursor constructor
592-
options.cursorFactory = CommandCursor;
593-
// Create the cursor
594-
cursor = this.s.topology.cursor(
595-
this.s.namespace.withCollection('$cmd').toString(),
596-
command,
597-
options
598-
);
599-
// Do we have a readPreference, apply it
600-
if (options.readPreference) {
601-
cursor.setReadPreference(options.readPreference);
602-
}
603-
// Return the cursor
604-
return cursor;
605-
}
606-
607-
// We cannot use the listCollectionsCommand
608-
if (!this.serverConfig.capabilities().hasListCollectionsCommand) {
609-
// If we have legacy mode and have not provided a full db name filter it
610-
if (
611-
typeof filter.name === 'string' &&
612-
!new RegExp('^' + this.databaseName + '\\.').test(filter.name)
613-
) {
614-
filter = Object.assign({}, filter);
615-
filter.name = this.s.namespace.withCollection(filter.name).toString();
616-
}
617-
}
618-
619-
// No filter, filter by current database
620-
if (filter == null) {
621-
filter.name = `/${this.databaseName}/`;
622-
}
623-
624-
// Rewrite the filter to use $and to filter out indexes
625-
if (filter.name) {
626-
filter = { $and: [{ name: filter.name }, { name: /^((?!\$).)*$/ }] };
627-
} else {
628-
filter = { name: /^((?!\$).)*$/ };
629-
}
630-
631-
// Return options
632-
const _options = { transforms: listCollectionsTransforms(this.databaseName) };
633-
// Get the cursor
634-
cursor = this.collection(CONSTANTS.SYSTEM_NAMESPACE_COLLECTION).find(filter, _options);
635-
// Do we have a readPreference, apply it
636-
if (options.readPreference) cursor.setReadPreference(options.readPreference);
637-
// Set the passed in batch size if one was provided
638-
if (options.batchSize) cursor = cursor.batchSize(options.batchSize);
639-
// We have a fallback mode using legacy systems collections
640-
return cursor;
575+
return new CommandCursor(
576+
this.s.topology,
577+
new ListCollectionsOperation(this, filter, options),
578+
options
579+
);
641580
};
642581

643582
/**

lib/operations/list_collections.js

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
'use strict';
2+
3+
const CommandOperationV2 = require('./command_v2');
4+
const Aspect = require('./operation').Aspect;
5+
const defineAspects = require('./operation').defineAspects;
6+
const maxWireVersion = require('../core/utils').maxWireVersion;
7+
const listCollectionsTransforms = require('./db_ops').listCollectionsTransforms;
8+
const CONSTANTS = require('../constants');
9+
10+
const LIST_COLLECTIONS_WIRE_VERSION = 3;
11+
12+
class ListCollectionsOperation extends CommandOperationV2 {
13+
constructor(db, filter, options) {
14+
super(db, options);
15+
this.options.full = true;
16+
17+
this.db = db;
18+
this.filter = filter;
19+
this.nameOnly = !!this.options.nameOnly;
20+
21+
if (typeof this.options.batchSize === 'number') {
22+
this.batchSize = this.options.batchSize;
23+
}
24+
}
25+
26+
execute(server, callback) {
27+
if (maxWireVersion(server) < LIST_COLLECTIONS_WIRE_VERSION) {
28+
let filter = this.filter;
29+
const databaseName = this.db.s.namespace.db;
30+
31+
// If we have legacy mode and have not provided a full db name filter it
32+
if (
33+
typeof filter.name === 'string' &&
34+
!new RegExp('^' + databaseName + '\\.').test(filter.name)
35+
) {
36+
filter = Object.assign({}, filter);
37+
filter.name = this.db.s.namespace.withCollection(filter.name).toString();
38+
}
39+
40+
// No filter, filter by current database
41+
if (filter == null) {
42+
filter.name = `/${databaseName}/`;
43+
}
44+
45+
// Rewrite the filter to use $and to filter out indexes
46+
if (filter.name) {
47+
filter = { $and: [{ name: filter.name }, { name: /^((?!\$).)*$/ }] };
48+
} else {
49+
filter = { name: /^((?!\$).)*$/ };
50+
}
51+
52+
const transforms = listCollectionsTransforms(databaseName);
53+
server.query(
54+
`${databaseName}.${CONSTANTS.SYSTEM_NAMESPACE_COLLECTION}`,
55+
{ query: filter },
56+
{ batchSize: this.batchSize || 1000 },
57+
{},
58+
(err, result) => {
59+
if (
60+
result &&
61+
result.message &&
62+
result.message.documents &&
63+
Array.isArray(result.message.documents)
64+
) {
65+
result.message.documents = result.message.documents.map(transforms.doc);
66+
}
67+
68+
callback(err, result);
69+
}
70+
);
71+
72+
return;
73+
}
74+
75+
const command = {
76+
listCollections: 1,
77+
filter: this.filter,
78+
cursor: this.batchSize ? { batchSize: this.batchSize } : {},
79+
nameOnly: this.nameOnly
80+
};
81+
82+
return super.executeCommand(server, command, callback);
83+
}
84+
}
85+
86+
defineAspects(ListCollectionsOperation, [
87+
Aspect.READ_OPERATION,
88+
Aspect.RETRYABLE,
89+
Aspect.EXECUTE_WITH_SELECTION
90+
]);
91+
92+
module.exports = ListCollectionsOperation;

test/functional/retryable_reads_tests.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ describe('Retryable Reads', function() {
2121
spec.description.match(/countDocuments/i) ||
2222
spec.description.match(/listIndexes/i) ||
2323
spec.description.match(/listDatabases/i) ||
24-
spec.description.match(/listDatabaseNames/i)
24+
spec.description.match(/listDatabaseNames/i) ||
25+
spec.description.match(/listCollections/i) ||
26+
spec.description.match(/listCollectionNames/i)
2527
);
2628
});
2729
});

test/functional/runner/index.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,7 @@ function extractBulkRequests(requests) {
415415
function translateOperationName(operationName) {
416416
if (operationName === 'runCommand') return 'command';
417417
if (operationName === 'listDatabaseNames') return 'listDatabases';
418+
if (operationName === 'listCollectionNames') return 'listCollections';
418419
return operationName;
419420
}
420421

@@ -464,7 +465,7 @@ function resolveOperationArgs(operationName, operationArgs, context) {
464465
return result;
465466
}
466467

467-
const CURSOR_COMMANDS = new Set(['find', 'aggregate', 'listIndexes']);
468+
const CURSOR_COMMANDS = new Set(['find', 'aggregate', 'listIndexes', 'listCollections']);
468469
const ADMIN_COMMANDS = new Set(['listDatabases']);
469470

470471
/**
@@ -557,7 +558,7 @@ function testOperation(operation, obj, context, options) {
557558
obj = obj.db().admin();
558559
}
559560

560-
if (operation.name === 'listDatabaseNames') {
561+
if (operation.name === 'listDatabaseNames' || operation.name === 'listCollectionNames') {
561562
opOptions.nameOnly = true;
562563
}
563564

0 commit comments

Comments
 (0)