Skip to content

Commit d058631

Browse files
authored
refactor(webui): Integrate existing Mongo SocketIO plugin into new Fastify architecture; Pass logger into plugin instead of entire Fastify Instance (resolves #885). (#1027)
1 parent 08c6e8f commit d058631

File tree

5 files changed

+77
-99
lines changed

5 files changed

+77
-99
lines changed

components/webui/server/src/app.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55

66
import settings from "../settings.json" with {type: "json"};
77
import DbManager from "./plugins/DbManager.js";
8-
import MongoSocketIoServer from "./plugins/MongoSocketIoServer/index.js";
98
import S3Manager from "./plugins/S3Manager.js";
109
import exampleRoutes from "./routes/example.js";
1110
import queryRoutes from "./routes/query.js";
@@ -56,11 +55,6 @@ const FastifyV1App: FastifyPluginAsync<AppPluginOptions> = async (
5655
profile: settings.StreamFilesS3Profile,
5756
}
5857
);
59-
await fastify.register(MongoSocketIoServer, {
60-
host: settings.MongoDbHost,
61-
port: settings.MongoDbPort,
62-
database: settings.MongoDbName,
63-
});
6458
}
6559

6660
// Register the routes

components/webui/server/src/plugins/MongoSocketIoServer/MongoWatcherCollection.ts renamed to components/webui/server/src/fastify-v2/plugins/app/socket/MongoSocketIoServer/MongoWatcherCollection.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
import {FastifyBaseLogger} from "fastify";
12
import type {
23
Collection,
34
Db,
45
} from "mongodb";
56

