|
| 1 | +import z from "zod"; |
| 2 | +import path from "path"; |
| 3 | +import fs from "fs/promises"; |
| 4 | +import { createWriteStream } from "fs"; |
| 5 | +import { lock } from "proper-lockfile"; |
| 6 | +import { FindCursor } from "mongodb"; |
| 7 | +import { EJSON, EJSONOptions } from "bson"; |
| 8 | +import { Transform } from "stream"; |
| 9 | +import { pipeline } from "stream/promises"; |
| 10 | + |
| 11 | +import { UserConfig } from "./config.js"; |
| 12 | +import { Session } from "./session.js"; |
| 13 | +import logger, { LogId } from "./logger.js"; |
| 14 | + |
| 15 | +export const jsonExportFormat = z.enum(["relaxed", "canonical"]); |
| 16 | +export type JSONExportFormat = z.infer<typeof jsonExportFormat>; |
| 17 | + |
| 18 | +export type Export = { |
| 19 | + name: string; |
| 20 | + uri: string; |
| 21 | + createdAt: number; |
| 22 | +}; |
| 23 | + |
| 24 | +export type SessionExportsManagerConfig = Pick< |
| 25 | + UserConfig, |
| 26 | + "exportPath" | "exportTimeoutMs" | "exportCleanupIntervalMs" |
| 27 | +>; |
| 28 | + |
| 29 | +export class SessionExportsManager { |
| 30 | + private mutableExports: Export[] = []; |
| 31 | + private exportsCleanupInterval: NodeJS.Timeout; |
| 32 | + private exportsCleanupInProgress: boolean = false; |
| 33 | + |
| 34 | + constructor( |
| 35 | + private readonly session: Session, |
| 36 | + private readonly config: SessionExportsManagerConfig |
| 37 | + ) { |
| 38 | + this.exportsCleanupInterval = setInterval( |
| 39 | + () => void this.cleanupExpiredExports(), |
| 40 | + this.config.exportCleanupIntervalMs |
| 41 | + ); |
| 42 | + } |
| 43 | + |
| 44 | + public close() { |
| 45 | + clearInterval(this.exportsCleanupInterval); |
| 46 | + } |
| 47 | + |
| 48 | + public exportNameToResourceURI(nameWithExtension: string): string { |
| 49 | + if (!path.extname(nameWithExtension)) { |
| 50 | + throw new Error("Provided export name has no extension"); |
| 51 | + } |
| 52 | + return `exported-data://${nameWithExtension}`; |
| 53 | + } |
| 54 | + |
| 55 | + public exportsDirectoryPath(): string { |
| 56 | + // If the session is not connected, we can't cannot work with exports |
| 57 | + // for that session. |
| 58 | + if (!this.session.sessionId) { |
| 59 | + throw new Error("Cannot retrieve exports directory, no active session. Try to reconnect to the MCP server"); |
| 60 | + } |
| 61 | + |
| 62 | + return path.join(this.config.exportPath, this.session.sessionId); |
| 63 | + } |
| 64 | + |
| 65 | + public exportFilePath(exportsDirectoryPath: string, exportNameWithExtension: string): string { |
| 66 | + if (!path.extname(exportNameWithExtension)) { |
| 67 | + throw new Error("Provided export name has no extension"); |
| 68 | + } |
| 69 | + return path.join(exportsDirectoryPath, exportNameWithExtension); |
| 70 | + } |
| 71 | + |
| 72 | + public listAvailableExports(): Export[] { |
| 73 | + // Note that we don't account for ongoing cleanup or creation operation, |
| 74 | + // by not acquiring a lock on read. That is because this we require this |
| 75 | + // interface to be fast and just accurate enough for MCP completions |
| 76 | + // API. |
| 77 | + return this.mutableExports.filter(({ createdAt }) => { |
| 78 | + return !this.isExportExpired(createdAt); |
| 79 | + }); |
| 80 | + } |
| 81 | + |
| 82 | + public async readExport(exportNameWithExtension: string): Promise<string> { |
| 83 | + try { |
| 84 | + const exportsDirectoryPath = await this.ensureExportsDirectory(); |
| 85 | + const exportFilePath = this.exportFilePath(exportsDirectoryPath, exportNameWithExtension); |
| 86 | + if (await this.isExportFileExpired(exportFilePath)) { |
| 87 | + throw new Error("Export has expired"); |
| 88 | + } |
| 89 | + |
| 90 | + return await fs.readFile(exportFilePath, "utf8"); |
| 91 | + } catch (error) { |
| 92 | + logger.error( |
| 93 | + LogId.exportReadError, |
| 94 | + "Error when reading export", |
| 95 | + error instanceof Error ? error.message : String(error) |
| 96 | + ); |
| 97 | + throw error; |
| 98 | + } |
| 99 | + } |
| 100 | + |
| 101 | + public async createJSONExport({ |
| 102 | + input, |
| 103 | + exportName, |
| 104 | + jsonExportFormat, |
| 105 | + }: { |
| 106 | + input: FindCursor; |
| 107 | + exportName: string; |
| 108 | + jsonExportFormat: JSONExportFormat; |
| 109 | + }): Promise<void> { |
| 110 | + try { |
| 111 | + await this.withExportsLock<void>(async (exportsDirectoryPath) => { |
| 112 | + const exportNameWithExtension = this.withExtension(exportName, "json"); |
| 113 | + const exportFilePath = path.join(exportsDirectoryPath, exportNameWithExtension); |
| 114 | + const outputStream = createWriteStream(exportFilePath); |
| 115 | + outputStream.write("["); |
| 116 | + try { |
| 117 | + const inputStream = input.stream(); |
| 118 | + const ejsonOptions = this.getEJSONOptionsForFormat(jsonExportFormat); |
| 119 | + await pipeline([inputStream, this.docToEJSONStream(ejsonOptions), outputStream]); |
| 120 | + } finally { |
| 121 | + outputStream.write("]\n"); |
| 122 | + const resourceURI = this.exportNameToResourceURI(exportNameWithExtension); |
| 123 | + this.mutableExports = [ |
| 124 | + ...this.mutableExports, |
| 125 | + { |
| 126 | + createdAt: (await fs.stat(exportFilePath)).birthtimeMs, |
| 127 | + name: exportNameWithExtension, |
| 128 | + uri: resourceURI, |
| 129 | + }, |
| 130 | + ]; |
| 131 | + this.session.emit("export-available", resourceURI); |
| 132 | + void input.close(); |
| 133 | + } |
| 134 | + }); |
| 135 | + } catch (error) { |
| 136 | + logger.error( |
| 137 | + LogId.exportCreationError, |
| 138 | + "Error when generating JSON export", |
| 139 | + error instanceof Error ? error.message : String(error) |
| 140 | + ); |
| 141 | + throw error; |
| 142 | + } |
| 143 | + } |
| 144 | + |
| 145 | + private getEJSONOptionsForFormat(format: JSONExportFormat): EJSONOptions | undefined { |
| 146 | + if (format === "relaxed") { |
| 147 | + return { |
| 148 | + relaxed: true, |
| 149 | + }; |
| 150 | + } |
| 151 | + return format === "canonical" |
| 152 | + ? { |
| 153 | + relaxed: false, |
| 154 | + } |
| 155 | + : undefined; |
| 156 | + } |
| 157 | + |
| 158 | + private docToEJSONStream(ejsonOptions: EJSONOptions | undefined) { |
| 159 | + let docsTransformed = 0; |
| 160 | + return new Transform({ |
| 161 | + objectMode: true, |
| 162 | + transform: function (chunk: unknown, encoding, callback) { |
| 163 | + ++docsTransformed; |
| 164 | + try { |
| 165 | + const doc: string = EJSON.stringify(chunk, undefined, 2, ejsonOptions); |
| 166 | + const line = `${docsTransformed > 1 ? ",\n" : ""}${doc}`; |
| 167 | + |
| 168 | + callback(null, line); |
| 169 | + } catch (err: unknown) { |
| 170 | + callback(err as Error); |
| 171 | + } |
| 172 | + }, |
| 173 | + final: function (callback) { |
| 174 | + this.push("]"); |
| 175 | + callback(null); |
| 176 | + }, |
| 177 | + }); |
| 178 | + } |
| 179 | + |
| 180 | + private async cleanupExpiredExports(): Promise<void> { |
| 181 | + if (this.exportsCleanupInProgress) { |
| 182 | + return; |
| 183 | + } |
| 184 | + |
| 185 | + this.exportsCleanupInProgress = true; |
| 186 | + try { |
| 187 | + await this.withExportsLock(async (exportsDirectoryPath) => { |
| 188 | + const exports = await this.listExportFiles(); |
| 189 | + for (const exportName of exports) { |
| 190 | + const exportPath = this.exportFilePath(exportsDirectoryPath, exportName); |
| 191 | + if (await this.isExportFileExpired(exportPath)) { |
| 192 | + await fs.unlink(exportPath); |
| 193 | + this.mutableExports = this.mutableExports.filter(({ name }) => name !== exportName); |
| 194 | + this.session.emit("export-expired", this.exportNameToResourceURI(exportName)); |
| 195 | + } |
| 196 | + } |
| 197 | + }); |
| 198 | + } catch (error) { |
| 199 | + logger.error( |
| 200 | + LogId.exportCleanupError, |
| 201 | + "Error when cleaning up exports", |
| 202 | + error instanceof Error ? error.message : String(error) |
| 203 | + ); |
| 204 | + } finally { |
| 205 | + this.exportsCleanupInProgress = false; |
| 206 | + } |
| 207 | + } |
| 208 | + |
| 209 | + /** |
| 210 | + * Small utility to centrally determine if an export is expired or not */ |
| 211 | + private async isExportFileExpired(exportFilePath: string): Promise<boolean> { |
| 212 | + const stats = await fs.stat(exportFilePath); |
| 213 | + return this.isExportExpired(stats.birthtimeMs); |
| 214 | + } |
| 215 | + |
| 216 | + private isExportExpired(createdAt: number) { |
| 217 | + return Date.now() - createdAt > this.config.exportTimeoutMs; |
| 218 | + } |
| 219 | + |
| 220 | + /** |
| 221 | + * Ensures the path ends with the provided extension */ |
| 222 | + private withExtension(pathOrName: string, extension: string): string { |
| 223 | + const extWithDot = extension.startsWith(".") ? extension : `.${extension}`; |
| 224 | + if (path.extname(pathOrName) === extWithDot) { |
| 225 | + return pathOrName; |
| 226 | + } |
| 227 | + return `${pathOrName}${extWithDot}`; |
| 228 | + } |
| 229 | + |
| 230 | + /** |
| 231 | + * Creates the session exports directory and returns the path */ |
| 232 | + private async ensureExportsDirectory(): Promise<string> { |
| 233 | + const exportsDirectoryPath = this.exportsDirectoryPath(); |
| 234 | + await fs.mkdir(exportsDirectoryPath, { recursive: true }); |
| 235 | + return exportsDirectoryPath; |
| 236 | + } |
| 237 | + |
| 238 | + /** |
| 239 | + * Acquires a lock on the session exports directory. */ |
| 240 | + private async withExportsLock<R>(callback: (lockedPath: string) => Promise<R>): Promise<R> { |
| 241 | + let releaseLock: (() => Promise<void>) | undefined; |
| 242 | + const exportsDirectoryPath = await this.ensureExportsDirectory(); |
| 243 | + try { |
| 244 | + releaseLock = await lock(exportsDirectoryPath, { retries: 10 }); |
| 245 | + return await callback(exportsDirectoryPath); |
| 246 | + } finally { |
| 247 | + await releaseLock?.(); |
| 248 | + } |
| 249 | + } |
| 250 | + |
| 251 | + /** |
| 252 | + * Lists exported files in the session export directory, while ignoring the |
| 253 | + * hidden files and files without extensions. */ |
| 254 | + private async listExportFiles(): Promise<string[]> { |
| 255 | + const exportsDirectory = await this.ensureExportsDirectory(); |
| 256 | + const directoryContents = await fs.readdir(exportsDirectory, "utf8"); |
| 257 | + return directoryContents.filter((maybeExportName) => { |
| 258 | + return !maybeExportName.startsWith(".") && !!path.extname(maybeExportName); |
| 259 | + }); |
| 260 | + } |
| 261 | +} |
0 commit comments