Skip to content

Commit a828f27

Browse files
Merge pull request #66 from tungv/enhance-jerni-dev
Enhance jerni dev
2 parents 1ae0016 + 41c150b commit a828f27

File tree

18 files changed

+76
-57
lines changed

18 files changed

+76
-57
lines changed

packages/integrations/tests/CLI/cli-integration.spec.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ afterAll(cleanUpTestDatabase);
1111

1212
it("CLI call should project events correctly", async () => {
1313
const dbName = `jerni_integration_test_${nanoid()}`;
14-
const eventDbName = `jerni_integration_test_events_${nanoid()}`;
1514

1615
const { server } = createServer();
1716
const port = server.port;
@@ -23,8 +22,6 @@ it("CLI call should project events correctly", async () => {
2322
const process = exec(
2423
`MONGODB_DBNAME=${dbName} \
2524
MONGODB_URL=mongodb://127.0.0.1:27017 \
26-
EVENTS_DB_MONGODB_URL=mongodb://127.0.0.1:27017 \
27-
EVENTS_DB_MONGODB_NAME=${eventDbName} \
2825
EVENTS_SERVER=http://localhost:${port}/ \
2926
PORT=${healthCheckPort} \
3027
bun run ${jerniCliPath} \

packages/integrations/tests/CLI/jerni-dev.spec.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ describe("Jerni Dev Integration", () => {
1818
const port = Math.floor(Math.random() * 10000) + 10000;
1919
const dbFileName = `./test-events-db-${nanoid()}.yml`;
2020
const mongodbName = `jerni_integration_test_${nanoid()}`;
21-
const eventDbName = `jerni_integration_test_events_${nanoid()}`;
2221

2322
const devCliPath = path.resolve(__dirname, "../../../jerni/src/dev-cli/index.ts");
2423
const initFileName = path.resolve(__dirname, "./makeTestJourneyCli.ts");
@@ -27,8 +26,6 @@ describe("Jerni Dev Integration", () => {
2726
const childProcess = exec(
2827
`PORT=${port}\
2928
MONGODB_DBNAME=${mongodbName}\
30-
EVENTS_DB_MONGODB_URL=mongodb://127.0.0.1:27017 \
31-
EVENTS_DB_MONGODB_NAME=${eventDbName} \
3229
bun run ${devCliPath} ${initFileName} ${dbFilePath}`,
3330
(error, stdout, stderr) => {
3431
// console.log(`stdout: ${stdout}`);
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
import begin from "@jerni/jerni-3/lib/begin";
22
import type { JourneyInstance } from "@jerni/jerni-3/types";
3-
import { nanoid } from "nanoid";
43

54
export default async function startWorker(journey: JourneyInstance, signal: AbortSignal) {
6-
const eventDbName = `jerni_integration_test_events_${nanoid()}`;
7-
8-
process.env.EVENTS_DB_MONGODB_URL = "mongodb://127.0.0.1:27017";
9-
process.env.EVENTS_DB_MONGODB_NAME = eventDbName;
10-
115
for await (const events of begin(journey, signal)) {
126
// console.log("events", events);
7+
8+
return events;
139
}
1410
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{
22
"name": "@jerni/store-mongodb",
3-
"version": "0.4.4",
3+
"version": "0.4.5",
44
"exports": {
55
".": "./src/index.ts",
66
"./types": "./src/types.ts"
77
}
8-
}
8+
}

packages/jerni-store-mongodb/src/optimistic/getBulkOperations.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { AnyBulkWriteOperation, Document, UpdateFilter, UpdateOneModel } from "mongodb";
1+
import type { AnyBulkWriteOperation, Document, UpdateOneModel } from "mongodb";
22
import type {
33
DeleteManyOp,
44
DeleteOneOp,
@@ -11,16 +11,9 @@ import type {
1111

1212
interface ChangeWithOp<Change> {
1313
change: Change;
14-
__op: number;
1514
__v: number;
1615
}
1716

18-
type OptimisticDocument = {
19-
__v: number;
20-
__op: number;
21-
[key: string]: unknown;
22-
};
23-
2417
interface Newer {
2518
$or: [{ __v: { $gt: number } }, { __v: number; __op: { $gte: number } }];
2619
}

packages/jerni-store-mongodb/src/store.ts

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { setTimeout } from "node:timers/promises";
2-
import { type Collection, type Db, type Document, MongoClient } from "mongodb";
2+
import { type AnyBulkWriteOperation, type Collection, type Db, type Document, MongoClient } from "mongodb";
33
import makeTestLogger from "../tests/helpers/makeTestLogger";
44
import getCollectionName from "./getCollectionName";
55
import type MongoDBModel from "./model";
66
import getBulkOperations from "./optimistic/getBulkOperations";
77
import { Signal, clearModelSlots, runWithModel } from "./read";
8-
import type { Changes, JourneyCommittedEvent, MongoDBStore, MongoDBStoreConfig, MongoOps } from "./types";
8+
import type { Changes, JourneyCommittedEvent, MongoDBStore, MongoDBStoreConfig } from "./types";
99

1010
interface SnapshotDocument {
1111
__v: number;
@@ -294,19 +294,12 @@ export default async function makeMongoDBStore(config: MongoDBStoreConfig): Prom
294294

295295
// output is an array of changes per event.
296296
// however for efficiency, we need to handle event per model
297-
type VersionedChange = {
298-
change: MongoOps<Document>;
299-
__op: number;
300-
__v: number;
301-
};
302-
303-
const changesPerModel: VersionedChange[][] = [];
297+
// so we need to group changes per model
298+
const bulkWritesPerModel: AnyBulkWriteOperation<Document>[][] = [];
304299

305300
for (const allChangesForAnEvent of outputs) {
306301
let modelIndex = 0;
307302
for (const changesForAModel of allChangesForAnEvent) {
308-
let __op = 0;
309-
310303
if (changesForAModel === undefined || changesForAModel.length === 0) {
311304
modelIndex++;
312305
continue;
@@ -315,37 +308,37 @@ export default async function makeMongoDBStore(config: MongoDBStoreConfig): Prom
315308
const changesWithOp = changesForAModel.map((change) => {
316309
return {
317310
change,
318-
__op: __op++,
319311
__v: events[eventIndex].id,
320312
};
321313
});
322314

323-
// add changes to changesPerModel
324-
if (!changesPerModel[modelIndex]) {
325-
changesPerModel[modelIndex] = [];
315+
const operations = getBulkOperations(changesWithOp);
316+
317+
// add operations to the bulkWritesPerModel
318+
if (!bulkWritesPerModel[modelIndex]) {
319+
bulkWritesPerModel[modelIndex] = [];
326320
}
327-
changesPerModel[modelIndex].push(...changesWithOp);
321+
bulkWritesPerModel[modelIndex].push(...operations);
328322
modelIndex++;
329323
}
330324
eventIndex++;
331325
}
332326

333327
await Promise.all(
334-
changesPerModel.map(async (changesForAModel, modelIndex) => {
328+
bulkWritesPerModel.map(async (bulkWritesForAModel, modelIndex) => {
335329
const model = models[modelIndex];
336330
const outputForThisModel = changes[modelIndex];
337-
if (changesForAModel === undefined || changesForAModel.length === 0) {
331+
if (bulkWritesForAModel === undefined || bulkWritesForAModel.length === 0) {
338332
return;
339333
}
340334

341335
const collection = db.collection(getCollectionName(model));
342-
const bulkWriteOperations = getBulkOperations(changesForAModel);
343336

344337
if (abortSignal.aborted) {
345338
return;
346339
}
347340

348-
const res = await collection.bulkWrite(bulkWriteOperations, { ordered: true, writeConcern: { w: "majority" } });
341+
const res = await collection.bulkWrite(bulkWritesForAModel, { ordered: true, writeConcern: { w: "majority" } });
349342

350343
skipByModel[modelIndex] = Math.max(skipByModel[modelIndex], events[events.length - 1].id);
351344

packages/jerni-store-mongodb/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export interface MongoDBStore {
6363
>,
6464
) => void;
6565

66-
getDriver<T extends Document>(model: MongoDBModel<T>): Promise<AsyncDisposable>;
66+
getDriver<T extends Document>(model: MongoDBModel<T>): Promise<Collection<T> & AsyncDisposable>;
6767
handleEvents: (
6868
events: JourneyCommittedEvent[],
6969
signal?: AbortSignal,

packages/jerni/global.d.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
declare namespace NodeJS {
2+
interface ProcessEnv {
3+
readonly NODE_ENV: string;
4+
PORT: string;
5+
EVENTS_SERVER: string;
6+
JERNI_CLI_SQLITE_PATH: string;
7+
}
8+
}

packages/jerni/src/begin.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,14 @@ export default async function* begin(journey: JourneyInstance, signal: AbortSign
174174
try {
175175
// handle events must stop after 10 seconds
176176
const start = Date.now();
177-
const { output, lastId } = await handleEventBatch(config.stores, config.onError, events, logger, timeout);
177+
const { output, lastId } = await handleEventBatch(
178+
config.stores,
179+
config.onError,
180+
config.onReport,
181+
events,
182+
logger,
183+
timeout,
184+
);
178185
latestHandled = lastId;
179186
const total = Date.now() - start;
180187
const budgetPercentage = (total / timeBudget) * 100;
@@ -207,8 +214,9 @@ export default async function* begin(journey: JourneyInstance, signal: AbortSign
207214
logger.info(`${INF} [HANDLING_EVENT] retrying with maxEvents = ${maxEvents}`);
208215
continue mainLoop;
209216
}
210-
console.error(`${ERR} [HANDLING_EVENT] something went wrong while handling events`, ex);
211-
process.exit(1);
217+
218+
logger.error(`${ERR} [HANDLING_EVENT] something went wrong while handling events`, ex);
219+
break mainLoop;
212220
} finally {
213221
clearInterval(progressBarId);
214222
}
@@ -225,8 +233,16 @@ export default async function* begin(journey: JourneyInstance, signal: AbortSign
225233
resolve();
226234
});
227235

236+
// also check if the signal is aborted
237+
signal.addEventListener("abort", resolve, { once: true, signal: ctrl.signal });
238+
228239
newEventNotifier.addEventListener("latest", resolve, { once: true, signal: ctrl.signal });
229240

241+
// check if the signal is aborted before waiting
242+
if (signal.aborted) {
243+
break;
244+
}
245+
230246
await promise;
231247
// logger.debug(
232248
// `${DBG} [HANDLING_EVENT] waiting for new events… after ${prettyMilliseconds(Date.now() - lastProcessingTime)}`,
@@ -237,6 +253,7 @@ export default async function* begin(journey: JourneyInstance, signal: AbortSign
237253
async function handleEventBatch(
238254
stores: JourneyConfig["stores"],
239255
onError: JourneyConfig["onError"],
256+
onReport: JourneyConfig["onReport"],
240257
events: JourneyCommittedEvent[],
241258
logger: Logger,
242259
signal: AbortSignal,
@@ -257,6 +274,12 @@ async function handleEventBatch(
257274
stores.map(async (store) => {
258275
const output = await singleStoreHandleEvents(store, events, 0, events.length, logger, onError, signal);
259276

277+
onReport?.("store_output", {
278+
store: store,
279+
280+
output,
281+
});
282+
260283
return output;
261284
}),
262285
),

packages/jerni/src/createJourney.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ export default function createJourney(config: JourneyConfig): JourneyInstance {
112112
}
113113
},
114114
// biome-ignore lint/suspicious/noExplicitAny: because this is a placeholder, the client that uses jerni would override this type
115-
async getReader(model: any): Promise<AsyncDisposable> {
115+
async getReader(model: any): Promise<any> {
116116
registerOnce();
117117
const store = modelToStoreMap.get(model);
118118

0 commit comments

Comments
 (0)