Skip to content

Commit 9e8053b

Browse files
committed
feat(platform): add file-stream utility
This commit introduces a new helper for working with file streams, providing a unified interface for reading local and remote files.
1 parent dd7bb9b commit 9e8053b

File tree

7 files changed

+511
-0
lines changed

7 files changed

+511
-0
lines changed

platform/__tests__/file-stream.js

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
const {
2+
getFileStream, getFileStreamAndMetadata,
3+
} = require("../dist");
4+
const fs = require("fs");
5+
const path = require("path");
6+
const http = require("http");
7+
const os = require("os");
8+
9+
// Helper function to read content from a readable stream
10+
async function readStreamContent(stream) {
11+
let content = "";
12+
stream.on("data", (chunk) => {
13+
content += chunk.toString();
14+
});
15+
16+
await new Promise((resolve) => {
17+
stream.on("end", resolve);
18+
});
19+
20+
return content;
21+
}
22+
23+
// Helper function to wait for stream cleanup by listening to close event
24+
async function waitForStreamCleanup(stream) {
25+
return new Promise((resolve) => {
26+
stream.on("close", resolve);
27+
});
28+
}
29+
30+
describe("file-stream", () => {
31+
let testFilePath;
32+
let server;
33+
const testPort = 3892;
34+
35+
beforeAll(() => {
36+
// Create a test file
37+
testFilePath = path.join(__dirname, "test-file.txt");
38+
fs.writeFileSync(testFilePath, "test content for file stream");
39+
40+
// Create a simple HTTP server for testing remote files
41+
server = http.createServer((req, res) => {
42+
if (req.url === "/test-file.txt") {
43+
res.writeHead(200, {
44+
"Content-Type": "text/plain",
45+
"Content-Length": "28",
46+
"Last-Modified": new Date().toUTCString(),
47+
"ETag": "\"test-etag\"",
48+
});
49+
res.end("test content for file stream");
50+
} else if (req.url === "/no-content-length") {
51+
res.writeHead(200, {
52+
"Content-Type": "application/json",
53+
"Last-Modified": new Date().toUTCString(),
54+
});
55+
res.end("{\"test\": \"data\"}");
56+
} else if (req.url === "/error") {
57+
res.writeHead(404, "Not Found");
58+
res.end();
59+
} else {
60+
res.writeHead(404, "Not Found");
61+
res.end();
62+
}
63+
});
64+
65+
return new Promise((resolve) =>
66+
server.listen(testPort, resolve));
67+
});
68+
69+
afterAll(() => {
70+
// Clean up test file
71+
if (fs.existsSync(testFilePath)) {
72+
fs.unlinkSync(testFilePath);
73+
}
74+
75+
if (server) {
76+
return new Promise((resolve) =>
77+
server.close(resolve));
78+
}
79+
});
80+
81+
describe("getFileStream", () => {
82+
it("should return readable stream for local file", async () => {
83+
const stream = await getFileStream(testFilePath);
84+
expect(stream).toBeDefined();
85+
expect(typeof stream.read).toBe("function");
86+
87+
const content = await readStreamContent(stream);
88+
expect(content).toBe("test content for file stream");
89+
});
90+
91+
it("should return readable stream for remote URL", async () => {
92+
const stream = await getFileStream(`http://localhost:${testPort}/test-file.txt`);
93+
expect(stream).toBeDefined();
94+
expect(typeof stream.read).toBe("function");
95+
96+
const content = await readStreamContent(stream);
97+
expect(content).toBe("test content for file stream");
98+
});
99+
100+
it("should throw error for invalid URL", async () => {
101+
await expect(getFileStream(`http://localhost:${testPort}/error`))
102+
.rejects.toThrow("Failed to fetch");
103+
});
104+
105+
it("should throw error for non-existent local file", async () => {
106+
await expect(getFileStream("/non/existent/file.txt"))
107+
.rejects.toThrow();
108+
});
109+
});
110+
111+
describe("getFileStreamAndMetadata", () => {
112+
it("should return stream and metadata for local file", async () => {
113+
const result = await getFileStreamAndMetadata(testFilePath);
114+
115+
expect(result.stream).toBeDefined();
116+
expect(typeof result.stream.read).toBe("function");
117+
expect(result.metadata).toMatchObject({
118+
size: 28,
119+
name: "test-file.txt",
120+
});
121+
expect(result.metadata.lastModified.constructor.name).toBe("Date");
122+
const content = await readStreamContent(result.stream);
123+
expect(content).toBe("test content for file stream");
124+
});
125+
126+
it("should return stream and metadata for remote file with content-length", async () => {
127+
const result = await getFileStreamAndMetadata(`http://localhost:${testPort}/test-file.txt`);
128+
129+
expect(result.stream).toBeDefined();
130+
expect(typeof result.stream.read).toBe("function");
131+
expect(result.metadata).toMatchObject({
132+
size: 28,
133+
contentType: "text/plain",
134+
name: "test-file.txt",
135+
etag: "\"test-etag\"",
136+
});
137+
expect(result.metadata.lastModified).toBeInstanceOf(Date);
138+
const content = await readStreamContent(result.stream);
139+
expect(content).toBe("test content for file stream");
140+
});
141+
142+
it("should handle remote file without content-length", async () => {
143+
const result = await getFileStreamAndMetadata(`http://localhost:${testPort}/no-content-length`);
144+
145+
expect(result.stream).toBeDefined();
146+
expect(typeof result.stream.read).toBe("function");
147+
148+
expect(result.metadata).toMatchObject({
149+
size: 16, // Size determined after download
150+
contentType: "application/json",
151+
});
152+
expect(result.metadata.lastModified).toBeInstanceOf(Date);
153+
154+
const content = await readStreamContent(result.stream);
155+
expect(content).toBe("{\"test\": \"data\"}");
156+
});
157+
158+
it("should throw error for invalid remote URL", async () => {
159+
await expect(getFileStreamAndMetadata(`http://localhost:${testPort}/error`))
160+
.rejects.toThrow("Failed to fetch");
161+
});
162+
});
163+
164+
describe("temporary file cleanup", () => {
165+
it("should clean up temporary files after stream ends", async () => {
166+
const tmpDir = os.tmpdir();
167+
const tempFilesBefore = fs.readdirSync(tmpDir);
168+
const result = await getFileStreamAndMetadata(`http://localhost:${testPort}/no-content-length`);
169+
170+
const content = await readStreamContent(result.stream);
171+
// Wait for cleanup to complete by listening to close event
172+
await waitForStreamCleanup(result.stream);
173+
174+
// Check that temp files were cleaned up
175+
const tempFilesAfter = fs.readdirSync(tmpDir);
176+
expect(tempFilesAfter.length).toEqual(tempFilesBefore.length);
177+
expect(content).toBe("{\"test\": \"data\"}");
178+
});
179+
180+
it("should clean up temporary files on stream error", async () => {
181+
// Check temp files before
182+
const tmpDir = os.tmpdir();
183+
const tempFilesBefore = fs.readdirSync(tmpDir);
184+
185+
const result = await getFileStreamAndMetadata(`http://localhost:${testPort}/no-content-length`);
186+
187+
// Trigger an error and wait for cleanup
188+
result.stream.destroy(new Error("Test error"));
189+
await waitForStreamCleanup(result.stream);
190+
191+
// Check that temp files were cleaned up
192+
const tempFilesAfter = fs.readdirSync(tmpDir);
193+
expect(tempFilesAfter.length).toEqual(tempFilesBefore.length);
194+
});
195+
});
196+
});

