Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions platform/__tests__/file-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
const {
getFileStream, getFileStreamAndMetadata,
} = require("../dist");
const fs = require("fs");
const path = require("path");
const http = require("http");
const os = require("os");

// Helper function to read content from a readable stream
async function readStreamContent(stream) {
let content = "";
stream.on("data", (chunk) => {
content += chunk.toString();
});

await new Promise((resolve) => {
stream.on("end", resolve);
});

return content;
}

// Helper function to wait for stream cleanup by listening to close event
async function waitForStreamCleanup(stream) {
return new Promise((resolve) => {
stream.on("close", resolve);
});
}

describe("file-stream", () => {
let testFilePath;
let server;
const testPort = 3892;

beforeAll(() => {
// Create a test file
testFilePath = path.join(__dirname, "test-file.txt");
fs.writeFileSync(testFilePath, "test content for file stream");

// Create a simple HTTP server for testing remote files
server = http.createServer((req, res) => {
if (req.url === "/test-file.txt") {
res.writeHead(200, {
"Content-Type": "text/plain",
"Content-Length": "28",
"Last-Modified": new Date().toUTCString(),
"ETag": "\"test-etag\"",
});
res.end("test content for file stream");
} else if (req.url === "/no-content-length") {
res.writeHead(200, {
"Content-Type": "application/json",
"Last-Modified": new Date().toUTCString(),
});
res.end("{\"test\": \"data\"}");
} else if (req.url === "/error") {
res.writeHead(404, "Not Found");
res.end();
} else {
res.writeHead(404, "Not Found");
res.end();
}
});

return new Promise((resolve) =>
server.listen(testPort, resolve));
});

afterAll(() => {
// Clean up test file
if (fs.existsSync(testFilePath)) {
fs.unlinkSync(testFilePath);
}

if (server) {
return new Promise((resolve) =>
server.close(resolve));
}
});

describe("getFileStream", () => {
it("should return readable stream for local file", async () => {
const stream = await getFileStream(testFilePath);
expect(stream).toBeDefined();
expect(typeof stream.read).toBe("function");

const content = await readStreamContent(stream);
expect(content).toBe("test content for file stream");
});

it("should return readable stream for remote URL", async () => {
const stream = await getFileStream(`http://localhost:${testPort}/test-file.txt`);
expect(stream).toBeDefined();
expect(typeof stream.read).toBe("function");

const content = await readStreamContent(stream);
expect(content).toBe("test content for file stream");
});

it("should throw error for invalid URL", async () => {
await expect(getFileStream(`http://localhost:${testPort}/error`))
.rejects.toThrow("Failed to fetch");
});

it("should throw error for non-existent local file", async () => {
await expect(getFileStream("/non/existent/file.txt"))
.rejects.toThrow();
});
});

describe("getFileStreamAndMetadata", () => {
it("should return stream and metadata for local file", async () => {
const result = await getFileStreamAndMetadata(testFilePath);

expect(result.stream).toBeDefined();
expect(typeof result.stream.read).toBe("function");
expect(result.metadata).toMatchObject({
size: 28,
name: "test-file.txt",
});
expect(result.metadata.lastModified.constructor.name).toBe("Date");
const content = await readStreamContent(result.stream);
expect(content).toBe("test content for file stream");
});

it("should return stream and metadata for remote file with content-length", async () => {
const result = await getFileStreamAndMetadata(`http://localhost:${testPort}/test-file.txt`);

expect(result.stream).toBeDefined();
expect(typeof result.stream.read).toBe("function");
expect(result.metadata).toMatchObject({
size: 28,
contentType: "text/plain",
name: "test-file.txt",
etag: "\"test-etag\"",
});
expect(result.metadata.lastModified).toBeInstanceOf(Date);
const content = await readStreamContent(result.stream);
expect(content).toBe("test content for file stream");
});

it("should handle remote file without content-length", async () => {
const result = await getFileStreamAndMetadata(`http://localhost:${testPort}/no-content-length`);

expect(result.stream).toBeDefined();
expect(typeof result.stream.read).toBe("function");

expect(result.metadata).toMatchObject({
size: 16, // Size determined after download
contentType: "application/json",
});
expect(result.metadata.lastModified).toBeInstanceOf(Date);

const content = await readStreamContent(result.stream);
expect(content).toBe("{\"test\": \"data\"}");
});

it("should throw error for invalid remote URL", async () => {
await expect(getFileStreamAndMetadata(`http://localhost:${testPort}/error`))
.rejects.toThrow("Failed to fetch");
});
});

describe("temporary file cleanup", () => {
it("should clean up temporary files after stream ends", async () => {
const tmpDir = os.tmpdir();
const tempFilesBefore = fs.readdirSync(tmpDir);
const result = await getFileStreamAndMetadata(`http://localhost:${testPort}/no-content-length`);

const content = await readStreamContent(result.stream);
// Wait for cleanup to complete by listening to close event
await waitForStreamCleanup(result.stream);

// Check that temp files were cleaned up
const tempFilesAfter = fs.readdirSync(tmpDir);
expect(tempFilesAfter.length).toEqual(tempFilesBefore.length);
expect(content).toBe("{\"test\": \"data\"}");
});

it("should clean up temporary files on stream error", async () => {
// Check temp files before
const tmpDir = os.tmpdir();
const tempFilesBefore = fs.readdirSync(tmpDir);

const result = await getFileStreamAndMetadata(`http://localhost:${testPort}/no-content-length`);

// Trigger an error and wait for cleanup
result.stream.destroy(new Error("Test error"));
await waitForStreamCleanup(result.stream);

// Check that temp files were cleaned up
const tempFilesAfter = fs.readdirSync(tmpDir);
expect(tempFilesAfter.length).toEqual(tempFilesBefore.length);
});
});
});
22 changes: 22 additions & 0 deletions platform/dist/file-stream.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/// <reference types="node" />
import { Readable } from "stream";
export interface FileMetadata {
size: number;
contentType?: string;
lastModified?: Date;
name?: string;
etag?: string;
}
/**
* @param pathOrUrl - a file path or a URL
* @returns a Readable stream of the file content
*/
export declare function getFileStream(pathOrUrl: string): Promise<Readable>;
/**
* @param pathOrUrl - a file path or a URL
* @returns a Readable stream of the file content and its metadata
*/
export declare function getFileStreamAndMetadata(pathOrUrl: string): Promise<{
stream: Readable;
metadata: FileMetadata;
}>;
150 changes: 150 additions & 0 deletions platform/dist/file-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.getFileStreamAndMetadata = exports.getFileStream = void 0;
const stream_1 = require("stream");
const fs_1 = require("fs");
const os_1 = require("os");
const path_1 = require("path");
const promises_1 = require("stream/promises");
const uuid_1 = require("uuid");
const mime = require("mime-types");
/**
* @param pathOrUrl - a file path or a URL
* @returns a Readable stream of the file content
*/
async function getFileStream(pathOrUrl) {
if (isUrl(pathOrUrl)) {
const response = await fetch(pathOrUrl);
if (!response.ok || !response.body) {
throw new Error(`Failed to fetch ${pathOrUrl}: ${response.status} ${response.statusText}`);
}
return stream_1.Readable.fromWeb(response.body);
}
else {
await safeStat(pathOrUrl);
return fs_1.createReadStream(pathOrUrl);
}
}
exports.getFileStream = getFileStream;
/**
* @param pathOrUrl - a file path or a URL
* @returns a Readable stream of the file content and its metadata
*/
async function getFileStreamAndMetadata(pathOrUrl) {
if (isUrl(pathOrUrl)) {
return await getRemoteFileStreamAndMetadata(pathOrUrl);
}
else {
return await getLocalFileStreamAndMetadata(pathOrUrl);
}
}
exports.getFileStreamAndMetadata = getFileStreamAndMetadata;
function isUrl(pathOrUrl) {
try {
new URL(pathOrUrl);
return true;
}
catch (_a) {
return false;
}
}
async function safeStat(path) {
try {
return await fs_1.promises.stat(path);
}
catch (_a) {
throw new Error(`File not found: ${path}`);
}
}
async function getLocalFileStreamAndMetadata(filePath) {
const stats = await safeStat(filePath);
const contentType = mime.lookup(filePath) || undefined;
const metadata = {
size: stats.size,
lastModified: stats.mtime,
name: path_1.basename(filePath),
contentType,
};
const stream = fs_1.createReadStream(filePath);
return {
stream,
metadata,
};
}
async function getRemoteFileStreamAndMetadata(url) {
const response = await fetch(url);
if (!response.ok || !response.body) {
throw new Error(`Failed to fetch ${url}: ${response.status} ${response.statusText}`);
}
const headers = response.headers;
const contentLength = headers.get("content-length");
const lastModified = headers.get("last-modified")
? new Date(headers.get("last-modified"))
: undefined;
const etag = headers.get("etag") || undefined;
const urlObj = new URL(url);
const name = path_1.basename(urlObj.pathname);
const contentType = headers.get("content-type") || mime.lookup(urlObj.pathname) || undefined;
const baseMetadata = {
contentType,
lastModified,
name,
etag,
};
// If we have content-length, we can stream directly
if (contentLength) {
const metadata = {
...baseMetadata,
size: parseInt(contentLength, 10),
};
const stream = stream_1.Readable.fromWeb(response.body);
return {
stream,
metadata,
};
}
// No content-length header - need to download to temporary file to get size
return await downloadToTemporaryFile(response, baseMetadata);
}
async function downloadToTemporaryFile(response, baseMetadata) {
// Generate unique temporary file path
const tempFileName = `file-stream-${uuid_1.v4()}`;
const tempFilePath = path_1.join(os_1.tmpdir(), tempFileName);
// Download to temporary file
const fileStream = fs_1.createWriteStream(tempFilePath);
const webStream = stream_1.Readable.fromWeb(response.body);
try {
await promises_1.pipeline(webStream, fileStream);
const stats = await fs_1.promises.stat(tempFilePath);
const metadata = {
...baseMetadata,
size: stats.size,
};
const stream = fs_1.createReadStream(tempFilePath);
const cleanup = async () => {
try {
await fs_1.promises.unlink(tempFilePath);
}
catch (_a) {
// Ignore cleanup errors
}
};
stream.once("close", cleanup);
stream.once("end", cleanup);
stream.once("error", cleanup);
return {
stream,
metadata,
};
}
catch (err) {
// Cleanup on error
try {
await fs_1.promises.unlink(tempFilePath);
}
catch (_a) {
// Ignore cleanup errors
}
throw err;
}
}
2 changes: 2 additions & 0 deletions platform/dist/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import axios, { transformConfigForOauth } from "./axios";
import { AxiosRequestConfig as AxiosConfig } from "axios";
export { axios, transformConfigForOauth, };
export { cloneSafe, jsonStringifySafe, } from "./utils";
export { getFileStreamAndMetadata, getFileStream, } from "./file-stream";
export type { FileMetadata, } from "./file-stream";
export { ConfigurationError, } from "./errors";
export { default as sqlProp, } from "./sql-prop";
export type { ColumnSchema, DbInfo, TableInfo, TableMetadata, TableSchema, } from "./sql-prop";
Expand Down
3 changes: 3 additions & 0 deletions platform/dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ Object.defineProperty(exports, "transformConfigForOauth", { enumerable: true, ge
var utils_1 = require("./utils");
Object.defineProperty(exports, "cloneSafe", { enumerable: true, get: function () { return utils_1.cloneSafe; } });
Object.defineProperty(exports, "jsonStringifySafe", { enumerable: true, get: function () { return utils_1.jsonStringifySafe; } });
var file_stream_1 = require("./file-stream");
Object.defineProperty(exports, "getFileStreamAndMetadata", { enumerable: true, get: function () { return file_stream_1.getFileStreamAndMetadata; } });
Object.defineProperty(exports, "getFileStream", { enumerable: true, get: function () { return file_stream_1.getFileStream; } });
var errors_1 = require("./errors");
Object.defineProperty(exports, "ConfigurationError", { enumerable: true, get: function () { return errors_1.ConfigurationError; } });
var sql_prop_1 = require("./sql-prop");
Expand Down
Loading
Loading