diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 7c0aaec13..3bf56cbeb 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -81,7 +81,7 @@ These are the steps @Ethan-Arrowood has been following to synchronize the reposi
 
 ### Last Synchronized Commit
 
-`e1ea920d74e919140ae89d5ca4d75614c10c2925`
+`916492e0bd147b79fd11d3b7bc41902999d64c2b`
 
 ## Code of Conduct
 
diff --git a/bin/run.js b/bin/run.js
index 52ed52721..47e12ca3d 100755
--- a/bin/run.js
+++ b/bin/run.js
@@ -270,6 +270,25 @@ function startupLog(portResolutions) {
 			: 'disabled'
 	}`;
 	logMsg += `, unix socket: ${env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_DOMAINSOCKET)}\n`;
+	if (env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_PORT)) {
+		logMsg +=
+			pad('') +
+			'http://' +
+			env.get(CONFIG_PARAMS.NODE_HOSTNAME) +
+			':' +
+			env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_PORT) +
+			'/\n';
+	}
+	if (env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_SECUREPORT)) {
+		logMsg +=
+			'\n' +
+			pad('') +
+			'https://' +
+			env.get(CONFIG_PARAMS.NODE_HOSTNAME) +
+			':' +
+			env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_SECUREPORT) +
+			'/\n';
+	}
 
 	// MQTT Log
 	logMsg += pad('MQTT:');
@@ -311,19 +330,24 @@ function startupLog(portResolutions) {
 	// portResolutions is a Map of port to protocol name and component name built in threadServer.js
 	// we iterate through the map to build a log for REST and for any components that are using custom ports
 	let comps = {};
+	let restHostnames = [];
 	let restLog = `${pad('REST:')}`;
 	for (const [key, values] of portResolutions) {
 		for (const value of values) {
 			const name = value.name;
-			if (name === 'rest') {
-				restLog += `${value.protocol_name}: ${key}, `;
+			const pair = `${value.protocol_name}: ${key}, `;
+			if (!restLog.includes(pair) && name === 'rest') {
+				restLog += pair;
+				if (value.protocol_name === 'HTTP' || value.protocol_name === 'HTTPS') {
+					restHostnames.push(`${value.protocol_name.toLowerCase()}://${env.get(CONFIG_PARAMS.NODE_HOSTNAME)}:${key}/`);
+				}
 			}
 			if (components.includes(name)) {
 				if (comps[name]) {
-					comps[name] += `${value.protocol_name}: ${key}, `;
+					comps[name] += pair;
 				} else {
-					comps[name] = `${value.protocol_name}: ${key}, `;
+					comps[name] = pair;
 				}
 			}
 		}
 	}