platform/dist/file-stream.d.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/// <reference types="node" />
2+
import { Readable } from "stream";
3+
export interface FileMetadata {
4+
size?: number;
5+
contentType?: string;
6+
lastModified?: Date;
7+
name?: string;
8+
etag?: string;
9+
}
10+
/**
11+
* @param pathOrUrl - a file path or a URL
12+
* @returns a Readable stream of the file content
13+
*/
14+
export declare function getFileStream(pathOrUrl: string): Promise<Readable>;
15+
/**
16+
* @param pathOrUrl - a file path or a URL
17+
* @returns a Readable stream of the file content and its metadata
18+
*/
19+
export declare function getFileStreamAndMetadata(pathOrUrl: string): Promise<{
20+
stream: Readable;
21+
metadata: FileMetadata;
22+
}>;

platform/dist/file-stream.js

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
"use strict";
2+
Object.defineProperty(exports, "__esModule", { value: true });
3+
exports.getFileStreamAndMetadata = exports.getFileStream = void 0;
4+
const stream_1 = require("stream");
5+
const fs_1 = require("fs");
6+
const os_1 = require("os");
7+
const path_1 = require("path");
8+
const fs_2 = require("fs");
9+
const promises_1 = require("stream/promises");
10+
/**
11+
* @param pathOrUrl - a file path or a URL
12+
* @returns a Readable stream of the file content
13+
*/
14+
async function getFileStream(pathOrUrl) {
15+
if (isUrl(pathOrUrl)) {
16+
const response = await fetch(pathOrUrl);
17+
if (!response.ok) {
18+
throw new Error(`Failed to fetch ${pathOrUrl}: ${response.status} ${response.statusText}`);
19+
}
20+
return stream_1.Readable.fromWeb(response.body);
21+
}
22+
else {
23+
// Check if file exists first (this will throw if file doesn't exist)
24+
await fs_1.promises.stat(pathOrUrl);
25+
return fs_1.createReadStream(pathOrUrl);
26+
}
27+
}
28+
exports.getFileStream = getFileStream;
29+
/**
30+
* @param pathOrUrl - a file path or a URL
31+
* @returns a Readable stream of the file content and its metadata
32+
*/
33+
async function getFileStreamAndMetadata(pathOrUrl) {
34+
if (isUrl(pathOrUrl)) {
35+
return await getRemoteFileStreamAndMetadata(pathOrUrl);
36+
}
37+
else {
38+
return await getLocalFileStreamAndMetadata(pathOrUrl);
39+
}
40+
}
41+
exports.getFileStreamAndMetadata = getFileStreamAndMetadata;
42+
function isUrl(pathOrUrl) {
43+
try {
44+
new URL(pathOrUrl);
45+
return true;
46+
}
47+
catch (_a) {
48+
return false;
49+
}
50+
}
51+
async function getLocalFileStreamAndMetadata(filePath) {
52+
const stats = await fs_1.promises.stat(filePath);
53+
const metadata = {
54+
size: stats.size,
55+
lastModified: stats.mtime,
56+
name: filePath.split("/").pop() || filePath.split("\\").pop(),
57+
};
58+
const stream = fs_1.createReadStream(filePath);
59+
return {
60+
stream,
61+
metadata,
62+
};
63+
}
64+
async function getRemoteFileStreamAndMetadata(url) {
65+
const response = await fetch(url);
66+
if (!response.ok) {
67+
throw new Error(`Failed to fetch ${url}: ${response.status} ${response.statusText}`);
68+
}
69+
const headers = response.headers;
70+
const contentLength = headers.get("content-length");
71+
const contentType = headers.get("content-type") || undefined;
72+
const lastModified = headers.get("last-modified")
73+
? new Date(headers.get("last-modified"))
74+
: undefined;
75+
const etag = headers.get("etag") || undefined;
76+
const urlObj = new URL(url);
77+
const name = urlObj.pathname.split("/").pop() || undefined;
78+
const baseMetadata = {
79+
contentType,
80+
lastModified,
81+
name,
82+
etag,
83+
};
84+
// If we have content-length, we can stream directly
85+
if (contentLength) {
86+
const metadata = {
87+
...baseMetadata,
88+
size: parseInt(contentLength, 10),
89+
};
90+
const stream = stream_1.Readable.fromWeb(response.body);
91+
return {
92+
stream,
93+
metadata,
94+
};
95+
}
96+
// No content-length header - need to download to temporary file to get size
97+
return await downloadToTemporaryFile(response, baseMetadata);
98+
}
99+
async function downloadToTemporaryFile(response, baseMetadata) {
100+
// Generate unique temporary file path
101+
const tempFileName = `file-stream-${Date.now()}-${Math.random().toString(36)
102+
.substring(2)}`;
103+
const tempFilePath = path_1.join(os_1.tmpdir(), tempFileName);
104+
// Download to temporary file
105+
const fileStream = fs_2.createWriteStream(tempFilePath);
106+
const webStream = stream_1.Readable.fromWeb(response.body);
107+
await promises_1.pipeline(webStream, fileStream);
108+
// Get file stats
109+
const stats = await fs_1.promises.stat(tempFilePath);
110+
const metadata = {
111+
...baseMetadata,
112+
size: stats.size,
113+
};
114+
// Create a readable stream that cleans up the temp file when done
115+
const stream = fs_1.createReadStream(tempFilePath);
116+
// Clean up temp file when stream is closed or ends
117+
const cleanup = async () => {
118+
try {
119+
await fs_1.promises.unlink(tempFilePath);
120+
}
121+
catch (_a) {
122+
// Ignore cleanup errors (file might already be deleted)
123+
}
124+
};
125+
stream.on("close", cleanup);
126+
stream.on("end", cleanup);
127+
stream.on("error", cleanup);
128+
return {
129+
stream,
130+
metadata,
131+
};
132+
}

