Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ These are the steps @Ethan-Arrowood has been following to synchronize the reposi

### Last Synchronized Commit

`e1ea920d74e919140ae89d5ca4d75614c10c2925`
`916492e0bd147b79fd11d3b7bc41902999d64c2b`

## Code of Conduct

Expand Down
35 changes: 31 additions & 4 deletions bin/run.js
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,25 @@ function startupLog(portResolutions) {
: 'disabled'
}`;
logMsg += `, unix socket: ${env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_DOMAINSOCKET)}\n`;
if (env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_PORT)) {
logMsg +=
pad('') +
'http://' +
env.get(CONFIG_PARAMS.NODE_HOSTNAME) +
':' +
env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_PORT) +
'/\n';
}
if (env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_SECUREPORT)) {
logMsg +=
'\n' +
pad('') +
'https://' +
env.get(CONFIG_PARAMS.NODE_HOSTNAME) +
':' +
env.get(CONFIG_PARAMS.OPERATIONSAPI_NETWORK_SECUREPORT) +
'/\n';
}

// MQTT Log
logMsg += pad('MQTT:');
Expand Down Expand Up @@ -311,19 +330,24 @@ function startupLog(portResolutions) {
// portResolutions is a Map of port to protocol name and component name built in threadServer.js
// we iterate through the map to build a log for REST and for any components that are using custom ports
let comps = {};
let restHostnames = [];
let restLog = `${pad('REST:')}`;
for (const [key, values] of portResolutions) {
for (const value of values) {
const name = value.name;
if (name === 'rest') {
restLog += `${value.protocol_name}: ${key}, `;
const pair = `${value.protocol_name}: ${key}, `;
if (!restLog.includes(pair) && name === 'rest') {
restLog += pair;
if (value.protocol_name === 'HTTP' || value.protocol_name === 'HTTPS') {
restHostnames.push(`${value.protocol_name.toLowerCase()}://${env.get(CONFIG_PARAMS.NODE_HOSTNAME)}:${key}/`);
}
}

