Demo with node stream? #375
-
It seems that all the demos only work in browsers. |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 4 replies
-
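Indeed, the demos only rely on Web APIs. I would recommend using the `Readable.toWeb()`/`Readable.fromWeb()`/`Writable.fromWeb()`/`Writable.toWeb()` methods from the `node:stream` module in order to convert Node.js streams from/into Web streams. Here is an example of code below.
import { ZipWriter, TextReader } from "@zip.js/zip.js";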
import { open } from "node:fs/promises";
import { Writable } from "node:stream";
// Text written into the single entry added to the zip file.
const TEXT_CONTENT = "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
// Name of that entry inside the zip file.
const FILENAME = "lorem.txt";
// Run the demo; top-level await requires this file to be loaded as an ES module.
await test();
/**
 * Creates "./lorem.zip" and writes one text entry (FILENAME / TEXT_CONTENT)
 * into it through a Node.js write stream converted to a web WritableStream.
 */
async function test() {
// Open (create or truncate) the output file for writing.
const filehandle = await open("./lorem.zip", "w");
const writeStream = filehandle.createWriteStream();
// zip.js writes to web streams, so the Node.js stream is converted with Writable.toWeb().
const zipWriter = new ZipWriter(Writable.toWeb(writeStream));
await zipWriter.add(FILENAME, new TextReader(TEXT_CONTENT));
// Finish the archive. NOTE(review): presumably this also closes the underlying stream; confirm against the zip.js docs.
await zipWriter.close();
} I'm moving this issue into the discussions tab to keep it visible to people. |
Beta Was this translation helpful? Give feedback.
-
Has anything changed since 2022? The code above is not working.
|
Beta Was this translation helpful? Give feedback.
-
/***
* polyfill for zipjs
*/
const streams = require('web-streams-polyfill');
const { Readable, Writable } = require('stream');
/* eslint-disable no-undef */
// Node.js below 18.0.0 exposes no global web streams: install the polyfilled classes.
if (globalThis.ReadableStream === undefined) {
  const { ReadableStream, WritableStream, TransformStream } = streams;
  Object.assign(globalThis, { ReadableStream, WritableStream, TransformStream });
}
// node under 17.0.0
if (typeof Readable.toWeb === 'undefined') {
const { ReadableStream, WritableStream } = streams;
/**
 * Polyfill for Readable.toWeb: wraps a Node.js readable stream in a web
 * ReadableStream of type 'bytes'.
 *
 * Chunks received through the node stream's 'data' event are held in an
 * internal queue and handed to the web stream on pull, either enqueued as-is
 * or copied into the view of the pending BYOB request.
 *
 * @param nodeReadable - Node.js readable stream to wrap; it is destroyed when
 *   the returned stream is cancelled.
 * @returns the wrapping ReadableStream.
 */
Readable.toWeb = function(nodeReadable) {
// True once controller.close() has been called or the web stream was cancelled.
let closed = false;
// True once the node stream has emitted 'end'.
let isNodeStreamEnded = false;
// Queue size threshold: the node stream is paused above it and resumed below it.
const highWaterMark = 16 * 1024;
// Chunks received from the node stream and not yet delivered to the web stream.
const queue = [];
/**
* Queue length in bytes
*/
let queueLength = 0;
// True while a pull could not be satisfied and waits for the next 'data' event.
let pullRequest = false;
// Closes the controller at most once.
function close(controller) {
if (!closed) {
closed = true;
controller.close();
}
}
// Delivers at most one queued chunk to the web stream, or closes the stream
// when the queue is empty and the node stream has ended.
// Returns true when nothing was delivered (the value is stored in pullRequest
// by the callers), false otherwise.
const processLeftover = controller => {
const byobRequest = controller.byobRequest;
const chunk = queue.shift();
if (chunk) {
queueLength -= chunk.length;
}
// Default reader path: hand the whole chunk over.
if (!byobRequest) {
if (chunk) {
controller.enqueue(chunk);
return false;
}
if (isNodeStreamEnded) {
close(controller); // Signal EOF
}
return true;
}
const view = byobRequest.view;
// NOTE(review): a chunk shifted above is not put back on this path; confirm the view can never be missing here.
if (!view) return true;
if (!chunk) {
if (isNodeStreamEnded) {
close(controller); // Signal EOF
byobRequest.respond(0); // Cancel BYOB request
return false;
}
return true;
}
// BYOB path: copy as much of the chunk as fits into the reader's view.
const bytesToCopy = Math.min(view.byteLength, chunk.length);
new Uint8Array(view.buffer, view.byteOffset, bytesToCopy).set(
chunk.subarray(0, bytesToCopy)
);
byobRequest.respond(bytesToCopy);
// Put the part that did not fit back at the front of the queue.
if (bytesToCopy < chunk.length) {
const remainder = chunk.subarray(bytesToCopy);
queue.unshift(remainder);
queueLength += remainder.length;
}
// NOTE(review): only reached for an empty chunk, after respond() was already called above; verify this branch is valid.
if (chunk.length === 0 && isNodeStreamEnded) {
close(controller); // Signal EOF
byobRequest.respond(0); // Cancel BYOB request
}
return false;
};
return new ReadableStream({
type: 'bytes',
start(controller) {
// Buffer every chunk; serve it immediately when a pull is waiting.
nodeReadable.on('data', chunk => {
queue.push(chunk);
queueLength += chunk.length;
if (pullRequest) {
pullRequest = processLeftover(controller);
}
// Apply backpressure if needed.
if (!nodeReadable.isPaused()) {
if (queueLength > highWaterMark) {
nodeReadable.pause();
}
}
});
// On 'end', record it and deliver a queued chunk or close the stream.
nodeReadable.once('end', () => {
isNodeStreamEnded = true;
processLeftover(controller);
});
// Forward node stream errors to the web stream.
nodeReadable.once('error', err => {
controller.error(err);
});
},
pull(controller) {
// Remember whether this pull is still unsatisfied.
pullRequest = processLeftover(controller);
// Resume the node stream once the queue has dropped below the threshold.
if (nodeReadable.isPaused() && queueLength < highWaterMark) {
nodeReadable.resume();
}
},
cancel(reason) {
closed = true; // Avoid controller is closed twice
nodeReadable.destroy(reason);
}
});
};
/**
 * Polyfill for Writable.toWeb: exposes a Node.js writable stream as a web
 * WritableStream.
 *
 * Each web-stream write resolves only once the node stream has invoked the
 * write callback for that chunk. A WritableStream never calls the sink's
 * write() again before the previous promise settles, so this alone provides
 * ordering and backpressure; no extra queue or 'drain' handling is needed.
 *
 * @param {import('stream').Writable} nodeWritable - Node.js writable to wrap.
 * @returns {WritableStream} web stream whose chunks are written to
 *   `nodeWritable`; closing it ends `nodeWritable`, aborting it destroys it.
 */
Writable.toWeb = function(nodeWritable) {
  // Converts a web-stream chunk into a Buffer holding a copy of its bytes.
  // Views and ArrayBuffers are copied byte-for-byte; anything else is
  // written as its string representation.
  const toBuffer = chunk => {
    if (ArrayBuffer.isView(chunk)) {
      return Buffer.from(
        new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength)
      );
    }
    if (chunk instanceof ArrayBuffer) {
      return Buffer.from(new Uint8Array(chunk));
    }
    return Buffer.from(chunk.toString());
  };

  return new WritableStream({
    start(controller) {
      // Surface node stream failures ('error' events) on the web stream.
      nodeWritable.on('error', err => {
        controller.error(err);
      });
    },
    write(chunk) {
      return new Promise((resolve, reject) => {
        nodeWritable.write(toBuffer(chunk), err => {
          if (err) {
            reject(err);
            nodeWritable.destroy(err);
          } else {
            resolve();
          }
        });
      });
    },
    close() {
      return new Promise((resolve, reject) => {
        // The end callback may receive an error; do not report it as success.
        nodeWritable.end(err => {
          if (err) {
            reject(err);
          } else {
            resolve();
          }
        });
      });
    },
    abort(err) {
      nodeWritable.destroy(err || new Error('Stream aborted'));
    }
  });
};
}
// Export the stream classes (with the toWeb polyfills attached when they were missing).
module.exports = { Readable, Writable }; |
Beta Was this translation helpful? Give feedback.
Indeed, the demos only rely on Web APIs. I would recommend using the `Readable.toWeb()`/`Readable.fromWeb()`/`Writable.fromWeb()`/`Writable.toWeb()` methods from the `node:stream` module in order to convert Node.js streams from/into Web streams. Here is an example of code below.