Skip to content

Commit 071a6de

Browse files
committed
bump api.ts
1 parent 4ab0b05 commit 071a6de

File tree

1 file changed

+107
-22
lines changed

1 file changed

+107
-22
lines changed

api.ts

Lines changed: 107 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,23 @@ export type ItemId = ItemIdV1;
1212
export interface Base {
1313
_id: ItemId;
1414
name: string;
15+
time: number;
1516
description?: string;
1617

1718
owner?: ItemId;
1819
group?: ItemId;
20+
21+
links?: Record<string, Link[]>;
1922
}
2023

2124
export type Fresh = Omit<Base, "_id">;
2225

26+
export interface Link {
27+
session: string;
28+
time: number;
29+
id: ItemId;
30+
}
31+
2332

2433
/* ----- api/v1/models ----- */
2534

@@ -424,7 +433,8 @@ export class NginxError extends ApiError {
424433
}
425434

426435
export function nodeReq<Q = unknown, S = unknown>(
427-
opts: RequestOptions<Q>
436+
opts: RequestOptions<Q>,
437+
{ returnStream }: { returnStream?: boolean } = {}
428438
): Promise<S> {
429439
const { protocol, hostname, port, pathname, search } = new URL(opts.url);
430440

@@ -446,6 +456,10 @@ export function nodeReq<Q = unknown, S = unknown>(
446456
const req = proto.request(
447457
options,
448458
(res: EventEmitter0 & { statusCode: number }) => {
459+
if (returnStream) {
460+
resolve(res as any);
461+
return;
462+
}
449463
let resp = "";
450464
res.on("data", (chunk: string) => (resp += chunk.toString()));
451465
res.on("end", () => {
@@ -487,11 +501,7 @@ function tryGetFetch(): (
487501
input: RequestInfo,
488502
init?: RequestInit
489503
) => Promise<Response> {
490-
try {
491-
return require("node-fetch");
492-
} catch (_) {
493-
return (globalThis as any)["fetch"] as any;
494-
}
504+
return (globalThis as any)["fetch"] as any;
495505
}
496506

497507
class EventEmitter0 {
@@ -516,6 +526,12 @@ function read(response: ReadableStream<Uint8Array>) {
516526
let cancellationToken: { cancel: () => void };
517527
let cancellationRequest = false;
518528

529+
const deno = (globalThis as any)["Deno"] as any;
530+
let streamOptions = { stream: true } as any;
531+
if (deno) {
532+
streamOptions = {};
533+
}
534+
519535
const events = new EventEmitter0();
520536
const stream = new ReadableStream({
521537
start(controller) {
@@ -547,7 +563,7 @@ function read(response: ReadableStream<Uint8Array>) {
547563
return;
548564
}
549565

550-
const data = decoder.decode(result.value, { stream: true } as any);
566+
const data = decoder.decode(result.value, streamOptions);
551567
buf += data;
552568
totalBytes += buf.length;
553569

@@ -667,27 +683,87 @@ export async function req<Q = unknown, S = unknown>(
667683
}
668684

669685
function nodeReadStream(stream: any) {
670-
const JSONStream = require("JSONStream");
671686
const events = new EventEmitter0();
687+
const batches = [];
672688

673-
const s0 = JSONStream.parse();
674-
const s1 = stream.pipe(s0);
675-
689+
let batch = [];
676690
let totalBytes = 0;
677691

692+
const batchInterval = setInterval(() => {
693+
const stats = { totalBytes };
694+
events.emit("batch", { batch, stats });
695+
batches.push(batch);
696+
batch = [];
697+
}, 500);
698+
699+
let prevTail = "";
700+
678701
stream.on("data", (data: string) => {
679702
totalBytes += data.length;
680-
const stats = { bytes: data.length, totalBytes };
681-
events.emit("batch", { stats });
703+
704+
let text = data.toString();
705+
706+
if (text.startsWith("[\n")) {
707+
text = text.slice(1);
708+
events.emit("start");
709+
}
710+
711+
if (prevTail) {
712+
text = `${prevTail}${text}`;
713+
prevTail = "";
714+
}
715+
716+
let lines = text.split("\n,\n").filter((l) => !!l);
717+
const last = lines[lines.length - 1];
718+
719+
if (last && !last.endsWith("]\n")) {
720+
try {
721+
JSON.parse(last);
722+
} catch (err) {
723+
prevTail = last;
724+
lines = lines.slice(0, -1);
725+
}
726+
}
727+
if (last && last.endsWith("]\n")) {
728+
lines[lines.length - 1] = last.slice(0, -2);
729+
}
730+
731+
lines = lines.filter((l) => l != "\n");
732+
733+
for (let l of lines) {
734+
if (l.startsWith(",\n")) {
735+
l = l.slice(2);
736+
}
737+
if (l === "\n]\n") {
738+
continue;
739+
}
740+
batch.push(JSON.parse(l));
741+
}
742+
});
743+
744+
stream.on("error", (err) => {
745+
events.emit("end", err);
746+
clearInterval(batchInterval);
682747
});
683748

684749
stream.on("end", () => {
685750
events.emit("end");
751+
clearInterval(batchInterval);
686752
});
687753

688754
return {
689755
events,
690-
stream: s1,
756+
stream,
757+
readAll: () =>
758+
new Promise((resolve, reject) => {
759+
events.on("error", reject);
760+
events.on("end", () => {
761+
if (batch.length) {
762+
batches.push(batch);
763+
}
764+
resolve([].concat(...batches));
765+
});
766+
}),
691767
};
692768
}
693769

@@ -697,7 +773,9 @@ export async function reqReader<Q = unknown, S = unknown>(
697773
const _fetch = tryGetFetch();
698774

699775
if (typeof _fetch !== "function") {
700-
throw new Error("readable stream available for fetch api only");
776+
/* try with node streams */
777+
const resp = await nodeReq(opts, { returnStream: true });
778+
return nodeReadStream(resp) as StreamReader<S>;
701779
}
702780

703781
const resp = await _fetch(opts.url, {
@@ -710,10 +788,6 @@ export async function reqReader<Q = unknown, S = unknown>(
710788
throw ApiError.fromJson(opts, await resp.json(), resp.status);
711789
}
712790

713-
if (typeof ReadableStream === "undefined") {
714-
return nodeReadStream(resp.body);
715-
}
716-
717791
return read(resp.body!);
718792
}
719793

@@ -776,10 +850,13 @@ export interface MoreTypedClient {
776850
export interface Client {
777851
new (opts?: ClientOpts): Client;
778852

779-
get<T = unknown>(path: string): Promise<T[]>;
853+
get<T = unknown>(path: string): Promise<T>;
780854
post<T = unknown>(path: string, data: Partial<T>): Promise<T>;
781855
patch<T = unknown>(path: string, data: Partial<T>): Promise<T>;
782856
delete<T = unknown>(path: string): Promise<T>;
857+
858+
/** query params stringification helper */
859+
qs(params: Record<string, string | string[] | number>): string;
783860
}
784861

785862
export class Client implements Client {
@@ -862,9 +939,17 @@ export class Client implements Client {
862939
return new Client({ ...(this._opts || {}), ...opts });
863940
}
864941

865-
/* ↓ not part of api right now */
942+
qs(params: Record<string, string | string[] | number>): string {
943+
const u = new URL("/", "http://localhost");
944+
for (const [k, v] of Object.entries(params || {})) {
945+
u.searchParams.set(k, (v || "").toString());
946+
}
947+
return u.search.replace("?", "");
948+
}
866949

867-
read<T = unknown>(path: string): Promise<StreamReader<T>> {
950+
/* ↓ not part of api right now,
951+
very unstable, so experimental, much discouraged */
952+
getStream<T = unknown>(path: string): Promise<StreamReader<T>> {
868953
const url = this.resolveUrl(path);
869954
url.searchParams.set("streamed", "true");
870955
url.searchParams.set("nolimit", "true");

0 commit comments

Comments
 (0)