if (components.includes(name)) {
if (comps[name]) {
comps[name] += `${value.protocol_name}: ${key}, `;
comps[name] += pair;
} else {
comps[name] = `${value.protocol_name}: ${key}, `;
comps[name] = pair;
}
}
}
Expand All @@ -333,6 +357,9 @@ function startupLog(portResolutions) {
if (restLog.length > padding + 1) {
restLog = restLog.slice(0, -2);
logMsg += `${restLog}\n`;
for (const restHostname of restHostnames) {
logMsg += pad('') + restHostname + '\n';
}
}

let appPortsLog = env.get(CONFIG_PARAMS.HTTP_PORT) ? `HTTP: ${env.get(CONFIG_PARAMS.HTTP_PORT)}, ` : '';
Expand Down
15 changes: 11 additions & 4 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
export { ResourceV2 as Resource } from './resources/ResourceV2.ts';
import { ResourceV2 as ResourceImport } from './resources/ResourceV2.ts';
export type { Query, Context, SubscriptionRequest, RequestTargetOrId } from './resources/ResourceInterface.ts';
export { ResourceInterfaceV2 as ResourceInterface } from './resources/ResourceInterfaceV2.ts';
export { Resource } from './resources/Resource.ts';
import { Resource as ResourceImport } from './resources/Resource.ts';
export type {
Query,
Context,
SourceContext,
SubscriptionRequest,
RequestTargetOrId,
} from './resources/ResourceInterface.ts';
export { ResourceInterface } from './resources/ResourceInterface.ts';
export type { User } from './security/user.ts';
export type { RecordObject } from './resources/RecordEncoder.ts';
export type { IterableEventQueue } from './resources/IterableEventQueue.ts';
export { RequestTarget } from './resources/RequestTarget.ts';
export { server } from './server/Server';
import { server as serverImport } from './server/Server.ts';
Expand Down
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ const { globals } = require('./server/threads/threadServer.js');
exports.Config = undefined;
exports.ConfigValue = undefined;
exports.Context = undefined;
exports.SourceContext = undefined;
exports.FileAndURLPathConfig = undefined;
exports.FilesOption = undefined;
exports.FilesOptionObject = undefined;
exports.Query = undefined;
exports.RecordObject = undefined;
exports.IterableEventQueue = undefined;
exports.RequestTarget = undefined;
exports.RequestTargetOrId = undefined;
exports.Resource = undefined;
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
"exports": {
".": "./index.js",
"./v1": "./v1.js",
"./v2": "./index.js"
"./v2": "./v2.js"
},
"devDependencies": {
"@harperdb/code-guidelines": "^0.0.6",
Expand Down Expand Up @@ -172,7 +172,7 @@
"json2csv": "5.0.7",
"jsonata": "1.8.7",
"jsonwebtoken": "9.0.3",
"lmdb": "3.4.4",
"lmdb": "3.5.1",
"lodash": "4.17.21",
"mathjs": "11.12.0",
"micromatch": "^4.0.8",
Expand Down
1 change: 1 addition & 0 deletions resources/RequestTarget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export class RequestTarget extends URLSearchParams {
declare previousResidency?: string[];

declare checkPermission?: UserRoleDatabasePermissions | boolean;
declare subscribe?: boolean;

declare allowFullScan?: boolean;
declare allowConditionsOnDynamicAttributes?: boolean;
Expand Down
12 changes: 10 additions & 2 deletions resources/Resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const EXTENSION_TYPES = {
*/
export class Resource<Record extends object = any> implements ResourceInterface<Record> {
readonly #id: Id;
readonly #context: Context;
readonly #context: Context | SourceContext;
#isCollection: boolean;
static transactions: Transaction[] & { timestamp: number };
static directURLMapping = false;
Expand Down Expand Up @@ -412,7 +412,7 @@ export class Resource<Record extends object = any> implements ResourceInterface<
return new IterableEventQueue();
}

connect(target: RequestTarget, incomingMessages: IterableEventQueue): AsyncIterable<any> {
connect(target: RequestTarget, incomingMessages: IterableEventQueue<Record>): AsyncIterable<Record> {
// convert subscription to an (async) iterator
const query = this.constructor.loadAsInstance === false ? target : incomingMessages;
if (query?.subscribe !== false) {
Expand Down Expand Up @@ -450,6 +450,14 @@ export class Resource<Record extends object = any> implements ResourceInterface<
return this.#context;
}

/**
* Get the current user for the current request, based on the context.
* @returns user object or undefined if no user is logged in
*/
getCurrentUser(): User | undefined {
return (this.getContext() as Context)?.user;
}

get?(
target?: RequestTargetOrId
):
Expand Down
4 changes: 2 additions & 2 deletions resources/ResourceInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export interface SubSelect {
}
export type Select = (string | SubSelect)[];

export interface SubscriptionRequest {
export interface SubscriptionRequest extends RequestTarget {
/** The starting time of events to return (defaults to now) */
startTime?: number;
/** The count of previously recorded events to return */
Expand All @@ -168,7 +168,7 @@ export interface SubscriptionRequest {
includeDescendants?: boolean;
supportsTransactions?: boolean;
rawEvents?: boolean;
listener: Listener;
listener?: Listener;
}

export type Query = RequestTarget; // for back-compat
Expand Down
4 changes: 4 additions & 0 deletions resources/ResourceV2.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { _assignPackageExport } from '../globals.js';
import { IterableEventQueue } from './IterableEventQueue.js';
import type { RecordObject } from './RecordEncoder.js';
import { RequestTarget } from './RequestTarget.js';
import { Resource } from './Resource.ts';
import { RequestTargetOrId } from './ResourceInterface.ts';

Expand Down Expand Up @@ -57,6 +59,8 @@ export class ResourceV2<Record extends object = any> extends Resource<Record> {
delete?(target: RequestTargetOrId): boolean | Promise<boolean>;
invalidate?(target: RequestTargetOrId): void | Promise<void>;

// @ts-expect-error We swapped the order of target and record.
connect?(incomingMessages: IterableEventQueue<Record>, target: RequestTarget): AsyncIterable<Record>;
publish?(target: RequestTargetOrId, record: Record): void;
}

Expand Down
8 changes: 4 additions & 4 deletions resources/Table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export function makeTable(options) {
splitSegments,
replicate,
} = options;
let { expirationMS: expirationMs, evictionMS: evictionMs, audit, trackDeletes: trackDeletes } = options;
let { expirationMS: expirationMs, evictionMS: evictionMs, audit, trackDeletes } = options;
evictionMs ??= 0;
let { attributes } = options;
if (!attributes) attributes = [];
Expand Down Expand Up @@ -1866,7 +1866,7 @@ export function makeTable(options) {
TableResource._updateResource(this, existingEntry);
}
if (precedesExistingVersion(txnTime, existingEntry, options?.nodeId) <= 0) return; // a newer record exists locally
updateIndices(this.getId(), existingRecord);
updateIndices(id, existingRecord);
logger.trace?.(`Deleting record with id: ${id}, txn timestamp: ${new Date(txnTime).toISOString()}`);
if (audit || trackDeletes) {
updateRecord(
Expand Down Expand Up @@ -4219,9 +4219,9 @@ export function makeTable(options) {
? async () => {
// if we are calling the sources first and waiting for blobs, do those in order
await callSources();
await blobCompletion;
await blobCompletion();
}
: () => blobCompletion;
: () => blobCompletion();
}
return before;
}
Expand Down
21 changes: 13 additions & 8 deletions resources/analytics/write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,20 @@ export interface Metric {
[key: string]: any;
}

function storeMetric(table: Table, metric: Metric) {
const hostname = server.hostname;
function getHostNodeId(hostname: string) {
let nodeId = nodeIds.get(hostname);
if (nodeId) {
log.trace?.('storeMetric cached nodeId:', nodeId);
} else {
nodeId = stableNodeId(hostname);
log.trace?.('storeMetric new nodeId:', nodeId);
nodeIds.set(hostname, nodeId);
return nodeId;
}
nodeId = stableNodeId(hostname);
log.trace?.('storeMetric new nodeId:', nodeId);
nodeIds.set(hostname, nodeId);
return nodeId;
}

function storeMetric(table: Table, metric: Metric) {
const nodeId = getHostNodeId(server.hostname);
const metricValue = {
id: [getNextMonotonicTime(), nodeId],
...metric,
Expand Down Expand Up @@ -382,13 +386,14 @@ async function aggregation(fromPeriod, toPeriod = 60000) {
});
});
let lastForPeriod;
// find the last entry for this period
const localNodeId = getHostNodeId(server.hostname);
// find the last entry for this period for the local node only
for (const entry of analyticsTable.primaryStore.getRange({
start: Infinity,
end: false,
reverse: true,
})) {
if (!entry.value?.time) continue;
if (!entry.value?.time || entry.value?.id[1] !== localNodeId) continue;
lastForPeriod = entry.value.time;
break;
}
Expand Down
20 changes: 14 additions & 6 deletions resources/blob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -974,17 +974,25 @@ export function findBlobsInObject(object: any, callback: (blob: Blob) => void) {
* @param record
* @param store
*/
export function startPreCommitBlobsForRecord(record: any, store: LMDBStore) {
let completion;
export function startPreCommitBlobsForRecord(record: any, store: LMDBStore): (() => Promise<void[]>) | void {
let blobsNeedingSaving = [];
for (const key in record) {
const value = record[key];
if (value instanceof FileBackedBlob && value.saveBeforeCommit) {
currentStore = store;
const saving = saveBlob(value, true).saving ?? Promise.resolve();
completion = completion ? Promise.all(completion, saving) : saving;
blobsNeedingSaving.push(value);
}
}
return completion;
if (blobsNeedingSaving.length > 0) {
// we do have blobs, start saving once the returned function is called
return () => {
currentStore = store;
return Promise.all(
blobsNeedingSaving.map((blob) => {
return saveBlob(blob, true).saving ?? Promise.resolve();
})
);
};
}
}

const copyingUnpacker = new Packr({ copyBuffers: true, mapsAsObjects: true });
Expand Down
4 changes: 2 additions & 2 deletions resources/transactionBroadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ export function listenToCommits(primaryStore, auditStore) {
notifyFromTransactionData(subscriptions);
} finally {
store.threadLocalWrites[0] = subscriptions.lastTxnTime; // update shared buffer
store.unlock('thread-local-writes'); // and release the lock
store.unlock('thread-local-writes', 0); // and release the lock
}
};
// try to get lock or wait for it
if (!store.attemptLock('thread-local-writes', acquiredLock)) return;
if (!store.attemptLock('thread-local-writes', 0, acquiredLock)) return;
acquiredLock();
});
}
Expand Down
21 changes: 12 additions & 9 deletions unitTests/resources/blob.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,19 @@ describe('Blob test', () => {
await delay(20); // wait for the file to be deleted
assert(!existsSync(filePath));
});
it('create a blob from a buffer and call save() but then abort', async () => {
it('create a blob from a buffer and call save() but then fail validation', async () => {
let blob;
try {
await transaction({}, async () => {
let random = randomBytes(5000 * Math.random() + 20000);
blob = createBlob(random);
blob.save(BlobTest);
throw new Error('test error'); // abort the transaction
});
} catch (error) {}
class BlobTestFailsValidation extends BlobTest {
validate() {
throw new Error('test error'); // simulate when too much queue errors are thrown
}
}
assert.throws(() => {
let random = randomBytes(5000 * Math.random() + 20000);
blob = createBlob(random);
blob.save();
BlobTestFailsValidation.put({ id: 1, blob });
});
assert(blob);
assert(!isSaving(blob)); // ensure that it is not saving or saved
});
Expand Down
27 changes: 26 additions & 1 deletion unitTests/resources/crud.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ describe('CRUD operations with the Resource API', () => {
if (analyticRecorded) break;
}
assert(analyticRecorded, 'db-write was recorded in analytics');
assert(analyticRecorded.mean > 20, 'db-write bytes count were recorded in analytics');
assert(analyticRecorded.mean > 2, 'db-write bytes count were recorded in analytics');
});
it('get is recorded in analytics', async function () {
const start = Date.now();
Expand Down Expand Up @@ -257,6 +257,31 @@ describe('CRUD operations with the Resource API', () => {
else created = await CRUDTable.create('three', { relatedId: 1, name: 'Three' });
});
});
it('delete all and recreate', async function () {
await CRUDTable.put({
id: 'one',
name: 'One',
relatedId: 1,
sparse: null,
});
await CRUDTable.put({
id: 'two',
name: 'Two',
relatedId: 1,
sparse: null,
});
let target = new RequestTarget('/');
await CRUDTable.delete(target);
await CRUDTable.put({
id: 'one',
name: 'One',
relatedId: 2,
sparse: null,
});
for await (let entry of CRUDTable.search([{ attribute: 'relatedId', value: 1 }])) {
throw new Error('should not have found any related records with relatedId = 1');
}
});
}
after(() => {
analytics.setAnalyticsEnabled(false); // restore to normal unit test behavior
Expand Down
Loading
Loading