Skip to content

Commit 0552540

Browse files
authored
chore -cached loading (V3 omezarr support) step one (#205)
1 parent f75a4e7 commit 0552540

File tree

13 files changed

+373
-61
lines changed

13 files changed

+373
-61
lines changed

packages/core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@alleninstitute/vis-core",
3-
"version": "0.0.4",
3+
"version": "0.0.5",
44
"contributors": [
55
{
66
"name": "Lane Sawyer",

packages/core/src/workers/worker-pool.ts

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export enum WorkerStatus {
2626

2727
export class WorkerPool {
2828
#workers: Worker[];
29-
#promises: Map<number, MessagePromise>;
29+
#promises: Map<string, MessagePromise>;
3030
#timeOfPreviousHeartbeat: Map<number, number>;
3131
#which: number;
3232

@@ -36,25 +36,37 @@ export class WorkerPool {
3636
for (let i = 0; i < size; i++) {
3737
this.#workers[i] = new Worker(workerModule, { type: 'module' });
3838
this.#workers[i].onmessage = (msg) => this.#handleMessage(i, msg);
39-
this.#timeOfPreviousHeartbeat.set(i, Date.now());
39+
this.#timeOfPreviousHeartbeat.set(i, 0);
4040
}
4141
this.#promises = new Map();
4242
this.#which = 0;
4343
}
44-
44+
/**
45+
* Warning - nothing in this class should be considered useable after
46+
* calling this method - any/all methods called should be expected to be
47+
* completely unreliable. dont call me unless you're about to dispose of all references to this object
48+
*/
49+
destroy() {
50+
for (let i = 0; i < this.#workers.length; i++) {
51+
this.#workers[i].terminate();
52+
}
53+
this.#workers = [];
54+
}
4555
#handleMessage(workerIndex: number, msg: MessageEvent<unknown>) {
4656
const { data } = msg;
47-
const messagePromise = this.#promises.get(workerIndex);
4857
if (isHeartbeatMessage(data)) {
4958
this.#timeOfPreviousHeartbeat.set(workerIndex, Date.now());
5059
return;
5160
}
52-
if (messagePromise === undefined) {
53-
logger.warn('unexpected message from worker');
54-
return;
55-
}
5661
if (isWorkerMessageWithId(data)) {
57-
this.#promises.delete(workerIndex);
62+
logger.debug(`worker ${workerIndex} responded to a message`);
63+
const { id } = data;
64+
const messagePromise = this.#promises.get(id);
65+
if (messagePromise === undefined) {
66+
logger.warn('unexpected message from worker');
67+
return;
68+
}
69+
this.#promises.delete(id);
5870
if (!messagePromise.validator(data)) {
5971
const reason = 'invalid response from worker: message type did not match expected type';
6072
logger.error(reason);
@@ -63,28 +75,32 @@ export class WorkerPool {
6375
}
6476
messagePromise.resolve(data);
6577
} else {
78+
logger.debug(`worker ${workerIndex} received an invalid message`);
6679
const reason = 'encountered an invalid message; skipping';
67-
logger.error(reason);
68-
messagePromise.reject(new Error(reason));
80+
logger.warn(reason);
6981
}
7082
}
7183

7284
#roundRobin() {
7385
this.#which = (this.#which + 1) % this.#workers.length;
7486
}
7587

76-
submitRequest(
88+
async submitRequest(
7789
message: WorkerMessage,
7890
responseValidator: MessageValidator<WorkerMessageWithId>,
7991
transfers: Transferable[],
8092
signal?: AbortSignal | undefined,
8193
): Promise<WorkerMessageWithId> {
94+
if (this.#workers.length < 1) {
95+
return Promise.reject('this woorker pool has been disposed');
96+
}
8297
const reqId = `rq${uuidv4()}`;
8398
const workerIndex = this.#which;
8499
const messageWithId = { ...message, id: reqId };
85100
const messagePromise = this.#createMessagePromise(responseValidator);
101+
logger.debug(`worker ${workerIndex} being handed a request`);
86102

87-
this.#promises.set(workerIndex, messagePromise);
103+
this.#promises.set(reqId, messagePromise);
88104

89105
if (signal) {
90106
signal.addEventListener('abort', () => {
@@ -126,7 +142,11 @@ export class WorkerPool {
126142
if (!this.#isValidIndex(workerIndex)) {
127143
throw new Error('invalid worker index');
128144
}
129-
const delta = Date.now() - (this.#timeOfPreviousHeartbeat.get(workerIndex) ?? 0);
145+
const lastHeartbeat = this.#timeOfPreviousHeartbeat.get(workerIndex) ?? 0;
146+
if (lastHeartbeat === 0) {
147+
return WorkerStatus.Unresponsive;
148+
}
149+
const delta = Date.now() - lastHeartbeat;
130150
if (delta && delta > 1500) {
131151
return WorkerStatus.Unresponsive;
132152
}

packages/omezarr/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@alleninstitute/vis-omezarr",
3-
"version": "0.0.14",
3+
"version": "0.0.15",
44
"contributors": [
55
{
66
"name": "Lane Sawyer",

packages/omezarr/src/index.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,16 @@ export {
5555
} from './zarr/loading';
5656

5757
export { type CancelRequest, type ZarrSliceRequest, makeOmeZarrSliceLoaderWorker } from './sliceview/worker-loader';
58+
export { decoderFactory } from './zarr/cache-lower';
59+
export { setupFetchDataWorker } from './zarr/cached-loading/fetch-data.worker-loader';
60+
export {
61+
type TransferrableRequestInit,
62+
type FetchMessagePayload,
63+
type FetchMessage,
64+
type FetchResponseMessage,
65+
type CancelMessage,
66+
isFetchMessage,
67+
isFetchResponseMessage,
68+
isCancelMessage,
69+
isCancellationError,
70+
} from './zarr/cached-loading/fetch-data.interface';
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import type { OmeZarrShapedDataset, OmeZarrMetadata } from './types';
2+
import { type ZarrRequest, buildQuery, loadZarrArrayFileFromStore } from './loading';
3+
import { VisZarrDataError } from '../errors';
4+
import * as zarr from 'zarrita';
5+
import { logger } from '@alleninstitute/vis-core';
6+
import { ZarrFetchStore, type CachingMultithreadedFetchStoreOptions } from './cached-loading/store';
7+
8+
export function decoderFactory(url: string, workerModule: URL, options?: CachingMultithreadedFetchStoreOptions) {
9+
const store = new ZarrFetchStore(url, workerModule, options);
10+
const getSlice = async (
11+
metadata: OmeZarrMetadata,
12+
req: ZarrRequest,
13+
level: OmeZarrShapedDataset,
14+
signal?: AbortSignal,
15+
) => {
16+
if (metadata.url !== url) {
17+
throw new Error(
18+
'trying to use a decoder from a different store - we cant do that yet, although we could build a map of url->stores here if we wanted later - TODO',
19+
);
20+
}
21+
const scene = metadata.attrs.multiscales[0];
22+
const { axes } = scene;
23+
if (!level) {
24+
const message = 'invalid Zarr data: no datasets found';
25+
logger.error(message);
26+
throw new VisZarrDataError(message);
27+
}
28+
const arr = metadata.arrays.find((a) => a.path === level.path);
29+
if (!arr) {
30+
const message = `cannot load slice: no array found for path [${level.path}]`;
31+
logger.error(message);
32+
throw new VisZarrDataError(message);
33+
}
34+
const { raw } = await loadZarrArrayFileFromStore(store, arr.path, metadata.zarrVersion, false);
35+
const result = await zarr.get(raw, buildQuery(req, axes, level.shape), { opts: { signal: signal ?? null } });
36+
if (typeof result === 'number') {
37+
throw new Error('oh noes, slice came back all weird');
38+
}
39+
const { shape, data } = result;
40+
if (typeof data !== 'object' || !('buffer' in data)) {
41+
throw new Error('slice was malformed, array-buffer response required');
42+
}
43+
// biome-ignore lint/suspicious/noExplicitAny: <hard to prove - but the typeof check above is sufficient for this to be safe>
44+
return { shape, data: new Float32Array(data as any) };
45+
};
46+
return {
47+
decoder: getSlice,
48+
destroy: () => {
49+
store.destroy();
50+
},
51+
};
52+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import type { AbsolutePath, RangeQuery } from 'zarrita';
2+
import z from 'zod';
3+
4+
export type TransferrableRequestInit = Omit<RequestInit, 'body' | 'headers' | 'signal'> & {
5+
body?: string;
6+
headers?: [string, string][] | Record<string, string>;
7+
};
8+
9+
export type FetchMessagePayload = {
10+
rootUrl: string;
11+
path: AbsolutePath;
12+
range?: RangeQuery | undefined;
13+
options?: TransferrableRequestInit | undefined;
14+
};
15+
16+
export const FETCH_MESSAGE_TYPE = 'fetch' as const;
17+
export const FETCH_RESPONSE_MESSAGE_TYPE = 'fetch-response' as const;
18+
export const CANCEL_MESSAGE_TYPE = 'cancel' as const;
19+
20+
export type FetchMessage = {
21+
type: typeof FETCH_MESSAGE_TYPE;
22+
id: string;
23+
payload: FetchMessagePayload;
24+
};
25+
26+
export type FetchResponseMessage = {
27+
type: typeof FETCH_RESPONSE_MESSAGE_TYPE;
28+
id: string;
29+
payload: ArrayBufferLike | undefined;
30+
};
31+
32+
export type CancelMessage = {
33+
type: typeof CANCEL_MESSAGE_TYPE;
34+
id: string;
35+
};
36+
37+
const FetchMessagePayloadSchema = z.object({
38+
rootUrl: z.string().nonempty(),
39+
path: z.string().nonempty().startsWith('/'),
40+
range: z
41+
.union([
42+
z.object({
43+
offset: z.number(),
44+
length: z.number(),
45+
}),
46+
z.object({
47+
suffixLength: z.number(),
48+
}),
49+
])
50+
.optional(),
51+
options: z.unknown().optional(), // being "lazy" for now; doing a full schema for this could be complex and fragile
52+
});
53+
54+
const FetchMessageSchema = z.object({
55+
type: z.literal(FETCH_MESSAGE_TYPE),
56+
id: z.string().nonempty(),
57+
payload: FetchMessagePayloadSchema,
58+
});
59+
60+
const FetchResponseMessageSchema = z.object({
61+
type: z.literal(FETCH_RESPONSE_MESSAGE_TYPE),
62+
id: z.string().nonempty(),
63+
payload: z.unknown().optional(), // unclear if it's feasible/wise to define a schema for this one
64+
});
65+
66+
const CancelMessageSchema = z.object({
67+
type: z.literal(CANCEL_MESSAGE_TYPE),
68+
id: z.string().nonempty(),
69+
});
70+
71+
export function isFetchMessage(val: unknown): val is FetchMessage {
72+
return FetchMessageSchema.safeParse(val).success;
73+
}
74+
75+
export function isFetchResponseMessage(val: unknown): val is FetchResponseMessage {
76+
return FetchResponseMessageSchema.safeParse(val).success;
77+
}
78+
79+
export function isCancelMessage(val: unknown): val is CancelMessage {
80+
return CancelMessageSchema.safeParse(val).success;
81+
}
82+
83+
export function isCancellationError(err: unknown): boolean {
84+
return (
85+
err === 'cancelled' ||
86+
(typeof err === 'object' &&
87+
err !== null &&
88+
(('name' in err && err.name === 'AbortError') || ('code' in err && err.code === 20)))
89+
);
90+
}

0 commit comments

Comments
 (0)