Skip to content

Commit 9dc9421

Browse files
authored
refactor(webui): Migrate extract stream route and plugin to new Fastify architecture. (#1065)
1 parent 2a92184 commit 9dc9421

File tree

20 files changed

+471
-590
lines changed

20 files changed

+471
-590
lines changed

components/webui/client/src/api/query/index.ts renamed to components/webui/client/src/api/stream-files/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ const submitExtractStreamJob = async ({
3838
onUploadProgress,
3939
}: SubmitExtractStreamJobProps): Promise<AxiosResponse<ExtractStreamResp>> => {
4040
return await axios.post(
41-
"/query/extract-stream",
41+
"/api/stream-files/extract",
4242
{
4343
dataset,
4444
extractJobType,

components/webui/client/src/ui/QueryStatus.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
} from "@sinclair/typebox/value";
1111
import {isAxiosError} from "axios";
1212

13-
import {submitExtractStreamJob} from "../api/query";
13+
import {submitExtractStreamJob} from "../api/stream-files";
1414
import {Nullable} from "../typings/common";
1515
import {
1616
EXTRACT_JOB_TYPE,

components/webui/client/vite.config.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,8 @@ export default defineConfig({
2222
server: {
2323
port: 8080,
2424
proxy: {
25-
"/query/": {
26-
// Below target should match the server's configuration in
27-
// `components/webui/server/.env` (or `.env.local` if overridden)
28-
target: "http://localhost:3000/",
29-
changeOrigin: true,
30-
},
25+
// Below targets should match the server's configuration in
26+
// `components/webui/server/.env` (or `.env.local` if overridden)
3127
"/api/": {
3228
target: "http://localhost:3000/",
3329
changeOrigin: true,

components/webui/server/src/app.ts

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,49 +4,23 @@ import {
44
} from "fastify";
55

66
import settings from "../settings.json" with {type: "json"};
7-
import DbManager from "./plugins/DbManager.js";
87
import S3Manager from "./plugins/S3Manager.js";
9-
import queryRoutes from "./routes/query.js";
108
import staticRoutes from "./routes/static.js";
119

1210

13-
interface AppPluginOptions {
14-
sqlDbUser: string;
15-
sqlDbPass: string;
16-
}
17-
1811
/**
1912
* Creates the Fastify app with the given options.
2013
*
2114
* TODO: Once old webui code is refactored to new modlular fastify style, this plugin should be
2215
* removed.
2316
*
2417
* @param fastify
25-
* @param opts
2618
* @return
2719
*/
28-
const FastifyV1App: FastifyPluginAsync<AppPluginOptions> = async (
29-
fastify: FastifyInstance,
30-
opts: AppPluginOptions
20+
const FastifyV1App: FastifyPluginAsync = async (
21+
fastify: FastifyInstance
3122
) => {
32-
const {sqlDbUser, sqlDbPass} = opts;
3323
if ("test" !== process.env.NODE_ENV) {
34-
await fastify.register(DbManager, {
35-
mysqlConfig: {
36-
database: settings.SqlDbName,
37-
host: settings.SqlDbHost,
38-
password: sqlDbPass,
39-
port: settings.SqlDbPort,
40-
queryJobsTableName: settings.SqlDbQueryJobsTableName,
41-
user: sqlDbUser,
42-
},
43-
mongoConfig: {
44-
database: settings.MongoDbName,
45-
host: settings.MongoDbHost,
46-
streamFilesCollectionName: settings.MongoDbStreamFilesCollectionName,
47-
port: settings.MongoDbPort,
48-
},
49-
});
5024
await fastify.register(
5125
S3Manager,
5226
{
@@ -58,7 +32,6 @@ const FastifyV1App: FastifyPluginAsync<AppPluginOptions> = async (
5832

5933
// Register the routes
6034
await fastify.register(staticRoutes);
61-
await fastify.register(queryRoutes);
6235
};
6336

6437
export default FastifyV1App;

components/webui/server/src/fastify-v2/app.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@ export default async function serviceApp (
3939
// eslint-disable-next-line no-warning-comments
4040
// TODO: Refactor old webui code to use more modular fastify style. Temporarily, the old webui
4141
// code is loaded as a separate plugin.
42-
await fastify.register(FastifyV1App, {
43-
sqlDbUser: fastify.config.CLP_DB_USER,
44-
sqlDbPass: fastify.config.CLP_DB_PASS,
45-
});
42+
await fastify.register(FastifyV1App);
4643

4744
// Loads all application plugins.
4845
fastify.register(fastifyAutoload, {
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import {setTimeout} from "node:timers/promises";
2+
3+
import type {MySQLPromisePool} from "@fastify/mysql";
4+
import {encode} from "@msgpack/msgpack";
5+
import {FastifyInstance} from "fastify";
6+
import fp from "fastify-plugin";
7+
import {ResultSetHeader} from "mysql2";
8+
9+
import settings from "../../../../../settings.json" with {type: "json"};
10+
import {
11+
QUERY_JOB_STATUS,
12+
QUERY_JOB_STATUS_WAITING_STATES,
13+
QUERY_JOB_TYPE,
14+
QUERY_JOBS_TABLE_COLUMN_NAMES,
15+
QueryJob,
16+
} from "../../../../typings/query.js";
17+
import {JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS} from "./typings.js";
18+
19+
20+
/**
21+
* Class for managing jobs in the CLP package query scheduler database.
22+
*/
23+
class QueryJobDbManager {
24+
readonly #sqlPool: MySQLPromisePool;
25+
26+
readonly #tableName: string;
27+
28+
private constructor (sqlPool: MySQLPromisePool, tableName: string) {
29+
this.#sqlPool = sqlPool;
30+
this.#tableName = tableName;
31+
}
32+
33+
/**
34+
* Creates a new QueryDbJobManager.
35+
*
36+
* @param fastify
37+
* @return
38+
*/
39+
static create (fastify: FastifyInstance): QueryJobDbManager {
40+
return new QueryJobDbManager(fastify.mysql, settings.SqlDbQueryJobsTableName);
41+
}
42+
43+
/**
44+
* Submits a job to the database.
45+
*
46+
* @param jobConfig
47+
* @param jobType
48+
* @return The job's ID.
49+
* @throws {Error} on error.
50+
*/
51+
async submitJob (jobConfig: object, jobType: QUERY_JOB_TYPE): Promise<number> {
52+
const [result] = await this.#sqlPool.query<ResultSetHeader>(
53+
`
54+
INSERT INTO ${settings.SqlDbQueryJobsTableName} (
55+
${QUERY_JOBS_TABLE_COLUMN_NAMES.JOB_CONFIG},
56+
${QUERY_JOBS_TABLE_COLUMN_NAMES.TYPE}
57+
)
58+
VALUES (?, ?)
59+
`,
60+
[
61+
Buffer.from(encode(jobConfig)),
62+
jobType,
63+
]
64+
);
65+
66+
return result.insertId;
67+
}
68+
69+
/**
70+
* Submits a job cancellation request to the database.
71+
*
72+
* @param jobId ID of the job to cancel.
73+
* @throws {Error} on error.
74+
*/
75+
async cancelJob (jobId: number): Promise<void> {
76+
await this.#sqlPool.query(
77+
`UPDATE ${this.#tableName}
78+
SET ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS} = ${QUERY_JOB_STATUS.CANCELLING}
79+
WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ?
80+
AND ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS}
81+
IN (${QUERY_JOB_STATUS.PENDING}, ${QUERY_JOB_STATUS.RUNNING})`,
82+
jobId
83+
);
84+
}
85+
86+
/**
87+
* Waits for the job with the given ID to finish.
88+
*
89+
* @param jobId
90+
* @throws {Error} on MySQL error, if the job wasn't found in the database, if the job was
91+
* cancelled, or if the job completed in an unexpected state.
92+
*/
93+
async awaitJobCompletion (jobId: number): Promise<void> {
94+
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
95+
while (true) {
96+
let queryJob: QueryJob | undefined;
97+
try {
98+
const [queryRows] = await this.#sqlPool.query<QueryJob[]>(
99+
`SELECT ${QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS}
100+
FROM ${this.#tableName}
101+
WHERE ${QUERY_JOBS_TABLE_COLUMN_NAMES.ID} = ?`,
102+
jobId
103+
);
104+
105+
[queryJob] = queryRows;
106+
} catch (e: unknown) {
107+
throw new Error(`Failed to query status for job ${jobId}`, {cause: e});
108+
}
109+
110+
if ("undefined" === typeof queryJob) {
111+
throw new Error(`Job ${jobId} not found in database.`);
112+
}
113+
114+
const status = queryJob[QUERY_JOBS_TABLE_COLUMN_NAMES.STATUS];
115+
116+
if (false === QUERY_JOB_STATUS_WAITING_STATES.has(status)) {
117+
if (QUERY_JOB_STATUS.CANCELLED === status) {
118+
throw new Error(`Job ${jobId} was cancelled.`);
119+
} else if (QUERY_JOB_STATUS.SUCCEEDED !== status) {
120+
throw new Error(
121+
`Job ${jobId} exited with unexpected status=${status}: ` +
122+
`${Object.keys(QUERY_JOB_STATUS)[status]}.`
123+
);
124+
}
125+
break;
126+
}
127+
128+
await setTimeout(JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS);
129+
}
130+
}
131+
132+
/**
133+
* Submits a job and waits for it to complete.
134+
*
135+
* @param jobConfig
136+
* @param jobType
137+
* @return The job's ID.
138+
* @throws {Error} on error.
139+
*/
140+
async submitAndWaitForJob (jobConfig: object, jobType: QUERY_JOB_TYPE): Promise<number> {
141+
const jobId = await this.submitJob(jobConfig, jobType);
142+
await this.awaitJobCompletion(jobId);
143+
144+
return jobId;
145+
}
146+
}
147+
148+
declare module "fastify" {
149+
interface FastifyInstance {
150+
QueryJobDbManager: QueryJobDbManager;
151+
}
152+
}
153+
154+
export default fp(
155+
(fastify) => {
156+
fastify.decorate("QueryJobDbManager", QueryJobDbManager.create(fastify));
157+
},
158+
{
159+
name: "QueryJobDbManager",
160+
}
161+
);
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/**
2+
* Interval in milliseconds for polling the completion status of a job.
3+
*/
4+
const JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS = 500;
5+
6+
export {JOB_COMPLETION_STATUS_POLL_INTERVAL_MILLIS};

0 commit comments

Comments
 (0)