@@ -333,6 +357,9 @@ function startupLog(portResolutions) {
 	if (restLog.length > padding + 1) {
 		restLog = restLog.slice(0, -2);
 		logMsg += `${restLog}\n`;
+		for (const restHostname of restHostnames) {
+			logMsg += pad('') + restHostname + '\n';
+		}
 	}
 
 	let appPortsLog = env.get(CONFIG_PARAMS.HTTP_PORT) ? `HTTP: ${env.get(CONFIG_PARAMS.HTTP_PORT)}, ` : '';
diff --git a/index.d.ts b/index.d.ts
index 5b67b93f7..eac2070b3 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -1,9 +1,16 @@
-export { ResourceV2 as Resource } from './resources/ResourceV2.ts';
-import { ResourceV2 as ResourceImport } from './resources/ResourceV2.ts';
-export type { Query, Context, SubscriptionRequest, RequestTargetOrId } from './resources/ResourceInterface.ts';
-export { ResourceInterfaceV2 as ResourceInterface } from './resources/ResourceInterfaceV2.ts';
+export { Resource } from './resources/Resource.ts';
+import { Resource as ResourceImport } from './resources/Resource.ts';
+export type {
+	Query,
+	Context,
+	SourceContext,
+	SubscriptionRequest,
+	RequestTargetOrId,
+} from './resources/ResourceInterface.ts';
+export { ResourceInterface } from './resources/ResourceInterface.ts';
 export type { User } from './security/user.ts';
 export type { RecordObject } from './resources/RecordEncoder.ts';
+export type { IterableEventQueue } from './resources/IterableEventQueue.ts';
 export { RequestTarget } from './resources/RequestTarget.ts';
 export { server } from './server/Server';
 import { server as serverImport } from './server/Server.ts';
diff --git a/index.js b/index.js
index df44adda2..0e50b714e 100644
--- a/index.js
+++ b/index.js
@@ -10,11 +10,13 @@ const { globals } = require('./server/threads/threadServer.js');
 exports.Config = undefined;
 exports.ConfigValue = undefined;
 exports.Context = undefined;
+exports.SourceContext = undefined;
 exports.FileAndURLPathConfig = undefined;
 exports.FilesOption = undefined;
 exports.FilesOptionObject = undefined;
 exports.Query = undefined;
 exports.RecordObject = undefined;
+exports.IterableEventQueue = undefined;
 exports.RequestTarget = undefined;
 exports.RequestTargetOrId = undefined;
 exports.Resource = undefined;
diff --git a/package.json b/package.json
index 67f189d7f..e4694294e 100644
--- a/package.json
+++ b/package.json
@@ -95,7 +95,7 @@
 	"exports": {
 		".": "./index.js",
 		"./v1": "./v1.js",
-		"./v2": "./index.js"
+		"./v2": "./v2.js"
 	},
 	"devDependencies": {
 		"@harperdb/code-guidelines": "^0.0.6",
@@ -172,7 +172,7 @@
 		"json2csv": "5.0.7",
 		"jsonata": "1.8.7",
 		"jsonwebtoken": "9.0.3",
-		"lmdb": "3.4.4",
+		"lmdb": "3.5.1",
 		"lodash": "4.17.21",
 		"mathjs": "11.12.0",
 		"micromatch": "^4.0.8",
diff --git a/resources/RequestTarget.ts b/resources/RequestTarget.ts
index d6d1c156c..729cd1cda 100644
--- a/resources/RequestTarget.ts
+++ b/resources/RequestTarget.ts
@@ -49,6 +49,7 @@ export class RequestTarget extends URLSearchParams {
 
 	declare previousResidency?: string[];
 	declare checkPermission?: UserRoleDatabasePermissions | boolean;
+	declare subscribe?: boolean;
 	declare allowFullScan?: boolean;
 	declare allowConditionsOnDynamicAttributes?: boolean;
 
diff --git a/resources/Resource.ts b/resources/Resource.ts
index 46a7ea258..02325bc4a 100644
--- a/resources/Resource.ts
+++ b/resources/Resource.ts
@@ -43,7 +43,7 @@ const EXTENSION_TYPES = {
  */
 export class Resource implements ResourceInterface {
 	readonly #id: Id;
-	readonly #context: Context;
+	readonly #context: Context | SourceContext;
 	#isCollection: boolean;
 	static transactions: Transaction[] & { timestamp: number };
 	static directURLMapping = false;
@@ -412,7 +412,7 @@ export class Resource implements ResourceInterface<
 		return new IterableEventQueue();
 	}
 
-	connect(target: RequestTarget, incomingMessages: IterableEventQueue): AsyncIterable {
+	connect(target: RequestTarget, incomingMessages: IterableEventQueue): AsyncIterable {
 		// convert subscription to an (async) iterator
 		const query = this.constructor.loadAsInstance === false ? target : incomingMessages;
 		if (query?.subscribe !== false) {
@@ -450,6 +450,14 @@ export class Resource implements ResourceInterface<
 		return this.#context;
 	}
 
+	/**
+	 * Get the current user for the current request, based on the context.
+	 * @returns user object or undefined if no user is logged in
+	 */
+	getCurrentUser(): User | undefined {
+		return (this.getContext() as Context)?.user;
+	}
+
 	get?(
 		target?: RequestTargetOrId
 	):
diff --git a/resources/ResourceInterface.ts b/resources/ResourceInterface.ts
index 539a22450..45f434360 100644
--- a/resources/ResourceInterface.ts
+++ b/resources/ResourceInterface.ts
@@ -157,7 +157,7 @@ export interface SubSelect {
 }
 export type Select = (string | SubSelect)[];
 
-export interface SubscriptionRequest {
+export interface SubscriptionRequest extends RequestTarget {
 	/** The starting time of events to return (defaults to now) */
 	startTime?: number;
 	/** The count of previously recorded events to return */
@@ -168,7 +168,7 @@ export interface SubscriptionRequest {
 	includeDescendants?: boolean;
 	supportsTransactions?: boolean;
 	rawEvents?: boolean;
-	listener: Listener;
+	listener?: Listener;
 }
 
 export type Query = RequestTarget; // for back-compat
diff --git a/resources/ResourceV2.ts b/resources/ResourceV2.ts
index 5d7512c5d..534fa63ee 100644
--- a/resources/ResourceV2.ts
+++ b/resources/ResourceV2.ts
@@ -1,5 +1,7 @@
 import { _assignPackageExport } from '../globals.js';
+import { IterableEventQueue } from './IterableEventQueue.js';
 import type { RecordObject } from './RecordEncoder.js';
+import { RequestTarget } from './RequestTarget.js';
 import { Resource } from './Resource.ts';
 import { RequestTargetOrId } from './ResourceInterface.ts';
 
@@ -57,6 +59,8 @@ export class ResourceV2 extends Resource {
 
 	delete?(target: RequestTargetOrId): boolean | Promise;
 	invalidate?(target: RequestTargetOrId): void | Promise;
+	// @ts-expect-error We swapped the order of target and record.
+	connect?(incomingMessages: IterableEventQueue, target: RequestTarget): AsyncIterable;
 	publish?(target: RequestTargetOrId, record: Record): void;
 }
 
diff --git a/resources/Table.ts b/resources/Table.ts
index 434d63ad9..a7fc87656 100644
--- a/resources/Table.ts
+++ b/resources/Table.ts
@@ -139,7 +139,7 @@ export function makeTable(options) {
 		splitSegments,
 		replicate,
 	} = options;
-	let { expirationMS: expirationMs, evictionMS: evictionMs, audit, trackDeletes: trackDeletes } = options;
+	let { expirationMS: expirationMs, evictionMS: evictionMs, audit, trackDeletes } = options;
 	evictionMs ??= 0;
 	let { attributes } = options;
 	if (!attributes) attributes = [];
@@ -1866,7 +1866,7 @@ export function makeTable(options) {
 				TableResource._updateResource(this, existingEntry);
 			}
 			if (precedesExistingVersion(txnTime, existingEntry, options?.nodeId) <= 0) return; // a newer record exists locally
-			updateIndices(this.getId(), existingRecord);
+			updateIndices(id, existingRecord);
 			logger.trace?.(`Deleting record with id: ${id}, txn timestamp: ${new Date(txnTime).toISOString()}`);
 			if (audit || trackDeletes) {
 				updateRecord(
@@ -4219,9 +4219,9 @@ export function makeTable(options) {
 				? async () => {
 						// if we are calling the sources first and waiting for blobs, do those in order
 						await callSources();
-						await blobCompletion;
+						await blobCompletion();
 					}
-				: () => blobCompletion;
+				: () => blobCompletion();
 		}
 		return before;
 	}
diff --git a/resources/analytics/write.ts b/resources/analytics/write.ts
index 3b4e5f2f4..1eb5ff118 100644
--- a/resources/analytics/write.ts
+++ b/resources/analytics/write.ts
@@ -236,16 +236,20 @@ export interface Metric {
 	[key: string]: any;
 }
 
-function storeMetric(table: Table, metric: Metric) {
-	const hostname = server.hostname;
+function getHostNodeId(hostname: string) {
 	let nodeId = nodeIds.get(hostname);
 	if (nodeId) {
 		log.trace?.('storeMetric cached nodeId:', nodeId);
-	} else {
-		nodeId = stableNodeId(hostname);
-		log.trace?.('storeMetric new nodeId:', nodeId);
-		nodeIds.set(hostname, nodeId);
+		return nodeId;
 	}
+	nodeId = stableNodeId(hostname);
+	log.trace?.('storeMetric new nodeId:', nodeId);
+	nodeIds.set(hostname, nodeId);
+	return nodeId;
+}
+
+function storeMetric(table: Table, metric: Metric) {
+	const nodeId = getHostNodeId(server.hostname);
 	const metricValue = {
 		id: [getNextMonotonicTime(), nodeId],
 		...metric,
@@ -382,13 +386,14 @@ async function aggregation(fromPeriod, toPeriod = 60000) {
 		});
 	});
 	let lastForPeriod;
-	// find the last entry for this period
+	const localNodeId = getHostNodeId(server.hostname);
+	// find the last entry for this period for the local node only
 	for (const entry of analyticsTable.primaryStore.getRange({
 		start: Infinity,
 		end: false,
 		reverse: true,
 	})) {
-		if (!entry.value?.time) continue;
+		if (!entry.value?.time || entry.value?.id[1] !== localNodeId) continue;
 		lastForPeriod = entry.value.time;
 		break;
 	}
diff --git a/resources/blob.ts b/resources/blob.ts
index 0538ba157..fd20a3981 100644
--- a/resources/blob.ts
+++ b/resources/blob.ts
@@ -974,17 +974,25 @@ export function findBlobsInObject(object: any, callback: (blob: Blob) => void) {
  * @param record
  * @param store
  */
-export function startPreCommitBlobsForRecord(record: any, store: LMDBStore) {
-	let completion;
+export function startPreCommitBlobsForRecord(record: any, store: LMDBStore): (() => Promise) | void {
+	let blobsNeedingSaving = [];
 	for (const key in record) {
 		const value = record[key];
 		if (value instanceof FileBackedBlob && value.saveBeforeCommit) {
-			currentStore = store;
-			const saving = saveBlob(value, true).saving ?? Promise.resolve();
-			completion = completion ? Promise.all(completion, saving) : saving;
+			blobsNeedingSaving.push(value);
 		}
 	}
-	return completion;
+	if (blobsNeedingSaving.length > 0) {
+		// we do have blobs, start saving once the returned function is called
+		return () => {
+			currentStore = store;
+			return Promise.all(
+				blobsNeedingSaving.map((blob) => {
+					return saveBlob(blob, true).saving ?? Promise.resolve();
+				})
+			);
+		};
+	}
 }
 
 const copyingUnpacker = new Packr({ copyBuffers: true, mapsAsObjects: true });
diff --git a/resources/transactionBroadcast.ts b/resources/transactionBroadcast.ts
index 1c0fd4380..af96c4cd6 100644
--- a/resources/transactionBroadcast.ts
+++ b/resources/transactionBroadcast.ts
@@ -206,11 +206,11 @@ export function listenToCommits(primaryStore, auditStore) {
 				notifyFromTransactionData(subscriptions);
 			} finally {
 				store.threadLocalWrites[0] = subscriptions.lastTxnTime; // update shared buffer
-				store.unlock('thread-local-writes'); // and release the lock
+				store.unlock('thread-local-writes', 0); // and release the lock
 			}
 		};
 		// try to get lock or wait for it
-		if (!store.attemptLock('thread-local-writes', acquiredLock)) return;
+		if (!store.attemptLock('thread-local-writes', 0, acquiredLock)) return;
 		acquiredLock();
 	});
 }
diff --git a/unitTests/resources/blob.test.js b/unitTests/resources/blob.test.js
index 7d818989b..d9b0a59f6 100644
--- a/unitTests/resources/blob.test.js
+++ b/unitTests/resources/blob.test.js
@@ -137,16 +137,19 @@ describe('Blob test', () => {
 		await delay(20); // wait for the file to be deleted
 		assert(!existsSync(filePath));
 	});
-	it('create a blob from a buffer and call save() but then abort', async () => {
+	it('create a blob from a buffer and call save() but then fail validation', async () => {
 		let blob;
-		try {
-			await transaction({}, async () => {
-				let random = randomBytes(5000 * Math.random() + 20000);
-				blob = createBlob(random);
-				blob.save(BlobTest);
-				throw new Error('test error'); // abort the transaction
-			});
-		} catch (error) {}
+		class BlobTestFailsValidation extends BlobTest {
+			validate() {
+				throw new Error('test error'); // simulate when too much queue errors are thrown
+			}
+		}
+		assert.throws(() => {
+			let random = randomBytes(5000 * Math.random() + 20000);
+			blob = createBlob(random);
+			blob.save();
+			BlobTestFailsValidation.put({ id: 1, blob });
+		});
 		assert(blob);
 		assert(!isSaving(blob)); // ensure that it is not saving or saved
 	});
diff --git a/unitTests/resources/crud.test.js b/unitTests/resources/crud.test.js
index 4ceb62aa8..16b4e2a75 100644
--- a/unitTests/resources/crud.test.js
+++ b/unitTests/resources/crud.test.js
@@ -145,7 +145,7 @@ describe('CRUD operations with the Resource API', () => {
 			if (analyticRecorded) break;
 		}
 		assert(analyticRecorded, 'db-write was recorded in analytics');
-		assert(analyticRecorded.mean > 20, 'db-write bytes count were recorded in analytics');
+		assert(analyticRecorded.mean > 2, 'db-write bytes count were recorded in analytics');
 	});
 	it('get is recorded in analytics', async function () {
 		const start = Date.now();
@@ -257,6 +257,31 @@ describe('CRUD operations with the Resource API', () => {
 				else created = await CRUDTable.create('three', { relatedId: 1, name: 'Three' });
 			});
 		});
+		it('delete all and recreate', async function () {
+			await CRUDTable.put({
+				id: 'one',
+				name: 'One',
+				relatedId: 1,
+				sparse: null,
+			});
+			await CRUDTable.put({
+				id: 'two',
+				name: 'Two',
+				relatedId: 1,
+				sparse: null,
+			});
+			let target = new RequestTarget('/');
+			await CRUDTable.delete(target);
+			await CRUDTable.put({
+				id: 'one',
+				name: 'One',
+				relatedId: 2,
+				sparse: null,
+			});
+			for await (let entry of CRUDTable.search([{ attribute: 'relatedId', value: 1 }])) {
+				throw new Error('should not have found any related records with relatedId = 1');
+			}
+		});
 	}
 	after(() => {
 		analytics.setAnalyticsEnabled(false); // restore to normal unit test behavior
diff --git a/v1.d.ts b/v1.d.ts
index 7aab619a6..eac2070b3 100644
--- a/v1.d.ts
+++ b/v1.d.ts
@@ -1,9 +1,16 @@
 export { Resource } from './resources/Resource.ts';
 import { Resource as ResourceImport } from './resources/Resource.ts';
-export type { Query, Context, SubscriptionRequest, RequestTargetOrId } from './resources/ResourceInterface.ts';
+export type {
+	Query,
+	Context,
+	SourceContext,
+	SubscriptionRequest,
+	RequestTargetOrId,
+} from './resources/ResourceInterface.ts';
 export { ResourceInterface } from './resources/ResourceInterface.ts';
 export type { User } from './security/user.ts';
 export type { RecordObject } from './resources/RecordEncoder.ts';
+export type { IterableEventQueue } from './resources/IterableEventQueue.ts';
 export { RequestTarget } from './resources/RequestTarget.ts';
 export { server } from './server/Server';
 import { server as serverImport } from './server/Server.ts';
diff --git a/v1.js b/v1.js
index df44adda2..0e50b714e 100644
--- a/v1.js
+++ b/v1.js
@@ -10,11 +10,13 @@ const { globals } = require('./server/threads/threadServer.js');
 exports.Config = undefined;
 exports.ConfigValue = undefined;
 exports.Context = undefined;
+exports.SourceContext = undefined;
 exports.FileAndURLPathConfig = undefined;
 exports.FilesOption = undefined;
 exports.FilesOptionObject = undefined;
 exports.Query = undefined;
 exports.RecordObject = undefined;
+exports.IterableEventQueue = undefined;
 exports.RequestTarget = undefined;
 exports.RequestTargetOrId = undefined;
 exports.Resource = undefined;
diff --git a/v2.d.ts b/v2.d.ts
index bd0d3816c..e1741cacf 100644
--- a/v2.d.ts
+++ b/v2.d.ts
@@ -1 +1,46 @@
-export * from './index.d.ts';
+export { ResourceV2 as Resource } from './resources/ResourceV2.ts';
+import { ResourceV2 as ResourceImport } from './resources/ResourceV2.ts';
+export type {
+	Query,
+	Context,
+	SourceContext,
+	SubscriptionRequest,
+	RequestTargetOrId,
+} from './resources/ResourceInterface.ts';
+export { ResourceInterfaceV2 as ResourceInterface } from './resources/ResourceInterfaceV2.ts';
+export type { User } from './security/user.ts';
+export type { RecordObject } from './resources/RecordEncoder.ts';
+export type { IterableEventQueue } from './resources/IterableEventQueue.ts';
+export { RequestTarget } from './resources/RequestTarget.ts';
+export { server } from './server/Server';
+import { server as serverImport } from './server/Server.ts';
+export { tables, databases } from './resources/databases.ts';
+import { tables as dbTables, databases as dbDatabases } from './resources/databases.ts';
+import { BlobCreationOptions } from './resources/blob.ts';
+export { Scope } from './components/Scope.ts';
+export type { FilesOption, FilesOptionObject } from './components/deriveGlobOptions.ts';
+export type { FileAndURLPathConfig } from './components/Component.ts';
+export { OptionsWatcher, type Config, type ConfigValue } from './components/OptionsWatcher.ts';
+export {
+	EntryHandler,
+	type BaseEntry,
+	type FileEntry,
+	type EntryEvent,
+	type AddFileEvent,
+	type ChangeFileEvent,
+	type UnlinkFileEvent,
+	type FileEntryEvent,
+	type AddDirectoryEvent,
+	type UnlinkDirectoryEvent,
+	type DirectoryEntryEvent,
+} from './components/EntryHandler.ts';
+declare global {
+	const tables: typeof dbTables;
+	const databases: typeof dbDatabases;
+	const server: typeof serverImport;
+	const Resource: typeof ResourceImport;
+	const createBlob: (
+		source: Uint8Array | NodeJS.ReadableStream | string | Iterable | AsyncIterator,
+		options?: BlobCreationOptions
+	) => Blob;
+}
diff --git a/v2.js b/v2.js
new file mode 100644
index 000000000..0e50b714e
--- /dev/null
+++ b/v2.js
@@ -0,0 +1,37 @@
+const workerThreads = require('node:worker_threads');
+if (!workerThreads.isMainThread) {
+	// Prevents server from starting in worker threads if this was directly imported from a non-server user thread
+	if (!workerThreads.workerData) workerThreads.workerData = {};
+	workerThreads.workerData.noServerStart = true;
+}
+const { globals } = require('./server/threads/threadServer.js');
+
+// exported types are needed for parsing as well
+exports.Config = undefined;
+exports.ConfigValue = undefined;
+exports.Context = undefined;
+exports.SourceContext = undefined;
+exports.FileAndURLPathConfig = undefined;
+exports.FilesOption = undefined;
+exports.FilesOptionObject = undefined;
+exports.Query = undefined;
+exports.RecordObject = undefined;
+exports.IterableEventQueue = undefined;
+exports.RequestTarget = undefined;
+exports.RequestTargetOrId = undefined;
+exports.Resource = undefined;
+exports.ResourceInterface = undefined;
+exports.Scope = undefined;
+exports.SubscriptionRequest = undefined;
+exports.User = undefined;
+
+// these are all overwritten by the globals, but need to be here so that Node's static
+// exports parser can analyze them
+exports.tables = {};
+exports.databases = {};
+exports.getUser = undefined;
+exports.server = {};
+exports.contentTypes = null;
+exports.threads = [];
+exports.logger = {};
+Object.assign(exports, globals);