Support reading JSONL Streams #634
-
|
Hi, I have an API that supports streams and responds with chunks of data. It returns a big list in chunks, following the JSONL format.
import { Observer } from 'rxjs';
import { LinesTransformer } from './lines-transformer';
/**
 * Parses a single JSON document and hands it back typed as `T`.
 *
 * The result is not validated against `T`; the type parameter is a
 * caller-side assertion only. Throws whatever `JSON.parse` throws on
 * malformed input.
 */
function parseJSON<T>(input: string): T {
  const parsed: unknown = JSON.parse(input);
  return parsed as T;
}
/**
 * Fetches a JSON Lines (JSONL / NDJSON) resource and pushes every parsed line
 * to `observer` as it arrives.
 *
 * The observer sees zero or more `next` calls followed by exactly one terminal
 * call: `complete` when the stream ends, or `error` on the first failure
 * (network error, non-2xx status, missing body, or a line that is not valid
 * JSON). After a failure the body stream is cancelled and nothing further is
 * emitted. Whitespace-only lines are skipped rather than reported as errors.
 *
 * @param input    Request URL or `Request`, passed straight to `fetch`.
 * @param observer Receives each parsed line and the terminal notification.
 * @param init     Optional `fetch` options.
 * @returns A promise that settles once the terminal notification was delivered.
 */
export async function fetchJSONLines<T = unknown, TError = Error>(
  input: RequestInfo | URL,
  observer: Observer<T>,
  init?: RequestInit,
): Promise<void> {
  let reader: ReadableStreamDefaultReader<string> | undefined;
  try {
    const response = await fetch(input, init);
    if (!response.ok) {
      throw new Error(`HTTP error! Status: ${ response.status }`);
    }
    if (!response.body) {
      throw new Error('Response body is null');
    }
    // The transformer turns raw bytes into complete lines; each read() below
    // therefore yields exactly one line of text.
    reader = response.body
      .pipeThrough(new TransformStream<Uint8Array, string>(new LinesTransformer()))
      .getReader();
    let chunk = await reader.read();
    while (!chunk.done) {
      const line = chunk.value;
      // Blank lines carry no record; skip them instead of failing the parse.
      if (line.trim() !== '') {
        observer.next(parseJSON<T>(line));
      }
      chunk = await reader.read();
    }
    observer.complete();
  } catch (e) {
    // `error` is terminal: stop reading and release the underlying body so no
    // `next`/`complete` can follow it. A failing cancel must not mask `e`.
    if (reader) {
      await reader.cancel(e).catch(() => undefined);
    }
    observer.error(e as TError);
  }
}

/**
 * Transformer that decodes a byte stream as UTF-8 and re-chunks it into lines.
 *
 * Lines are separated by `\r\n`, `\n` or `\r`; the separators themselves are
 * not part of the emitted strings. Text after the last separator is buffered
 * in `lastString` until more data arrives or the stream is flushed.
 */
export class LinesTransformer implements Transformer<Uint8Array, string> {
  /** Text received so far that is not yet terminated by a line break. */
  lastString: string = '';
  /** Streaming decoder, so multi-byte characters may straddle chunk boundaries. */
  decoder = new TextDecoder();

  /** Decodes `chunk`, emits every line completed by it and buffers the rest. */
  transform(chunk: Uint8Array, controller: TransformStreamDefaultController<string>) {
    let text = `${ this.lastString }${ this.decoder.decode(chunk, { stream: true }) }`;
    // A trailing '\r' may be the first half of a '\r\n' pair whose '\n' is in
    // the next chunk. Hold it back so the pair is not counted as two breaks.
    let pendingCR = '';
    if (text.endsWith('\r')) {
      pendingCR = '\r';
      text = text.slice(0, -1);
    }
    const lines = text.split(/\r\n|[\r\n]/);
    this.lastString = `${ lines.pop() ?? '' }${ pendingCR }`;
    for (const line of lines) {
      controller.enqueue(line);
    }
  }

  /** Emits whatever is still buffered once the source stream has ended. */
  flush(controller: TransformStreamDefaultController<string>) {
    // The final decode() drains bytes the decoder was still holding on to.
    const rest = `${ this.lastString }${ this.decoder.decode() }`;
    this.lastString = '';
    // The buffer can still contain a held-back '\r', so split once more.
    const lines = rest.split(/\r\n|[\r\n]/);
    const tail = lines.pop();
    for (const line of lines) {
      controller.enqueue(line);
    }
    if (tail) {
      controller.enqueue(tail);
    }
  }
}
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
|
// ky returns standard Response objects, so the fetch-based streaming code
// carries over almost unchanged.
const response = await ky.get('https://api.example.com/stream', {
  headers: {
    accept: 'application/x-ndjson'
  }
});
if (!response.body) {
  throw new Error('Response body is null');
}
// access the body stream just like with fetch.
// LinesTransformer is a Transformer (not a stream) and decodes the bytes
// itself, so wrap it in a TransformStream and feed it the raw body — putting a
// TextDecoderStream in front of it would hand it strings instead of bytes.
const stream = response.body
  .pipeThrough(new TransformStream<Uint8Array, string>(new LinesTransformer()));
for await (const line of stream) {
  if (line.trim()) {
    const data = JSON.parse(line);
    console.log(data);
  }
}

// or with your RxJS setup:
const response = await ky.get(url, {headers: {accept: 'application/x-ndjson'}});
const body$ = new Observable<string>(subscriber => {
  const stream = response.body
    ?.pipeThrough(new TransformStream<Uint8Array, string>(new LinesTransformer()));
  // rest of your code...
});
// the key is that ky hands back a standard Response, so response.body is the
// same kind of stream that fetch() gives you.
Beta Was this translation helpful? Give feedback.
ky returns standard Response objects, so your streaming code works almost identically. Just replace fetch() with ky(), or use it with your RxJS setup.