platform/dist/index.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import axios, { transformConfigForOauth } from "./axios";
33
import { AxiosRequestConfig as AxiosConfig } from "axios";
44
export { axios, transformConfigForOauth, };
55
export { cloneSafe, jsonStringifySafe, } from "./utils";
6+
export { getFileStreamAndMetadata, getFileStream, } from "./file-stream";
7+
export type { FileMetadata, } from "./file-stream";
68
export { ConfigurationError, } from "./errors";
79
export { default as sqlProp, } from "./sql-prop";
810
export type { ColumnSchema, DbInfo, TableInfo, TableMetadata, TableSchema, } from "./sql-prop";

platform/dist/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ Object.defineProperty(exports, "transformConfigForOauth", { enumerable: true, ge
88
var utils_1 = require("./utils");
99
Object.defineProperty(exports, "cloneSafe", { enumerable: true, get: function () { return utils_1.cloneSafe; } });
1010
Object.defineProperty(exports, "jsonStringifySafe", { enumerable: true, get: function () { return utils_1.jsonStringifySafe; } });
11+
var file_stream_1 = require("./file-stream");
12+
Object.defineProperty(exports, "getFileStreamAndMetadata", { enumerable: true, get: function () { return file_stream_1.getFileStreamAndMetadata; } });
13+
Object.defineProperty(exports, "getFileStream", { enumerable: true, get: function () { return file_stream_1.getFileStream; } });
1114
var errors_1 = require("./errors");
1215
Object.defineProperty(exports, "ConfigurationError", { enumerable: true, get: function () { return errors_1.ConfigurationError; } });
1316
var sql_prop_1 = require("./sql-prop");

0 commit comments

Comments
 (0)