Skip to content

Commit 6588350

Browse files
committed
Merge remote-tracking branch 'origin/main' into vm-load-plugins
2 parents 592d314 + ae63ede commit 6588350

File tree

5 files changed

+41
-88
lines changed

5 files changed

+41
-88
lines changed

resources/RecordEncoder.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ export let lastMetadata: Entry | null = null;
7575
export class RecordEncoder extends Encoder {
7676
structureUpdate?: any;
7777
isRocksDB: boolean;
78+
name: string;
7879
constructor(options) {
7980
options.useBigIntExtension = true;
8081
/**
@@ -116,14 +117,23 @@ export class RecordEncoder extends Encoder {
116117
if (expiresAt >= 0) {
117118
valueStart += 8; // make room for expiration timestamp
118119
expiresAtNextEncoding = -1; // reset indicator to mean no expiration
120+
if (!(metadata & HAS_EXPIRATION)) {
121+
throw new Error('Expiration included, but not in metadata flags');
122+
}
119123
}
120124
if (residencyId) {
121125
valueStart += 4; // make room for residency id
122126
residencyIdAtNextEncoding = 0; // reset indicator to mean no residency id
127+
if (!(metadata & HAS_RESIDENCY_ID)) {
128+
throw new Error('Residency id included, but not in metadata flags');
129+
}
123130
}
124131
if (nodeId >= 0) {
125132
valueStart += 4; // make room for node id
126133
nodeIdAtNextEncoding = -1; // reset indicator to mean no node id
134+
if (!(metadata & HAS_NODE_ID)) {
135+
throw new Error('Node id included, but not in metadata flags');
136+
}
127137
}
128138
if (additionalAuditRefs && additionalAuditRefs.length > 0) {
129139
valueStart += 1 + additionalAuditRefs.length * 12; // 1 byte for count + 8 bytes version + 4 bytes nodeId per ref
@@ -553,17 +563,17 @@ export function recordUpdater(store, tableId, auditStore) {
553563
residencyIdAtNextEncoding = residencyId;
554564
metadataInNextEncoding |= HAS_RESIDENCY_ID;
555565
extendedType |= HAS_CURRENT_RESIDENCY_ID;
556-
} // else residencyIdAtNextEncoding = 0;
566+
} else residencyIdAtNextEncoding = 0;
557567
const nodeId = options?.nodeId;
558568
if (nodeId >= 0) {
559569
nodeIdAtNextEncoding = nodeId;
560570
metadataInNextEncoding |= HAS_NODE_ID;
561-
} // else nodeIdAtNextEncoding = -1;
571+
} else nodeIdAtNextEncoding = -1;
562572
const additionalAuditRefs = options?.additionalAuditRefs;
563573
if (additionalAuditRefs && additionalAuditRefs.length > 0) {
564574
additionalAuditRefsNextEncoding = additionalAuditRefs;
565575
metadataInNextEncoding |= HAS_ADDITIONAL_AUDIT_REFS;
566-
} // else additionalAuditRefsNextEncoding = undefined;
576+
} else additionalAuditRefsNextEncoding = undefined;
567577
const previousAdditionalAuditRefs = existingEntry?.additionalAuditRefs;
568578
if (previousAdditionalAuditRefs && previousAdditionalAuditRefs.length > 0) {
569579
extendedType |= HAS_ADDITIONAL_AUDIT_REFS_AUDIT;
@@ -606,7 +616,6 @@ export function recordUpdater(store, tableId, auditStore) {
606616
const structureVersion = store.encoder.structures.length + (store.encoder.typedStructs?.length ?? 0);
607617
const nodeId = options?.nodeId ?? server.replication?.getThisNodeId(auditStore) ?? 0;
608618
const viaNodeId = options?.viaNodeId ?? nodeId;
609-
logger.debug('recording audit entry', { id, newVersion, previousVersion: existingEntry?.version, nodeId });
610619
if (resolveRecord && existingEntry?.localTime) {
611620
const replacingId = existingEntry?.localTime;
612621
const replacingEntry = auditStore.get(replacingId, tableId, id);

resources/RocksTransactionLogStore.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ export class RocksTransactionLogStore extends EventEmitter {
317317
let totalSize = 0;
318318
const logs = [];
319319
for (const log of this.loadLogs()) {
320+
if (!log) continue;
320321
const size = log.getLogFileSize();
321322
totalSize += size;
322323
logs.push({ name: log.name, size });

resources/databases.ts

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ function openRocksDatabase(path: string, options: RocksDatabaseOptions & { dupSo
113113
db = RocksDatabase.open(new RocksIndexStore(path, options)) as RocksDatabaseEx;
114114
} else {
115115
db = RocksDatabase.open(path, options) as RocksDatabaseEx;
116+
db.encoder.name = options.name;
116117
}
117118
db.env = {};
118119
return db;
@@ -847,7 +848,7 @@ export function table<TableResourceType>(tableDefinition: TableDefinition): Tabl
847848
if (attribute.expiresAt) attribute.indexed = true;
848849
}
849850
let hasChanges;
850-
let txnCommit;
851+
let releaseExclusiveLock: () => void;
851852
if (Table) {
852853
primaryKey = Table.primaryKey;
853854
if (Table.primaryStore.rootStore.status === 'closed') {
@@ -891,10 +892,10 @@ export function table<TableResourceType>(tableDefinition: TableDefinition): Tabl
891892
attributesDbi = rootStore.dbisDb = rootStore.openDB(INTERNAL_DBIS_NAME, internalDbiInit);
892893
}
893894

894-
startTxn(); // get an exclusive lock on the database so we can verify that we are the only thread creating the table (and assigning the table id)
895+
exclusiveLock(); // get an exclusive lock on the database so we can verify that we are the only thread creating the table (and assigning the table id)
895896
if (attributesDbi.getSync(dbiName)) {
896897
// table was created while we were setting up
897-
if (txnCommit) txnCommit();
898+
if (releaseExclusiveLock) releaseExclusiveLock();
898899
resetDatabases();
899900
return table(tableDefinition);
900901
}
@@ -969,7 +970,7 @@ export function table<TableResourceType>(tableDefinition: TableDefinition): Tabl
969970
const attribute = attributes.find((attribute) => attribute.name === attribute_name);
970971
const removeIndex = !attribute?.indexed && value.indexed && !value.isPrimaryKey;
971972
if (!attribute || removeIndex) {
972-
startTxn();
973+
exclusiveLock();
973974
hasChanges = true;
974975
if (!attribute) attributesDbi.remove(key);
975976
if (removeIndex) {
@@ -1012,7 +1013,7 @@ export function table<TableResourceType>(tableDefinition: TableDefinition): Tabl
10121013
if (replicate !== undefined) updatedPrimaryAttribute.replicate = replicate;
10131014
if (attribute.type) updatedPrimaryAttribute.type = attribute.type;
10141015
hasChanges = true; // send out notification of the change
1015-
startTxn();
1016+
exclusiveLock();
10161017
attributesDbi.put(dbiKey, updatedPrimaryAttribute);
10171018
}
10181019

@@ -1038,7 +1039,7 @@ export function table<TableResourceType>(tableDefinition: TableDefinition): Tabl
10381039
attributeDescriptor.restartNumber < workerData?.restartNumber
10391040
) {
10401041
hasChanges = true;
1041-
startTxn();
1042+
exclusiveLock();
10421043
attributeDescriptor = attributesDbi.getSync(dbiKey);
10431044
if (
10441045
changed ||
@@ -1068,12 +1069,12 @@ export function table<TableResourceType>(tableDefinition: TableDefinition): Tabl
10681069
indices[attribute.name] = dbi;
10691070
} else if (changed) {
10701071
hasChanges = true;
1071-
startTxn();
1072+
exclusiveLock();
10721073
attributesDbi.put(dbiKey, attribute);
10731074
}
10741075
}
10751076
} finally {
1076-
if (txnCommit) txnCommit();
1077+
if (releaseExclusiveLock) releaseExclusiveLock();
10771078
}
10781079
if (hasChanges) {
10791080
Table.schemaVersion++;
@@ -1102,15 +1103,24 @@ export function table<TableResourceType>(tableDefinition: TableDefinition): Tabl
11021103
logger.trace(`${tableName} table loaded`);
11031104

11041105
return Table as TableResourceType;
1105-
function startTxn() {
1106-
if (txnCommit) return;
1107-
rootStore.transactionSync(() => {
1108-
return {
1109-
then(callback) {
1110-
txnCommit = callback;
1111-
},
1106+
// Acquire an exclusive lock for attribute updates
1107+
function exclusiveLock() {
1108+
if (releaseExclusiveLock) return;
1109+
if (rootStore instanceof RocksDatabase) {
1110+
while (!rootStore.tryLock('update-attributes')) {} // use a spin lock, we really need an synchronous exclusive lock here
1111+
releaseExclusiveLock = () => {
1112+
rootStore.unlock('update-attributes');
11121113
};
1113-
});
1114+
} else {
1115+
// we only need an exclusive transaction lock in lmdb
1116+
rootStore.transactionSync(() => {
1117+
return {
1118+
then(callback) {
1119+
releaseExclusiveLock = callback;
1120+
},
1121+
};
1122+
});
1123+
}
11141124
}
11151125
}
11161126
const MAX_OUTSTANDING_INDEXING = 1000;

utility/lmdb/environmentUtility.js

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const { OpenDBIObject } = require('./OpenDBIObject.js');
1111
const OpenEnvironmentObject = require('./OpenEnvironmentObject.js');
1212
const lmdbTerms = require('./terms.js');
1313
const hdbTerms = require('../hdbTerms.ts');
14-
const { table, resetDatabases } = require('../../resources/databases.ts');
14+
const { resetDatabases } = require('../../resources/databases.ts');
1515
const envMngr = require('../environment/environmentManager.js');
1616

1717
const INTERNAL_DBIS_NAME = lmdbTerms.INTERNAL_DBIS_NAME;
@@ -21,41 +21,6 @@ const MDB_LEGACY_LOCK_FILE_NAME = 'lock.mdb';
2121
const MDB_FILE_EXTENSION = '.mdb';
2222
const MDB_LOCK_FILE_SUFFIX = '-lock';
2323

24-
/**
25-
* This class is used to create the transaction & cursor objects needed to perform search on a dbi as well as a function to close both objects after use
26-
*/
27-
class TransactionCursor {
28-
/**
29-
* create the TransactionCursor object
30-
* @param {lmdb.RootDatabase} env - environment object to create the transaction & cursor from
31-
* @param {String} attribute - name of the attribute to create the cursor against
32-
* @param {Boolean} [writeCursor] - optional, dictates if the cursor created will be a readOnly cursor or not
33-
*/
34-
constructor(env, attribute, writeCursor = false) {
35-
this.dbi = openDBI(env, attribute);
36-
this.key_type = this.dbi[lmdbTerms.DBI_DEFINITION_NAME].key_type;
37-
this.isPrimaryKey = this.dbi[lmdbTerms.DBI_DEFINITION_NAME].isPrimaryKey;
38-
this.txn = env.beginTxn({ readOnly: writeCursor === false });
39-
this.cursor = new lmdb.Cursor(this.txn, this.dbi);
40-
}
41-
42-
/**
43-
* function to close the read cursor & abort the transaction
44-
*/
45-
close() {
46-
this.cursor.close();
47-
this.txn.abort();
48-
}
49-
50-
/**
51-
* function to close the read cursor & abort the transaction
52-
*/
53-
commit() {
54-
this.cursor.close();
55-
this.txn.commit();
56-
}
57-
}
58-
5924
/*** VALIDATION FUNCTIONS ***/
6025

6126
/**
@@ -172,18 +137,6 @@ async function createEnvironment(basePath, envName, isTxn = false, isV3 = false)
172137
}
173138
}
174139

175-
async function copyEnvironment(basePath, envName, _destinationPath, _compactEnvironment = true) {
176-
pathEnvNameValidation(basePath, envName);
177-
envName = envName.toString();
178-
let environmentPath = path.join(basePath, envName);
179-
return table({
180-
table: envName,
181-
database: path.parse(basePath).name,
182-
path: environmentPath,
183-
attributes: [{ name: 'id', isPrimaryKey: true }],
184-
});
185-
}
186-
187140
/**
188141
* opens an environment
189142
* @returns {lmdb.RootDatabase} - lmdb environment object
@@ -456,22 +409,6 @@ function statDBI(env, dbiName) {
456409
return stats;
457410
}
458411

459-
/**
460-
* gets the byte size of an environment file
461-
* @param {String} environmentBasePath
462-
* @param {String} tableName
463-
* @returns {Promise<number>}
464-
*/
465-
async function environmentDataSize(environmentBasePath, tableName) {
466-
try {
467-
let environmentPath = path.join(environmentBasePath, tableName + MDB_FILE_EXTENSION);
468-
let statResult = await fs.stat(environmentPath);
469-
return statResult['size'];
470-
} catch {
471-
throw new Error(LMDB_ERRORS.INVALID_ENVIRONMENT);
472-
}
473-
}
474-
475412
/**
476413
* removes a named database from an environment
477414
* @param {lmdb.RootDatabase} env - environment object used thigh level to interact with all data in an environment
@@ -536,8 +473,5 @@ module.exports = {
536473
statDBI,
537474
deleteEnvironment,
538475
initializeDBIs,
539-
TransactionCursor,
540-
environmentDataSize,
541-
copyEnvironment,
542476
closeEnvironment,
543477
};

utility/lmdb/searchUtility.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,6 @@ function batchHashSearch(transactionOrEnv, hash_attribute, fetchAttributes, ids,
829829
* @param {Array.<String>} fetchAttributes - string array of attributes to pull from the object
830830
* @param {Array.<String>} ids - list of ids to search
831831
* @param {[]} [_notFound] -optional, meant to be an array passed by reference so that skipped ids can be aggregated.
832-
* @returns {TransactionCursor}
833832
*/
834833
function initializeBatchSearchByHash(transactionOrEnv, hash_attribute, fetchAttributes, ids, _notFound) {
835834
common.validateEnv(transactionOrEnv);

0 commit comments

Comments
 (0)