Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8fc7c15
Initial pass - lets cache the equipment quiz result events themselves
Lan2u Jan 23, 2025
0b21e34
Table schema for cached events
Lan2u Jan 23, 2025
a903570
Getting cached data from database
Lan2u Jan 28, 2025
bc99309
Linked things up, decided to simplify cached sheet data return to jus…
Lan2u Jan 28, 2025
7bed403
Integration test for caching + restore
Lan2u Feb 3, 2025
7b537d2
Sort out how we will expose issues reading the cache - important in f…
Lan2u Feb 5, 2025
d98de9b
Time the loading of cached events
Lan2u Feb 5, 2025
0c37a56
Fix missing index on sheet id so we can handle on conflict overwrite
Lan2u Feb 5, 2025
7284de2
Fixes with implementation from tests
Lan2u Feb 5, 2025
9a52842
more tests for caching sheet data
Lan2u Feb 9, 2025
324eda7
More testing for caching events
Lan2u Feb 9, 2025
cbcb2c3
Start loading cached sheet data on a per-piece of known equipment bas…
Lan2u Feb 9, 2025
747b954
Refactor code so that the dependency on equipment being loaded before…
Lan2u Feb 9, 2025
260655d
Make order of operations requirement (equipment then get cached event…
Lan2u Feb 9, 2025
9cc9597
Make it easy to see when the last quiz sync's were (and so the next w…
Lan2u Feb 9, 2025
9b9616b
Integration type test for loading cached sheet data
Lan2u Feb 9, 2025
0731e48
Fix training test not properly inserting events for test
Lan2u Feb 9, 2025
69ae210
All tests pass
Lan2u Feb 9, 2025
1c67d15
Cleanup test noise
Lan2u Feb 9, 2025
64efa32
Revert "Cleanup test noise"
Lan2u Feb 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/dependencies.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import {Logger} from 'pino';
import {Failure, Email, DomainEvent, ResourceVersion} from './types';
import * as TE from 'fp-ts/TaskEither';
import * as t from 'io-ts';
import * as O from 'fp-ts/Option';
import {FailureWithStatus} from './types/failure-with-status';
import {StatusCodes} from 'http-status-codes';

import {Resource} from './types/resource';
import {EventName, EventOfType} from './types/domain-event';
import {SharedReadModel} from './read-models/shared-state';

type TrainingSheetId = string;

export type Dependencies = {
commitEvent: (
resource: Resource,
Expand Down Expand Up @@ -36,4 +40,24 @@ export type Dependencies = {
logger: Logger;
rateLimitSendingOfEmails: (email: Email) => TE.TaskEither<Failure, Email>;
sendEmail: (email: Email) => TE.TaskEither<Failure, string>;
getCachedSheetData: (sheetId: string) => TE.TaskEither<
FailureWithStatus,
O.Option<{
cached_at: Date;
cached_data: t.Validation<
ReadonlyArray<
| EventOfType<'EquipmentTrainingQuizResult'>
| EventOfType<'EquipmentTrainingQuizSync'>
>
>;
}>
>;
cacheSheetData: (
cacheTimestamp: Date,
sheetId: TrainingSheetId,
data: ReadonlyArray<
| EventOfType<'EquipmentTrainingQuizSync'>
| EventOfType<'EquipmentTrainingQuizResult'>
>
) => TE.TaskEither<Failure, void>;
};
36 changes: 35 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ import {createTerminus} from '@godaddy/terminus';
import http from 'http';
import {pipe} from 'fp-ts/lib/function';
import * as TE from 'fp-ts/TaskEither';
import * as O from 'fp-ts/Option';
import {ensureEventTableExists} from './init-dependencies/event-store/ensure-events-table-exists';
import {initDependencies} from './init-dependencies';
import * as libsqlClient from '@libsql/client';
import cookieSession from 'cookie-session';
import {initRoutes} from './routes';
import {ensureCachedSheetDataTableExists} from './init-dependencies/google/ensure-cached-sheet-data-table-exists';
import {loadCachedSheetData} from './load-cached-sheet-data';
import {timeAsync} from './util';

// Dependencies and Config
const conf = loadConfig();
Expand Down Expand Up @@ -88,10 +92,40 @@ server.on('close', () => {
void (async () => {
await pipe(
ensureEventTableExists(dbClient),
TE.map(ensureCachedSheetDataTableExists(dbClient)),
TE.mapLeft(e => deps.logger.error(e, 'Failed to start server'))
)();

await deps.sharedReadModel.asyncRefresh()();
deps.logger.info('Populating shared read model...');
await deps.sharedReadModel.asyncRefresh()(); // We refresh before we load cached sheet data so we know what sheets to load cached data from.
deps.logger.info('Loading cached external events...');
await timeAsync(elapsedNs =>
deps.logger.info(
'Loaded cached external events in %sms',
elapsedNs / (1000 * 1000)
)
)(
Promise.all(
deps.sharedReadModel.equipment
.getAll()
.map(
loadCachedSheetData(
deps.getCachedSheetData,
deps.logger,
deps.sharedReadModel.updateState
)
)
)
);
for (const equipment of deps.sharedReadModel.equipment.getAll()) {
deps.logger.info(
'After loading cached external events the last quiz sync for equipment %s (%s) was %s (epoch ms)',
equipment.name,
equipment.id,
O.getOrElse<string | number>(() => 'never')(equipment.lastQuizSync)
);
}

await deps.sharedReadModel.asyncApplyExternalEventSources()();

server.listen(conf.PORT, () => {
Expand Down
13 changes: 13 additions & 0 deletions src/init-dependencies/google/cached-data-table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import * as t from 'io-ts';
import * as tt from 'io-ts-types';

export const CachedDataTable = t.strict({
rows: t.readonlyArray(
t.strict({
cached_at: tt.DateFromNumber,
sheet_id: t.string,
cached_data: t.string,
})
),
});
export type CachedDataTable = t.TypeOf<typeof CachedDataTable>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import {Client} from '@libsql/client/.';
import * as TE from 'fp-ts/TaskEither';
import {failure} from '../../types';

export const ensureCachedSheetDataTableExists = (dbClient: Client) =>
TE.tryCatch(
() =>
dbClient.execute(`
CREATE TABLE IF NOT EXISTS cached_sheet_data (
sheet_id TEXT PRIMARY KEY,
cached_at timestamp,
cached_data TEXT
);
`),
failure('Cached sheet data table does not exist and could not be created')
);
109 changes: 109 additions & 0 deletions src/init-dependencies/google/get-cached-sheet-data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// This could be generalised as another event store that is intentionally ephemeral but can be used to store other things.
// Lets see how well it works for sheet data and if it has value as an approach for other stuff.

import {Client} from '@libsql/client/.';
import {Dependencies} from '../../dependencies';
import {flow, pipe} from 'fp-ts/lib/function';
import * as TE from 'fp-ts/TaskEither';
import * as E from 'fp-ts/Either';
import * as tt from 'io-ts-types';
import * as RA from 'fp-ts/ReadonlyArray';
import * as t from 'io-ts';
import {failure} from '../../types';
import {DomainEvent, EventOfType} from '../../types/domain-event';
import {CachedDataTable} from './cached-data-table';
import {
failureWithStatus,
internalCodecFailure,
} from '../../types/failure-with-status';
import {StatusCodes} from 'http-status-codes';

const extractCachedEvents = (
rawCachedData: string
): t.Validation<
ReadonlyArray<
| EventOfType<'EquipmentTrainingQuizResult'>
| EventOfType<'EquipmentTrainingQuizSync'>
>
> =>
pipe(
rawCachedData,
tt.JsonFromString.decode,
E.chain(tt.JsonArray.decode),
E.chain(t.readonlyArray(DomainEvent).decode),
E.map(
elements =>
elements as ReadonlyArray<
| EventOfType<'EquipmentTrainingQuizResult'>
| EventOfType<'EquipmentTrainingQuizSync'>
>
)
);

export const getCachedSheetData =
(dbClient: Client): Dependencies['getCachedSheetData'] =>
(sheetId: string) =>
pipe(
TE.tryCatch(
() =>
dbClient.execute({
// Currently we can do LIMIT 1 because we only expect each sheet to have a single entry within the cache sheet data
sql: 'SELECT * FROM cached_sheet_data WHERE sheet_id = $sheetId LIMIT 1',
args: {
sheetId,
},
}),
failureWithStatus(
'Failed to get cached sheet data',
StatusCodes.INTERNAL_SERVER_ERROR
)
),
TE.chainEitherK(
flow(
CachedDataTable.decode,
E.mapLeft(internalCodecFailure('Failed to decode cached sheet data'))
)
),
TE.map(table =>
pipe(
table.rows,
RA.map(row => ({
...row,
cached_data: extractCachedEvents(row.cached_data),
})),
RA.head
)
)
);

// This would be more efficient with a simple key-value store.
export const cacheSheetData =
(dbClient: Client): Dependencies['cacheSheetData'] =>
(
cacheTimestamp: Date,
sheetId: string,
data: ReadonlyArray<
| EventOfType<'EquipmentTrainingQuizResult'>
| EventOfType<'EquipmentTrainingQuizSync'>
>
) =>
TE.tryCatch(
() =>
dbClient
.execute({
sql: `
INSERT INTO cached_sheet_data (cached_at, sheet_id, cached_data)
VALUES ($cachedAt, $sheetId, $cachedData)
ON CONFLICT (sheet_id) DO UPDATE SET
cached_at = excluded.cached_at,
cached_data = excluded.cached_data;
`,
args: {
cachedAt: cacheTimestamp,
sheetId,
cachedData: JSON.stringify(data),
},
})
.then(() => {}),
failure('Failed to insert cached sheet data')
);
11 changes: 10 additions & 1 deletion src/init-dependencies/init-dependencies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import {
} from './google/pull_sheet_data';
import {initSharedReadModel} from '../read-models/shared-state';
import {GoogleAuth} from 'google-auth-library';
import {
cacheSheetData,
getCachedSheetData,
} from './google/get-cached-sheet-data';

export const initDependencies = (
dbClient: Client,
Expand Down Expand Up @@ -77,11 +81,14 @@ export const initDependencies = (
});
}

const _cacheSheetData = cacheSheetData(dbClient);

const sharedReadModel = initSharedReadModel(
dbClient,
logger,
googleHelpers,
conf.GOOGLE_RATELIMIT_MS
conf.GOOGLE_RATELIMIT_MS,
_cacheSheetData
);

const deps: Dependencies = {
Expand All @@ -93,6 +100,8 @@ export const initDependencies = (
rateLimitSendingOfEmails: createRateLimiter(5, 24 * 3600),
sendEmail: sendEmail(emailTransporter, conf.SMTP_FROM),
logger,
getCachedSheetData: getCachedSheetData(dbClient),
cacheSheetData: _cacheSheetData,
};
return deps;
};
75 changes: 75 additions & 0 deletions src/load-cached-sheet-data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import {Logger} from 'pino';
import * as E from 'fp-ts/Either';
import * as O from 'fp-ts/Option';
import {Dependencies} from './dependencies';
import {Equipment} from './read-models/shared-state/return-types';

export const loadCachedSheetData =
(
getCachedSheetData: Dependencies['getCachedSheetData'],
logger: Logger,
updateState: Dependencies['sharedReadModel']['updateState']
) =>
async (equipment: Equipment) => {
// We only load cached training events for equipment we know about.
logger = logger.child({section: 'loadCachedSheetData'});
const equipmentLogger = logger.child({
equipment_name: equipment.name,
equipment_id: equipment.id,
equipment_training_sheet_id: O.getOrElse<string | null>(() => null)(
equipment.trainingSheetId
),
});
if (O.isNone(equipment.trainingSheetId)) {
// If a piece of equipment removes a training sheet we won't load cached events that came from that
// training sheet even if the events were for the correct piece of equipment. This allows fixing mistakes
// where the wrong training sheet is was used previously.
equipmentLogger.info(
'Equipment has no training sheet id - not loading any cached data'
);
return;
}
equipmentLogger.info('Loading cached sheet data for sheet');
const cachedSheetData = await getCachedSheetData(
equipment.trainingSheetId.value
)();
if (E.isLeft(cachedSheetData)) {
// Potential pitfall here - transient db errors could produce large spikes in processing.
// Tradeoff is that an error/bug in cached sheet data doesn't bring down the application.
equipmentLogger.error(
cachedSheetData.left,
'Failed to load cached sheet data for sheet - skipping...'
);
} else {
if (O.isNone(cachedSheetData.right)) {
equipmentLogger.info('No cached events found');
return;
}
const loadedData = cachedSheetData.right.value;
const sheetDataLogger = equipmentLogger.child({
sheet_block_cached_at: loadedData.cached_at.toISOString(),
});
if (E.isLeft(loadedData.cached_data)) {
sheetDataLogger.info(
'Failed to parse cached sheet data block cached, skipping...'
);
} else {
sheetDataLogger.info(
'Loaded %s events from cached sheet data block, loading into shared read model...',
loadedData.cached_data.right.length
);
for (const cachedEvent of loadedData.cached_data.right) {
// This filtering makes loading cache data more predictable by only loading equipment events for the piece of equipment that is being loaded
// even if the sheet has previously generated events for other pieces of equipment.
if (cachedEvent.equipmentId !== equipment.id) {
sheetDataLogger.warn(
'Skipping event within cached sheet data block due to equipment id mismatch (cached %s)',
cachedEvent.equipmentId
);
} else {
updateState(cachedEvent);
}
}
}
}
};
Loading