Skip to content

Commit f9096e2

Browse files
authored
Merge pull request #93 from Makespace/cache_sheet_data
Cache the equipment quiz result events
2 parents d0b4a88 + 64efa32 commit f9096e2

File tree

14 files changed

+730
-14
lines changed

14 files changed

+730
-14
lines changed

src/dependencies.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import {Logger} from 'pino';
22
import {Failure, Email, DomainEvent, ResourceVersion} from './types';
33
import * as TE from 'fp-ts/TaskEither';
4+
import * as t from 'io-ts';
5+
import * as O from 'fp-ts/Option';
46
import {FailureWithStatus} from './types/failure-with-status';
57
import {StatusCodes} from 'http-status-codes';
68

79
import {Resource} from './types/resource';
810
import {EventName, EventOfType} from './types/domain-event';
911
import {SharedReadModel} from './read-models/shared-state';
1012

13+
type TrainingSheetId = string;
14+
1115
export type Dependencies = {
1216
commitEvent: (
1317
resource: Resource,
@@ -36,4 +40,24 @@ export type Dependencies = {
3640
logger: Logger;
3741
rateLimitSendingOfEmails: (email: Email) => TE.TaskEither<Failure, Email>;
3842
sendEmail: (email: Email) => TE.TaskEither<Failure, string>;
43+
getCachedSheetData: (sheetId: string) => TE.TaskEither<
44+
FailureWithStatus,
45+
O.Option<{
46+
cached_at: Date;
47+
cached_data: t.Validation<
48+
ReadonlyArray<
49+
| EventOfType<'EquipmentTrainingQuizResult'>
50+
| EventOfType<'EquipmentTrainingQuizSync'>
51+
>
52+
>;
53+
}>
54+
>;
55+
cacheSheetData: (
56+
cacheTimestamp: Date,
57+
sheetId: TrainingSheetId,
58+
data: ReadonlyArray<
59+
| EventOfType<'EquipmentTrainingQuizSync'>
60+
| EventOfType<'EquipmentTrainingQuizResult'>
61+
>
62+
) => TE.TaskEither<Failure, void>;
3963
};

src/index.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@ import {createTerminus} from '@godaddy/terminus';
1313
import http from 'http';
1414
import {pipe} from 'fp-ts/lib/function';
1515
import * as TE from 'fp-ts/TaskEither';
16+
import * as O from 'fp-ts/Option';
1617
import {ensureEventTableExists} from './init-dependencies/event-store/ensure-events-table-exists';
1718
import {initDependencies} from './init-dependencies';
1819
import * as libsqlClient from '@libsql/client';
1920
import cookieSession from 'cookie-session';
2021
import {initRoutes} from './routes';
22+
import {ensureCachedSheetDataTableExists} from './init-dependencies/google/ensure-cached-sheet-data-table-exists';
23+
import {loadCachedSheetData} from './load-cached-sheet-data';
24+
import {timeAsync} from './util';
2125

2226
// Dependencies and Config
2327
const conf = loadConfig();
@@ -88,10 +92,40 @@ server.on('close', () => {
8892
void (async () => {
8993
await pipe(
9094
ensureEventTableExists(dbClient),
95+
TE.map(ensureCachedSheetDataTableExists(dbClient)),
9196
TE.mapLeft(e => deps.logger.error(e, 'Failed to start server'))
9297
)();
9398

94-
await deps.sharedReadModel.asyncRefresh()();
99+
deps.logger.info('Populating shared read model...');
100+
await deps.sharedReadModel.asyncRefresh()(); // We refresh before we load cached sheet data so we know what sheets to load cached data from.
101+
deps.logger.info('Loading cached external events...');
102+
await timeAsync(elapsedNs =>
103+
deps.logger.info(
104+
'Loaded cached external events in %sms',
105+
elapsedNs / (1000 * 1000)
106+
)
107+
)(
108+
Promise.all(
109+
deps.sharedReadModel.equipment
110+
.getAll()
111+
.map(
112+
loadCachedSheetData(
113+
deps.getCachedSheetData,
114+
deps.logger,
115+
deps.sharedReadModel.updateState
116+
)
117+
)
118+
)
119+
);
120+
for (const equipment of deps.sharedReadModel.equipment.getAll()) {
121+
deps.logger.info(
122+
'After loading cached external events the last quiz sync for equipment %s (%s) was %s (epoch ms)',
123+
equipment.name,
124+
equipment.id,
125+
O.getOrElse<string | number>(() => 'never')(equipment.lastQuizSync)
126+
);
127+
}
128+
95129
await deps.sharedReadModel.asyncApplyExternalEventSources()();
96130

97131
server.listen(conf.PORT, () => {
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import * as t from 'io-ts';
2+
import * as tt from 'io-ts-types';
3+
4+
export const CachedDataTable = t.strict({
5+
rows: t.readonlyArray(
6+
t.strict({
7+
cached_at: tt.DateFromNumber,
8+
sheet_id: t.string,
9+
cached_data: t.string,
10+
})
11+
),
12+
});
13+
export type CachedDataTable = t.TypeOf<typeof CachedDataTable>;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import {Client} from '@libsql/client/.';
2+
import * as TE from 'fp-ts/TaskEither';
3+
import {failure} from '../../types';
4+
5+
export const ensureCachedSheetDataTableExists = (dbClient: Client) =>
6+
TE.tryCatch(
7+
() =>
8+
dbClient.execute(`
9+
CREATE TABLE IF NOT EXISTS cached_sheet_data (
10+
sheet_id TEXT PRIMARY KEY,
11+
cached_at timestamp,
12+
cached_data TEXT
13+
);
14+
`),
15+
failure('Cached sheet data table does not exist and could not be created')
16+
);
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// This could be generalised as another event store that is intentionally ephemeral but can be used to store other things.
// Let's see how well it works for sheet data and if it has value as an approach for other stuff.
3+
4+
import {Client} from '@libsql/client/.';
5+
import {Dependencies} from '../../dependencies';
6+
import {flow, pipe} from 'fp-ts/lib/function';
7+
import * as TE from 'fp-ts/TaskEither';
8+
import * as E from 'fp-ts/Either';
9+
import * as tt from 'io-ts-types';
10+
import * as RA from 'fp-ts/ReadonlyArray';
11+
import * as t from 'io-ts';
12+
import {failure} from '../../types';
13+
import {DomainEvent, EventOfType} from '../../types/domain-event';
14+
import {CachedDataTable} from './cached-data-table';
15+
import {
16+
failureWithStatus,
17+
internalCodecFailure,
18+
} from '../../types/failure-with-status';
19+
import {StatusCodes} from 'http-status-codes';
20+
21+
const extractCachedEvents = (
22+
rawCachedData: string
23+
): t.Validation<
24+
ReadonlyArray<
25+
| EventOfType<'EquipmentTrainingQuizResult'>
26+
| EventOfType<'EquipmentTrainingQuizSync'>
27+
>
28+
> =>
29+
pipe(
30+
rawCachedData,
31+
tt.JsonFromString.decode,
32+
E.chain(tt.JsonArray.decode),
33+
E.chain(t.readonlyArray(DomainEvent).decode),
34+
E.map(
35+
elements =>
36+
elements as ReadonlyArray<
37+
| EventOfType<'EquipmentTrainingQuizResult'>
38+
| EventOfType<'EquipmentTrainingQuizSync'>
39+
>
40+
)
41+
);
42+
43+
export const getCachedSheetData =
44+
(dbClient: Client): Dependencies['getCachedSheetData'] =>
45+
(sheetId: string) =>
46+
pipe(
47+
TE.tryCatch(
48+
() =>
49+
dbClient.execute({
50+
// Currently we can do LIMIT 1 because we only expect each sheet to have a single entry within the cache sheet data
51+
sql: 'SELECT * FROM cached_sheet_data WHERE sheet_id = $sheetId LIMIT 1',
52+
args: {
53+
sheetId,
54+
},
55+
}),
56+
failureWithStatus(
57+
'Failed to get cached sheet data',
58+
StatusCodes.INTERNAL_SERVER_ERROR
59+
)
60+
),
61+
TE.chainEitherK(
62+
flow(
63+
CachedDataTable.decode,
64+
E.mapLeft(internalCodecFailure('Failed to decode cached sheet data'))
65+
)
66+
),
67+
TE.map(table =>
68+
pipe(
69+
table.rows,
70+
RA.map(row => ({
71+
...row,
72+
cached_data: extractCachedEvents(row.cached_data),
73+
})),
74+
RA.head
75+
)
76+
)
77+
);
78+
79+
// This would be more efficient with a simple key-value store.
80+
export const cacheSheetData =
81+
(dbClient: Client): Dependencies['cacheSheetData'] =>
82+
(
83+
cacheTimestamp: Date,
84+
sheetId: string,
85+
data: ReadonlyArray<
86+
| EventOfType<'EquipmentTrainingQuizResult'>
87+
| EventOfType<'EquipmentTrainingQuizSync'>
88+
>
89+
) =>
90+
TE.tryCatch(
91+
() =>
92+
dbClient
93+
.execute({
94+
sql: `
95+
INSERT INTO cached_sheet_data (cached_at, sheet_id, cached_data)
96+
VALUES ($cachedAt, $sheetId, $cachedData)
97+
ON CONFLICT (sheet_id) DO UPDATE SET
98+
cached_at = excluded.cached_at,
99+
cached_data = excluded.cached_data;
100+
`,
101+
args: {
102+
cachedAt: cacheTimestamp,
103+
sheetId,
104+
cachedData: JSON.stringify(data),
105+
},
106+
})
107+
.then(() => {}),
108+
failure('Failed to insert cached sheet data')
109+
);

src/init-dependencies/init-dependencies.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ import {
1717
} from './google/pull_sheet_data';
1818
import {initSharedReadModel} from '../read-models/shared-state';
1919
import {GoogleAuth} from 'google-auth-library';
20+
import {
21+
cacheSheetData,
22+
getCachedSheetData,
23+
} from './google/get-cached-sheet-data';
2024

2125
export const initDependencies = (
2226
dbClient: Client,
@@ -77,11 +81,14 @@ export const initDependencies = (
7781
});
7882
}
7983

84+
const _cacheSheetData = cacheSheetData(dbClient);
85+
8086
const sharedReadModel = initSharedReadModel(
8187
dbClient,
8288
logger,
8389
googleHelpers,
84-
conf.GOOGLE_RATELIMIT_MS
90+
conf.GOOGLE_RATELIMIT_MS,
91+
_cacheSheetData
8592
);
8693

8794
const deps: Dependencies = {
@@ -93,6 +100,8 @@ export const initDependencies = (
93100
rateLimitSendingOfEmails: createRateLimiter(5, 24 * 3600),
94101
sendEmail: sendEmail(emailTransporter, conf.SMTP_FROM),
95102
logger,
103+
getCachedSheetData: getCachedSheetData(dbClient),
104+
cacheSheetData: _cacheSheetData,
96105
};
97106
return deps;
98107
};

src/load-cached-sheet-data.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import {Logger} from 'pino';
2+
import * as E from 'fp-ts/Either';
3+
import * as O from 'fp-ts/Option';
4+
import {Dependencies} from './dependencies';
5+
import {Equipment} from './read-models/shared-state/return-types';
6+
7+
export const loadCachedSheetData =
8+
(
9+
getCachedSheetData: Dependencies['getCachedSheetData'],
10+
logger: Logger,
11+
updateState: Dependencies['sharedReadModel']['updateState']
12+
) =>
13+
async (equipment: Equipment) => {
14+
// We only load cached training events for equipment we know about.
15+
logger = logger.child({section: 'loadCachedSheetData'});
16+
const equipmentLogger = logger.child({
17+
equipment_name: equipment.name,
18+
equipment_id: equipment.id,
19+
equipment_training_sheet_id: O.getOrElse<string | null>(() => null)(
20+
equipment.trainingSheetId
21+
),
22+
});
23+
if (O.isNone(equipment.trainingSheetId)) {
24+
// If a piece of equipment removes a training sheet we won't load cached events that came from that
25+
// training sheet even if the events were for the correct piece of equipment. This allows fixing mistakes
26+
// where the wrong training sheet is was used previously.
27+
equipmentLogger.info(
28+
'Equipment has no training sheet id - not loading any cached data'
29+
);
30+
return;
31+
}
32+
equipmentLogger.info('Loading cached sheet data for sheet');
33+
const cachedSheetData = await getCachedSheetData(
34+
equipment.trainingSheetId.value
35+
)();
36+
if (E.isLeft(cachedSheetData)) {
37+
// Potential pitfall here - transient db errors could produce large spikes in processing.
38+
// Tradeoff is that an error/bug in cached sheet data doesn't bring down the application.
39+
equipmentLogger.error(
40+
cachedSheetData.left,
41+
'Failed to load cached sheet data for sheet - skipping...'
42+
);
43+
} else {
44+
if (O.isNone(cachedSheetData.right)) {
45+
equipmentLogger.info('No cached events found');
46+
return;
47+
}
48+
const loadedData = cachedSheetData.right.value;
49+
const sheetDataLogger = equipmentLogger.child({
50+
sheet_block_cached_at: loadedData.cached_at.toISOString(),
51+
});
52+
if (E.isLeft(loadedData.cached_data)) {
53+
sheetDataLogger.info(
54+
'Failed to parse cached sheet data block cached, skipping...'
55+
);
56+
} else {
57+
sheetDataLogger.info(
58+
'Loaded %s events from cached sheet data block, loading into shared read model...',
59+
loadedData.cached_data.right.length
60+
);
61+
for (const cachedEvent of loadedData.cached_data.right) {
62+
// This filtering makes loading cache data more predictable by only loading equipment events for the piece of equipment that is being loaded
63+
// even if the sheet has previously generated events for other pieces of equipment.
64+
if (cachedEvent.equipmentId !== equipment.id) {
65+
sheetDataLogger.warn(
66+
'Skipping event within cached sheet data block due to equipment id mismatch (cached %s)',
67+
cachedEvent.equipmentId
68+
);
69+
} else {
70+
updateState(cachedEvent);
71+
}
72+
}
73+
}
74+
}
75+
};

0 commit comments

Comments
 (0)