-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfetch-data.worker-loader.ts
More file actions
124 lines (110 loc) · 4.1 KB
/
fetch-data.worker-loader.ts
File metadata and controls
124 lines (110 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// a web-worker which fetches whole files or byte-range slices of data and returns the raw bytes (as an ArrayBuffer) to the caller, using transferables
import { HEARTBEAT_RATE_MS, logger } from '@alleninstitute/vis-core';
import { type AbsolutePath, FetchStore, type RangeQuery } from 'zarrita';
import type { CancelMessage, FetchMessage, TransferrableRequestInit } from './fetch-data.interface';
import {
FETCH_RESPONSE_MESSAGE_TYPE,
isCancellationError,
isCancelMessage,
isFetchMessage,
} from './fetch-data.interface';
// Total number of getRange attempts made by fetchSlice: its loop runs NUM_RETRIES times,
// so a failing request is retried NUM_RETRIES - 1 times after the first attempt.
const NUM_RETRIES = 2;
// Delay, in milliseconds, that fetchSlice waits between a failed attempt and the next one.
const RETRY_DELAY_MS = 500;
/**
 * Fetches a whole file from the store rooted at `rootUrl`.
 *
 * @param rootUrl - root URL handed to the `FetchStore` constructor
 * @param path - path of the file within the store
 * @param options - optional request options, spread into the request init
 * @param abortController - optional controller whose signal is attached to the request
 *   (a `null` signal is passed when no controller is given)
 * @returns whatever `FetchStore.get` resolves with: the bytes, or `undefined`
 */
const fetchFile = async (
  rootUrl: string,
  path: AbsolutePath,
  options?: TransferrableRequestInit | undefined,
  abortController?: AbortController | undefined,
): Promise<Uint8Array | undefined> => {
  const signal = abortController?.signal ?? null;
  const requestInit = { ...(options || {}), signal };
  return new FetchStore(rootUrl).get(path, requestInit);
};
/**
 * Fetches a byte range of a file from the store rooted at `rootUrl`, making up to
 * `NUM_RETRIES` attempts in total and waiting `RETRY_DELAY_MS` between attempts.
 *
 * Cancellation is never retried: if the request fails because the supplied abort signal
 * has fired (or the error is recognised by `isCancellationError`), the error is rethrown
 * immediately so the caller can distinguish "cancelled" from "failed".
 *
 * @param rootUrl - root URL handed to the `FetchStore` constructor
 * @param path - path of the file within the store
 * @param range - the range query forwarded to `FetchStore.getRange`
 * @param options - optional request options, spread into the request init
 * @param abortController - optional controller whose signal is attached to every attempt
 * @returns the bytes resolved by `getRange`, or `undefined` if `getRange` resolved with
 *   `undefined` or every attempt failed with a non-cancellation error
 * @throws the original error when the request was cancelled
 */
const fetchSlice = async (
  rootUrl: string,
  path: AbsolutePath,
  range: RangeQuery,
  options?: TransferrableRequestInit | undefined,
  abortController?: AbortController | undefined,
): Promise<Uint8Array | undefined> => {
  const store = new FetchStore(rootUrl);
  const signal = abortController?.signal ?? null;
  const wait = (ms: number) =>
    new Promise<void>((resolve) => {
      setTimeout(resolve, ms);
    });
  for (let attempt = 0; attempt < NUM_RETRIES; attempt++) {
    try {
      return await store.getRange(path, range, { ...(options ?? {}), signal });
    } catch (e) {
      // A cancelled request must not be logged as a failure, delayed, or retried against
      // an already-aborted signal; surface it to the caller straight away.
      if (signal?.aborted || isCancellationError(e)) {
        throw e;
      }
      logger.error('getRange request failed:', e);
      const hasRetries = attempt < NUM_RETRIES - 1;
      const message = `getRange request ${hasRetries ? `will retry in ${RETRY_DELAY_MS}ms` : 'has no retries left'}`;
      logger.warn(message);
      if (hasRetries) {
        await wait(RETRY_DELAY_MS);
      }
    }
  }
  // Every attempt failed with a non-cancellation error (already logged above).
  return undefined;
};
/**
 * Handles a fetch request message: starts the fetch (a ranged fetch when `range` is
 * provided, otherwise a whole-file fetch) and posts the resulting bytes back via
 * `self.postMessage`, transferring the underlying buffer when there is one.
 *
 * An `AbortController` is registered in `abortControllers` under the request ID for the
 * lifetime of the request so that it can be cancelled, and is removed again once the
 * request settles so the map does not grow without bound and the ID can be reused.
 *
 * If the ID is already registered, an error is logged and the message is ignored.
 * If the fetch rejects, nothing is posted back; non-cancellation errors are logged.
 *
 * @param message - the fetch request (ID plus root URL, path, optional range and options)
 * @param abortControllers - registry of in-flight requests, keyed by request ID; mutated
 */
const handleFetch = (message: FetchMessage, abortControllers: Record<string, AbortController>) => {
  const { id, payload } = message;
  const { rootUrl, path, range, options } = payload;
  if (id in abortControllers) {
    logger.error('cannot send message: request ID already in use');
    return;
  }
  const abort = new AbortController();
  abortControllers[id] = abort;
  const fetchFn =
    range !== undefined
      ? () => fetchSlice(rootUrl, path, range, options, abort)
      : () => fetchFile(rootUrl, path, options, abort);
  fetchFn()
    .then((result: Uint8Array | undefined) => {
      const buffer = result?.buffer;
      // Only ask for a transfer when there is actually a buffer to hand over.
      const transferOptions = buffer !== undefined ? { transfer: [buffer] } : {};
      self.postMessage(
        {
          type: FETCH_RESPONSE_MESSAGE_TYPE,
          id,
          payload: buffer,
        },
        { ...transferOptions },
      );
    })
    .catch((e) => {
      if (!isCancellationError(e)) {
        logger.error('error in slice fetch worker: ', e);
      }
      // can ignore if it is a cancellation error
    })
    .finally(() => {
      // The request has settled (success, failure or cancellation): release its controller.
      // The identity check avoids removing an entry that belongs to a different request.
      if (abortControllers[id] === abort) {
        delete abortControllers[id];
      }
    });
};
/**
 * Handles a cancel message by aborting the controller registered under the message's ID
 * with the reason `'cancelled'`. Logs a warning if no controller is registered for that ID.
 *
 * @param message - the cancel request carrying the ID of the request to abort
 * @param abortControllers - registry of abort controllers, keyed by request ID
 */
const handleCancel = (message: CancelMessage, abortControllers: Record<string, AbortController>) => {
  const pending = abortControllers[message.id];
  if (pending) {
    pending.abort('cancelled');
    return;
  }
  logger.warn('attempted to cancel a non-existent request');
};
/**
 * Starts posting a `{ type: 'heartbeat' }` message via `self.postMessage` every
 * `HEARTBEAT_RATE_MS` milliseconds.
 *
 * @returns the interval handle returned by `setInterval`
 */
const startHeartbeat = () => {
  const beat = () => {
    self.postMessage({ type: 'heartbeat' });
  };
  return setInterval(beat, HEARTBEAT_RATE_MS);
};
/**
 * Builds the worker's message handler. The handler closes over a private registry of
 * abort controllers (keyed by request ID) and dispatches fetch messages to `handleFetch`
 * and cancel messages to `handleCancel`; any other message is ignored.
 *
 * @returns an async message-event handler
 */
const setupOnMessage = () => {
  const abortControllers: Record<string, AbortController> = {};
  return async (event: MessageEvent<unknown>) => {
    const message = event.data;
    if (isFetchMessage(message)) {
      handleFetch(message, abortControllers);
      return;
    }
    if (isCancelMessage(message)) {
      handleCancel(message, abortControllers);
    }
  };
};
/**
 * Wires the fetch-data worker into the given context by installing a freshly built
 * message handler as `ctx.onmessage`.
 *
 * The heartbeat is not started here; the caller starts it through the returned
 * `startHeartbeat` function.
 *
 * @param ctx - the worker global scope to attach the message handler to
 * @returns an object exposing `startHeartbeat`
 */
export const setupFetchDataWorker = (ctx: typeof self) => {
  const messageHandler = setupOnMessage();
  ctx.onmessage = messageHandler;
  return { startHeartbeat };
};