-
Notifications
You must be signed in to change notification settings - Fork 69
Expand file tree
/
Copy pathuse-streams.ts
More file actions
142 lines (119 loc) · 3.61 KB
/
use-streams.ts
File metadata and controls
142 lines (119 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import { useQuery } from "@tanstack/react-query";
import { useMemo } from "react";
import { useStudio } from "../studio/context";
const STREAMS_PAGE_SIZE = 1000;
interface StreamsApiItem {
created_at: string;
epoch: number;
expires_at: string | null;
name: string;
next_offset: string;
sealed_through: string;
uploaded_through: string;
}
export interface StudioStream {
createdAt: string;
epoch: number;
expiresAt: string | null;
name: string;
nextOffset: string;
sealedThrough: string;
uploadedThrough: string;
}
export interface UseStreamsArgs {
refreshIntervalMs?: number;
}
function isStreamsApiItem(value: unknown): value is StreamsApiItem {
if (typeof value !== "object" || value === null) {
return false;
}
const item = value as Partial<StreamsApiItem>;
return (
typeof item.created_at === "string" &&
typeof item.epoch === "number" &&
(item.expires_at === null || typeof item.expires_at === "string") &&
typeof item.name === "string" &&
typeof item.next_offset === "string" &&
typeof item.sealed_through === "string" &&
typeof item.uploaded_through === "string"
);
}
function createStreamsListUrl(streamsUrl: string | undefined): string {
const trimmed = streamsUrl?.trim();
if (!trimmed) {
return "";
}
const suffix = `/v1/streams?limit=${STREAMS_PAGE_SIZE}&offset=0`;
try {
const url = new URL(trimmed);
const pathname = url.pathname.replace(/\/+$/, "");
url.pathname = pathname.endsWith("/v1/streams")
? pathname
: `${pathname}/v1/streams`;
url.search = `?limit=${STREAMS_PAGE_SIZE}&offset=0`;
url.hash = "";
return url.toString();
} catch {
const pathname = trimmed.replace(/[?#].*$/, "").replace(/\/+$/, "");
return pathname.endsWith("/v1/streams")
? `${pathname}?limit=${STREAMS_PAGE_SIZE}&offset=0`
: `${pathname}${suffix}`;
}
}
function sortStreams(streams: StudioStream[]): StudioStream[] {
return [...streams].sort((left, right) =>
left.name.localeCompare(right.name),
);
}
export function useStreams(args?: UseStreamsArgs) {
const { streamsUrl } = useStudio();
const refreshIntervalMs = args?.refreshIntervalMs;
const streamsListUrl = useMemo(
() => createStreamsListUrl(streamsUrl),
[streamsUrl],
);
const hasStreamsServer = streamsListUrl.length > 0;
const query = useQuery<StudioStream[]>({
enabled: hasStreamsServer,
queryFn: async ({ signal }) => {
const response = await fetch(streamsListUrl, {
signal,
});
if (!response.ok) {
throw new Error(
`Failed loading streams (${response.status} ${response.statusText})`,
);
}
const payload = (await response.json()) as unknown;
if (!Array.isArray(payload) || !payload.every(isStreamsApiItem)) {
throw new Error("Streams server returned an invalid response shape.");
}
return sortStreams(
payload.map((stream) => ({
createdAt: stream.created_at,
epoch: stream.epoch,
expiresAt: stream.expires_at,
name: stream.name,
nextOffset: stream.next_offset,
sealedThrough: stream.sealed_through,
uploadedThrough: stream.uploaded_through,
})),
);
},
queryKey: ["streams", streamsListUrl],
refetchInterval:
typeof refreshIntervalMs === "number" && refreshIntervalMs > 0
? refreshIntervalMs
: false,
refetchOnReconnect: false,
refetchOnWindowFocus: false,
retry: false,
retryOnMount: false,
staleTime: Infinity,
});
return {
...query,
hasStreamsServer,
streams: query.data ?? [],
};
}