6-
import {QueryId} from "../../../../common/index.js";
7+
import {QueryId} from "../../../../../../../common/index.js";
78
import {
89
CLIENT_UPDATE_TIMEOUT_MILLIS,
910
MongoCustomSocket,
@@ -22,15 +23,19 @@ import {
2223
class MongoWatcherCollection {
2324
#collection: Collection;
2425

26+
#logger: FastifyBaseLogger;
27+
2528
// Active watchers
2629
#queryIdtoWatcherMap: Map<QueryId, Watcher> = new Map();
2730

2831
/**
2932
* @param collectionName
33+
* @param logger
3034
* @param mongoDb
3135
*/
32-
constructor (collectionName: string, mongoDb: Db) {
36+
constructor (collectionName: string, logger: FastifyBaseLogger, mongoDb: Db) {
3337
this.#collection = mongoDb.collection(collectionName);
38+
this.#logger = logger;
3439
}
3540

3641
/**
@@ -70,7 +75,7 @@ class MongoWatcherCollection {
7075
const watcher = this.#queryIdtoWatcherMap.get(queryId);
7176

7277
if ("undefined" === typeof watcher) {
73-
console.warn(`No watcher found for queryID:${queryId}`);
78+
this.#logger.warn(`No watcher found for queryID:${queryId}`);
7479

7580
return false;
7681
}
@@ -82,7 +87,7 @@ class MongoWatcherCollection {
8287
}
8388

8489
watcher.changeStream.close().catch((err: unknown) => {
85-
console.error(`Error closing watcher for queryID:${queryId}:`, err);
90+
this.#logger.error(err, `Error closing watcher for queryID:${queryId}`);
8691
});
8792
this.#queryIdtoWatcherMap.delete(queryId);
8893

@@ -146,7 +151,7 @@ class MongoWatcherCollection {
146151
const documents = await this.#collection.find(query, options).toArray();
147152
return documents;
148153
} catch (error) {
149-
console.error("Error fetching data for query:", error);
154+
this.#logger.error(error, "Error fetching data for query");
150155

151156
return [];
152157
}
@@ -187,7 +192,7 @@ class MongoWatcherCollection {
187192
};
188193

189194
fetchAndEmit().catch((error: unknown) => {
190-
console.error("Error in emitUpdatesWithTimeout:", error);
195+
this.#logger.error(error, "Error in emitUpdatesWithTimeout");
191196
});
192197
lastEmitTime = Date.now();
193198
}, delay);
@@ -196,12 +201,12 @@ class MongoWatcherCollection {
196201

197202
watcher.changeStream.on("change", (change) => {
198203
if ("invalidate" === change.operationType) {
199-
console.log("Change stream received invalidate event for queryID", queryId);
204+
this.#logger.info(`Change stream received invalidate event for queryID ${queryId}`);
200205

201206
return;
202207
}
203208
emitUpdateWithTimeout().catch((error: unknown) => {
204-
console.error("Error in emitUpdatesWithTimeout:", error);
209+
this.#logger.error(error, "Error in emitUpdatesWithTimeout");
205210
});
206211
});
207212
}

components/webui/server/src/plugins/MongoSocketIoServer/index.ts renamed to components/webui/server/src/fastify-v2/plugins/app/socket/MongoSocketIoServer/index.ts

Lines changed: 62 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import {lookup as dnsLookup} from "node:dns/promises";
77

88
import fastifyHttpProxy from "@fastify/http-proxy";
99
import {
10+
FastifyBaseLogger,
1011
FastifyInstance,
11-
FastifyPluginAsync,
1212
} from "fastify";
1313
import fastifyPlugin from "fastify-plugin";
1414
import {Db} from "mongodb";
@@ -21,18 +21,16 @@ import type {
2121
Response,
2222
ServerToClientEvents,
2323
SocketData,
24-
} from "../../../../common/index.js";
24+
} from "../../../../../../../common/index.js";
2525
import MongoWatcherCollection from "./MongoWatcherCollection.js";
2626
import {
2727
ConnectionId,
28-
DbOptions,
2928
MongoCustomSocket,
3029
QueryParameters,
3130
} from "./typings.js";
3231
import {
3332
getQuery,
3433
getQueryHash,
35-
initializeMongoClient,
3634
removeItemFromArray,
3735
} from "./utils.js";
3836

@@ -45,7 +43,7 @@ import {
4543
* names per query, limiting the number of events listeners triggered in the client.
4644
*/
4745
class MongoSocketIoServer {
48-
#fastify: FastifyInstance;
46+
#logger: FastifyBaseLogger;
4947

5048
#io: Server<ClientToServerEvents, ServerToClientEvents, InterServerEvents, SocketData>;
5149

@@ -65,36 +63,36 @@ class MongoSocketIoServer {
6563
readonly #mongoDb: Db;
6664

6765
/**
68-
* Private constructor for MongoSocketIoServer. This is not intended to be invoked publicly.
69-
* Instead, use MongoSocketIoServer.create() to create a new instance of the class.
70-
*
71-
* @param fastify
66+
* @param io
67+
* @param logger
7268
* @param mongoDb
7369
*/
74-
constructor (fastify: FastifyInstance, mongoDb: Db) {
75-
this.#fastify = fastify;
70+
private constructor (
71+
io: Server<ClientToServerEvents, ServerToClientEvents, InterServerEvents, SocketData>,
72+
logger: FastifyBaseLogger,
73+
mongoDb: Db
74+
) {
75+
this.#io = io;
76+
this.#logger = logger;
7677
this.#mongoDb = mongoDb;
77-
this.#io = new Server<
78-
ClientToServerEvents,
79-
ServerToClientEvents,
80-
InterServerEvents,
81-
SocketData
82-
>(fastify.server);
8378
this.#registerEventListeners();
8479
}
8580

8681
/**
8782
* Creates a new MongoSocketIoServer.
8883
*
8984
* @param fastify
90-
* @param options
9185
* @return
86+
* @throws {Error} When MongoDB database not found
9287
*/
9388
static async create (
94-
fastify: FastifyInstance,
95-
options: DbOptions
89+
fastify: FastifyInstance
9690
): Promise<MongoSocketIoServer> {
97-
const mongoDb = await initializeMongoClient(options);
91+
const mongoDb = fastify.mongo.db;
92+
93+
if ("undefined" === typeof mongoDb) {
94+
throw new Error("MongoDB database not found");
95+
}
9896

9997
// Fastify listens on all resolved addresses for localhost (e.g. `::1` and `127.0.0.1`), but
10098
// socket.io can only intercept requests on the main server which listens only on the
@@ -124,15 +122,22 @@ class MongoSocketIoServer {
124122
JSON.stringify(e)}`);
125123
}
126124

127-
return new MongoSocketIoServer(fastify, mongoDb);
125+
const io = new Server<
126+
ClientToServerEvents,
127+
ServerToClientEvents,
128+
InterServerEvents,
129+
SocketData
130+
>(fastify.server);
131+
132+
return new MongoSocketIoServer(io, fastify.log, mongoDb);
128133
}
129134

130135
/**
131136
* Registers event listeners on socket connection.
132137
*/
133138
#registerEventListeners () {
134139
this.#io.on("connection", (socket) => {
135-
this.#fastify.log.info(`New socket connected with ID:${socket.id}`);
140+
this.#logger.info(`New socket connected with ID:${socket.id}`);
136141
socket.on("disconnect", this.#disconnectListener.bind(this, socket));
137142
socket.on(
138143
"collection::find::subscribe",
@@ -151,7 +156,7 @@ class MongoSocketIoServer {
151156
* @param socket
152157
*/
153158
async #disconnectListener (socket: MongoCustomSocket) {
154-
this.#fastify.log.info(`Socket:${socket.id} disconnected`);
159+
this.#logger.info(`Socket:${socket.id} disconnected`);
155160
const subscribedQueryIds = this.#subscribedQueryIdsMap.get(socket.id);
156161

157162
if ("undefined" === typeof subscribedQueryIds) {
@@ -163,7 +168,7 @@ class MongoSocketIoServer {
163168
}
164169

165170
this.#subscribedQueryIdsMap.delete(socket.id);
166-
this.#fastify.log.debug(
171+
this.#logger.debug(
167172
"Subscribed queryIDs map" +
168173
` ${JSON.stringify(Array.from(this.#subscribedQueryIdsMap.entries()))}`
169174
);
@@ -232,8 +237,12 @@ class MongoSocketIoServer {
232237
: MongoWatcherCollection {
233238
let watcherCollection = this.#collections.get(collectionName);
234239
if ("undefined" === typeof watcherCollection) {
235-
watcherCollection = new MongoWatcherCollection(collectionName, this.#mongoDb);
236-
this.#fastify.log.debug(`Initialize Mongo watcher collection:${collectionName}.`);
240+
watcherCollection = new MongoWatcherCollection(
241+
collectionName,
242+
this.#logger,
243+
this.#mongoDb
244+
);
245+
this.#logger.debug(`Initialize Mongo watcher collection:${collectionName}.`);
237246
this.#collections.set(collectionName, watcherCollection);
238247
}
239248

@@ -258,14 +267,14 @@ class MongoSocketIoServer {
258267
): Promise<void> {
259268
const {collectionName, query, options} = requestArgs;
260269

261-
this.#fastify.log.debug(
270+
this.#logger.debug(
262271
`Socket:${socket.id} requested query:${JSON.stringify(query)} ` +
263272
`with options:${JSON.stringify(options)} to collection:${collectionName}`
264273
);
265274

266275
const hasCollection = await this.#hasCollection(collectionName);
267276
if (false === hasCollection) {
268-
this.#fastify.log.error(`Collection ${collectionName} does not exist in MongoDB`);
277+
this.#logger.error(`Collection ${collectionName} does not exist in MongoDB`);
269278
callback({
270279
error: `Collection ${collectionName} does not exist in MongoDB on server`,
271280
});
@@ -284,7 +293,7 @@ class MongoSocketIoServer {
284293
callback({data: {queryId, initialDocuments}});
285294

286295
this.#addQueryIdToSubscribedList(queryId, socket.id);
287-
this.#fastify.log.info(
296+
this.#logger.info(
288297
`Socket:${socket.id} subscribed to query:${JSON.stringify(query)} ` +
289298
`with options:${JSON.stringify(options)} ` +
290299
`on collection:${collectionName} with ID:${queryId}`
@@ -324,7 +333,7 @@ class MongoSocketIoServer {
324333
#unsubscribe (socket: MongoCustomSocket, queryId: number) {
325334
const queryHash: string | undefined = this.#queryIdToQueryHashMap.get(queryId);
326335
if ("undefined" === typeof queryHash) {
327-
this.#fastify.log.error(`Query:${queryId} not found in query map`);
336+
this.#logger.error(`Query:${queryId} not found in query map`);
328337

329338
return;
330339
}
@@ -333,26 +342,26 @@ class MongoSocketIoServer {
333342

334343
const collection = this.#collections.get(queryParams.collectionName);
335344
if ("undefined" === typeof collection) {
336-
this.#fastify.log.error(`${queryParams.collectionName} is missing from server`);
345+
this.#logger.error(`${queryParams.collectionName} is missing from server`);
337346

338347
return;
339348
}
340349

341350
const isLastSubscriber = collection.unsubscribe(queryId, socket.id);
342-
this.#fastify.log.info(`Socket:${socket.id} unsubscribed from query:${queryId}`);
351+
this.#logger.info(`Socket:${socket.id} unsubscribed from query:${queryId}`);
343352

344353
if (isLastSubscriber) {
345-
this.#fastify.log.debug(`Query:${queryId} deleted from query map.`);
354+
this.#logger.debug(`Query:${queryId} deleted from query map.`);
346355
this.#queryIdToQueryHashMap.delete(queryId);
347356
}
348357

349-
this.#fastify.log.debug(
358+
this.#logger.debug(
350359
"Query ID to query hash map:" +
351360
` ${JSON.stringify(Array.from(this.#queryIdToQueryHashMap.entries()))}`
352361
);
353362

354363
if (false === collection.isReferenced()) {
355-
this.#fastify.log.debug(`Collection:${queryParams.collectionName}` +
364+
this.#logger.debug(`Collection:${queryParams.collectionName}` +
356365
" deallocated from server.");
357366
this.#collections.delete(queryParams.collectionName);
358367
}
@@ -370,15 +379,15 @@ class MongoSocketIoServer {
370379
requestArgs: {queryId: number}
371380
): Promise<void> {
372381
const {queryId} = requestArgs;
373-
this.#fastify.log.debug(
382+
this.#logger.debug(
374383
`Socket:${socket.id} requested unsubscription to query:${queryId}`
375384
);
376385

377386
const subscribedQueryIds = this.#subscribedQueryIdsMap.get(socket.id);
378387
if ("undefined" === typeof subscribedQueryIds ||
379388
false === subscribedQueryIds.includes(queryId)
380389
) {
381-
this.#fastify.log.error(`Socket ${socket.id} is not subscribed to ${queryId}`);
390+
this.#logger.error(`Socket ${socket.id} is not subscribed to ${queryId}`);
382391

383392
return;
384393
}
@@ -388,27 +397,24 @@ class MongoSocketIoServer {
388397

389398
removeItemFromArray(subscribedQueryIds, queryId);
390399

391-
this.#fastify.log.debug(
400+
this.#logger.debug(
392401
`Subscribed queryIDs map ${
393402
JSON.stringify(Array.from(this.#subscribedQueryIdsMap.entries()))}`
394403
);
395404
}
396405
}
397406

398-
/**
399-
* A Fastify plugin callback for setting up the `MongoSocketIoServer`.
400-
*
401-
* @param app
402-
* @param options
403-
* @param options.database
404-
* @param options.host
405-
* @param options.port
406-
*/
407-
const MongoServerPlugin: FastifyPluginAsync<DbOptions> = async (
408-
app: FastifyInstance,
409-
options: DbOptions
410-
) => {
411-
await MongoSocketIoServer.create(app, options);
412-
};
413-
414-
export default fastifyPlugin(MongoServerPlugin);
407+
declare module "fastify" {
408+
export interface FastifyInstance {
409+
MongoSocketIoServer: MongoSocketIoServer;
410+
}
411+
}
412+
413+
export default fastifyPlugin(
414+
async (fastify) => {
415+
fastify.decorate("MongoSocketIoServer", await MongoSocketIoServer.create(fastify));
416+
},
417+
{
418+
name: "MongoSocketIoServer",
419+
}
420+
);

components/webui/server/src/plugins/MongoSocketIoServer/typings.ts renamed to components/webui/server/src/fastify-v2/plugins/app/socket/MongoSocketIoServer/typings.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
InterServerEvents,
1212
ServerToClientEvents,
1313
SocketData,
14-
} from "../../../../common/index.js";
14+
} from "../../../../../../../common/index.js";
1515

1616

1717
/**

0 commit comments

Comments
 (0)