Skip to content

Commit c8709d2

Browse files
feat: makes export tool async
1 parent 8eeb5b2 commit c8709d2

File tree

5 files changed

+198
-50
lines changed

5 files changed

+198
-50
lines changed

src/common/sessionExportsManager.ts

Lines changed: 63 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,22 @@ import { LoggerBase, LogId } from "./logger.js";
1414
export const jsonExportFormat = z.enum(["relaxed", "canonical"]);
1515
export type JSONExportFormat = z.infer<typeof jsonExportFormat>;
1616

17-
type StoredExport = {
17+
interface CommonExportData {
1818
exportName: string;
1919
exportURI: string;
2020
exportPath: string;
21+
}
22+
23+
interface ReadyExport extends CommonExportData {
24+
exportStatus: "ready";
2125
exportCreatedAt: number;
22-
};
26+
}
27+
28+
interface InProgressExport extends CommonExportData {
29+
exportStatus: "in-progress";
30+
}
31+
32+
type StoredExport = ReadyExport | InProgressExport;
2333

2434
/**
2535
* Ideally just exportName and exportURI should be made publicly available but
@@ -75,7 +85,12 @@ export class SessionExportsManager extends EventEmitter<SessionExportsManagerEve
7585

7686
public get availableExports(): AvailableExport[] {
7787
return Object.values(this.sessionExports)
78-
.filter(({ exportCreatedAt: createdAt }) => !isExportExpired(createdAt, this.config.exportTimeoutMs))
88+
.filter((sessionExport) => {
89+
return (
90+
sessionExport.exportStatus === "ready" &&
91+
!isExportExpired(sessionExport.exportCreatedAt, this.config.exportTimeoutMs)
92+
);
93+
})
7994
.map(({ exportName, exportURI, exportPath }) => ({
8095
exportName,
8196
exportURI,
@@ -104,6 +119,10 @@ export class SessionExportsManager extends EventEmitter<SessionExportsManagerEve
104119
throw new Error("Requested export has either expired or does not exist!");
105120
}
106121

122+
if (exportHandle.exportStatus === "in-progress") {
123+
throw new Error("Requested export is still being generated!");
124+
}
125+
107126
const { exportPath, exportCreatedAt } = exportHandle;
108127

109128
if (isExportExpired(exportCreatedAt, this.config.exportTimeoutMs)) {
@@ -124,38 +143,61 @@ export class SessionExportsManager extends EventEmitter<SessionExportsManagerEve
124143
}
125144
}
126145

127-
public async createJSONExport({
146+
public createJSONExport({
128147
input,
129148
exportName,
130149
jsonExportFormat,
131150
}: {
132151
input: FindCursor;
133152
exportName: string;
134153
jsonExportFormat: JSONExportFormat;
135-
}): Promise<AvailableExport> {
154+
}): AvailableExport {
136155
try {
137156
const exportNameWithExtension = validateExportName(ensureExtension(exportName, "json"));
138157
const exportURI = `exported-data://${encodeURIComponent(exportNameWithExtension)}`;
139158
const exportFilePath = path.join(this.exportsDirectoryPath, exportNameWithExtension);
159+
const inProgressExport: InProgressExport = (this.sessionExports[exportNameWithExtension] = {
160+
exportName: exportNameWithExtension,
161+
exportPath: exportFilePath,
162+
exportURI: exportURI,
163+
exportStatus: "in-progress",
164+
});
140165

166+
void this.startExport({ input, jsonExportFormat, inProgressExport });
167+
return inProgressExport;
168+
} catch (error) {
169+
this.logger.error({
170+
id: LogId.exportCreationError,
171+
context: "Error when registering JSON export request",
172+
message: error instanceof Error ? error.message : String(error),
173+
});
174+
throw error;
175+
}
176+
}
177+
178+
private async startExport({
179+
input,
180+
jsonExportFormat,
181+
inProgressExport,
182+
}: {
183+
input: FindCursor;
184+
jsonExportFormat: JSONExportFormat;
185+
inProgressExport: InProgressExport;
186+
}): Promise<void> {
187+
try {
141188
await fs.mkdir(this.exportsDirectoryPath, { recursive: true });
142189
const inputStream = input.stream();
143190
const ejsonDocStream = this.docToEJSONStream(this.getEJSONOptionsForFormat(jsonExportFormat));
144-
const outputStream = createWriteStream(exportFilePath);
191+
const outputStream = createWriteStream(inProgressExport.exportPath);
145192
outputStream.write("[");
146193
let pipeSuccessful = false;
147194
try {
148195
await pipeline([inputStream, ejsonDocStream, outputStream]);
149196
pipeSuccessful = true;
150-
return {
151-
exportName,
152-
exportURI,
153-
exportPath: exportFilePath,
154-
};
155197
} catch (pipelineError) {
156198
// If the pipeline errors out then we might end up with
157199
// partial and incorrect export so we remove it entirely.
158-
await fs.unlink(exportFilePath).catch((error) => {
200+
await fs.unlink(inProgressExport.exportPath).catch((error) => {
159201
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
160202
this.logger.error({
161203
id: LogId.exportCreationCleanupError,
@@ -164,17 +206,17 @@ export class SessionExportsManager extends EventEmitter<SessionExportsManagerEve
164206
});
165207
}
166208
});
209+
delete this.sessionExports[inProgressExport.exportName];
167210
throw pipelineError;
168211
} finally {
169212
void input.close();
170213
if (pipeSuccessful) {
171-
this.sessionExports[exportNameWithExtension] = {
172-
exportName: exportNameWithExtension,
214+
this.sessionExports[inProgressExport.exportName] = {
215+
...inProgressExport,
173216
exportCreatedAt: Date.now(),
174-
exportPath: exportFilePath,
175-
exportURI: exportURI,
217+
exportStatus: "ready",
176218
};
177-
this.emit("export-available", exportURI);
219+
this.emit("export-available", inProgressExport.exportURI);
178220
}
179221
}
180222
} catch (error) {
@@ -183,7 +225,6 @@ export class SessionExportsManager extends EventEmitter<SessionExportsManagerEve
183225
context: "Error when generating JSON export",
184226
message: error instanceof Error ? error.message : String(error),
185227
});
186-
throw error;
187228
}
188229
}
189230

@@ -228,9 +269,11 @@ export class SessionExportsManager extends EventEmitter<SessionExportsManagerEve
228269
}
229270

230271
this.exportsCleanupInProgress = true;
231-
const exportsForCleanup = { ...this.sessionExports };
272+
const exportsForCleanup = Object.values({ ...this.sessionExports }).filter(
273+
(sessionExport): sessionExport is ReadyExport => sessionExport.exportStatus === "ready"
274+
);
232275
try {
233-
for (const { exportPath, exportCreatedAt, exportURI, exportName } of Object.values(exportsForCleanup)) {
276+
for (const { exportPath, exportCreatedAt, exportURI, exportName } of exportsForCleanup) {
234277
if (isExportExpired(exportCreatedAt, this.config.exportTimeoutMs)) {
235278
delete this.sessionExports[exportName];
236279
await this.silentlyRemoveExport(exportPath);

src/tools/mongodb/read/export.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export class ExportTool extends MongoDBToolBase {
4343
});
4444
const exportName = `${database}.${collection}.${Date.now()}.json`;
4545

46-
const { exportURI, exportPath } = await this.session.exportsManager.createJSONExport({
46+
const { exportURI, exportPath } = this.session.exportsManager.createJSONExport({
4747
input: findCursor,
4848
exportName,
4949
jsonExportFormat,
@@ -54,13 +54,13 @@ export class ExportTool extends MongoDBToolBase {
5454
// understand what to do with the result.
5555
{
5656
type: "text",
57-
text: `Exported data for namespace ${database}.${collection} is available under resource URI - "${exportURI}".`,
57+
text: `Data for namespace ${database}.${collection} is being exported and will be made available under resource URI - "${exportURI}".`,
5858
},
5959
{
6060
type: "resource_link",
6161
name: exportName,
6262
uri: exportURI,
63-
description: "Resource URI for fetching exported data.",
63+
description: "Resource URI for fetching exported data once it is ready.",
6464
mimeType: "application/json",
6565
},
6666
];
@@ -71,7 +71,7 @@ export class ExportTool extends MongoDBToolBase {
7171
if (this.config.transport === "stdio") {
7272
toolCallContent.push({
7373
type: "text",
74-
text: `Optionally, the exported data can also be accessed under path - "${exportPath}"`,
74+
text: `Optionally, when the export is finished, the exported data can also be accessed under path - "${exportPath}"`,
7575
});
7676
}
7777

tests/integration/resources/exportedData.test.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,13 @@ describeWithMongoDB(
5959
expect(exportedResourceURI).toBeDefined();
6060

6161
// wait for export expired
62-
await timeout(200);
62+
await timeout(250);
6363
const response = await integration.mcpClient().readResource({
6464
uri: exportedResourceURI as string,
6565
});
6666
expect(response.isError).toEqual(true);
6767
expect(response.contents[0]?.uri).toEqual(exportedResourceURI);
68-
expect(response.contents[0]?.text).toEqual(
69-
`Error reading ${exportedResourceURI}: Requested export has expired!`
70-
);
68+
expect(response.contents[0]?.text).toMatch(`Error reading ${exportedResourceURI}:`);
7169
});
7270
});
7371

@@ -78,6 +76,8 @@ describeWithMongoDB(
7876
name: "export",
7977
arguments: { database: "db", collection: "coll" },
8078
});
79+
// Small timeout to let export finish
80+
await timeout(50);
8181

8282
const exportedResourceURI = (exportResponse as CallToolResult).content.find(
8383
(part) => part.type === "resource_link"
@@ -98,6 +98,8 @@ describeWithMongoDB(
9898
name: "export",
9999
arguments: { database: "big", collection: "coll" },
100100
});
101+
// Small timeout to let export finish
102+
await timeout(50);
101103

102104
const exportedResourceURI = (exportResponse as CallToolResult).content.find(
103105
(part) => part.type === "resource_link"

tests/integration/tools/mongodb/read/export.test.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import fs from "fs/promises";
22
import { beforeEach, describe, expect, it } from "vitest";
33
import {
44
databaseCollectionParameters,
5+
timeout,
56
validateThrowsForInvalidArguments,
67
validateToolMetadata,
78
} from "../../../helpers.js";
@@ -11,10 +12,7 @@ import { Long } from "bson";
1112

1213
function contentWithTextResourceURI(content: CallToolResult["content"], namespace: string) {
1314
return content.find((part) => {
14-
return (
15-
part.type === "text" &&
16-
part.text.startsWith(`Exported data for namespace ${namespace} is available under resource URI -`)
17-
);
15+
return part.type === "text" && part.text.startsWith(`Data for namespace ${namespace}`);
1816
});
1917
}
2018

@@ -28,7 +26,9 @@ function contentWithExportPath(content: CallToolResult["content"]) {
2826
return content.find((part) => {
2927
return (
3028
part.type === "text" &&
31-
part.text.startsWith(`Optionally, the exported data can also be accessed under path -`)
29+
part.text.startsWith(
30+
`Optionally, when the export is finished, the exported data can also be accessed under path -`
31+
)
3232
);
3333
});
3434
}
@@ -95,6 +95,8 @@ describeWithMongoDB("export tool", (integration) => {
9595
name: "export",
9696
arguments: { database: "non-existent", collection: "foos" },
9797
});
98+
// Small timeout to let export finish
99+
await timeout(10);
98100

99101
const content = response.content as CallToolResult["content"];
100102
const namespace = "non-existent.foos";
@@ -129,6 +131,8 @@ describeWithMongoDB("export tool", (integration) => {
129131
name: "export",
130132
arguments: { database: integration.randomDbName(), collection: "foo" },
131133
});
134+
// Small timeout to let export finish
135+
await timeout(10);
132136

133137
const localPathPart = contentWithExportPath(response.content as CallToolResult["content"]);
134138
expect(localPathPart).toBeDefined();
@@ -150,6 +154,8 @@ describeWithMongoDB("export tool", (integration) => {
150154
name: "export",
151155
arguments: { database: integration.randomDbName(), collection: "foo", filter: { name: "foo" } },
152156
});
157+
// Small timeout to let export finish
158+
await timeout(10);
153159

154160
const localPathPart = contentWithExportPath(response.content as CallToolResult["content"]);
155161
expect(localPathPart).toBeDefined();
@@ -170,6 +176,8 @@ describeWithMongoDB("export tool", (integration) => {
170176
name: "export",
171177
arguments: { database: integration.randomDbName(), collection: "foo", limit: 1 },
172178
});
179+
// Small timeout to let export finish
180+
await timeout(10);
173181

174182
const localPathPart = contentWithExportPath(response.content as CallToolResult["content"]);
175183
expect(localPathPart).toBeDefined();
@@ -195,6 +203,8 @@ describeWithMongoDB("export tool", (integration) => {
195203
sort: { longNumber: 1 },
196204
},
197205
});
206+
// Small timeout to let export finish
207+
await timeout(10);
198208

199209
const localPathPart = contentWithExportPath(response.content as CallToolResult["content"]);
200210
expect(localPathPart).toBeDefined();
@@ -220,6 +230,8 @@ describeWithMongoDB("export tool", (integration) => {
220230
projection: { _id: 0, name: 1 },
221231
},
222232
});
233+
// Small timeout to let export finish
234+
await timeout(10);
223235

224236
const localPathPart = contentWithExportPath(response.content as CallToolResult["content"]);
225237
expect(localPathPart).toBeDefined();
@@ -249,6 +261,8 @@ describeWithMongoDB("export tool", (integration) => {
249261
jsonExportFormat: "relaxed",
250262
},
251263
});
264+
// Small timeout to let export finish
265+
await timeout(10);
252266

253267
const localPathPart = contentWithExportPath(response.content as CallToolResult["content"]);
254268
expect(localPathPart).toBeDefined();
@@ -279,6 +293,8 @@ describeWithMongoDB("export tool", (integration) => {
279293
jsonExportFormat: "canonical",
280294
},
281295
});
296+
// Small timeout to let export finish
297+
await timeout(10);
282298

283299
const localPathPart = contentWithExportPath(response.content as CallToolResult["content"]);
284300
expect(localPathPart).toBeDefined();

0 commit comments

Comments
 (0)