Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions packages/core/src/workers/worker-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export enum WorkerStatus {

export class WorkerPool {
#workers: Worker[];
#promises: Map<number, MessagePromise>;
#promises: Map<string, MessagePromise>;
#timeOfPreviousHeartbeat: Map<number, number>;
#which: number;

Expand All @@ -36,25 +36,37 @@ export class WorkerPool {
for (let i = 0; i < size; i++) {
this.#workers[i] = new Worker(workerModule, { type: 'module' });
this.#workers[i].onmessage = (msg) => this.#handleMessage(i, msg);
this.#timeOfPreviousHeartbeat.set(i, Date.now());
this.#timeOfPreviousHeartbeat.set(i, 0);
}
this.#promises = new Map();
this.#which = 0;
}

/**
* Warning - nothing in this class should be considered usable after
* calling this method - any/all methods called afterwards should be expected to be
* completely unreliable. Do not call this unless you are about to dispose of all references to this object.
*/
destroy() {
for (let i = 0; i < this.#workers.length; i++) {
this.#workers[i].terminate();
}
this.#workers = [];
}
#handleMessage(workerIndex: number, msg: MessageEvent<unknown>) {
const { data } = msg;
const messagePromise = this.#promises.get(workerIndex);
if (isHeartbeatMessage(data)) {
this.#timeOfPreviousHeartbeat.set(workerIndex, Date.now());
return;
}
if (messagePromise === undefined) {
logger.warn('unexpected message from worker');
return;
}
if (isWorkerMessageWithId(data)) {
this.#promises.delete(workerIndex);
logger.debug(`worker ${workerIndex} responded to a message`);
const { id } = data;
const messagePromise = this.#promises.get(id);
if (messagePromise === undefined) {
logger.warn('unexpected message from worker');
return;
}
this.#promises.delete(id);
if (!messagePromise.validator(data)) {
const reason = 'invalid response from worker: message type did not match expected type';
logger.error(reason);
Expand All @@ -63,28 +75,32 @@ export class WorkerPool {
}
messagePromise.resolve(data);
} else {
logger.debug(`worker ${workerIndex} received an invalid message`);
const reason = 'encountered an invalid message; skipping';
logger.error(reason);
messagePromise.reject(new Error(reason));
logger.warn(reason);
}
}

#roundRobin() {
this.#which = (this.#which + 1) % this.#workers.length;
}

submitRequest(
async submitRequest(
message: WorkerMessage,
responseValidator: MessageValidator<WorkerMessageWithId>,
transfers: Transferable[],
signal?: AbortSignal | undefined,
): Promise<WorkerMessageWithId> {
if (this.#workers.length < 1) {
return Promise.reject('this woorker pool has been disposed');
}
const reqId = `rq${uuidv4()}`;
const workerIndex = this.#which;
const messageWithId = { ...message, id: reqId };
const messagePromise = this.#createMessagePromise(responseValidator);
logger.debug(`worker ${workerIndex} being handed a request`);

this.#promises.set(workerIndex, messagePromise);
this.#promises.set(reqId, messagePromise);

if (signal) {
signal.addEventListener('abort', () => {
Expand Down Expand Up @@ -126,7 +142,11 @@ export class WorkerPool {
if (!this.#isValidIndex(workerIndex)) {
throw new Error('invalid worker index');
}
const delta = Date.now() - (this.#timeOfPreviousHeartbeat.get(workerIndex) ?? 0);
const lastHeartbeat = this.#timeOfPreviousHeartbeat.get(workerIndex) ?? 0;
if (lastHeartbeat === 0) {
return WorkerStatus.Unresponsive;
}
const delta = Date.now() - lastHeartbeat;
if (delta && delta > 1500) {
return WorkerStatus.Unresponsive;
}
Expand Down
13 changes: 13 additions & 0 deletions packages/omezarr/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,16 @@ export {
} from './zarr/loading';

export { type CancelRequest, type ZarrSliceRequest, makeOmeZarrSliceLoaderWorker } from './sliceview/worker-loader';
export { decoderFactory } from './zarr/cache-lower';
export { setupFetchDataWorker } from './zarr/cached-loading/fetch-data.worker-loader';
export {
type TransferrableRequestInit,
type FetchMessagePayload,
type FetchMessage,
type FetchResponseMessage,
type CancelMessage,
isFetchMessage,
isFetchResponseMessage,
isCancelMessage,
isCancellationError,
} from './zarr/cached-loading/fetch-data.interface';
52 changes: 52 additions & 0 deletions packages/omezarr/src/zarr/cache-lower.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import type { OmeZarrShapedDataset, OmeZarrMetadata } from './types';
import { type ZarrRequest, buildQuery, loadZarrArrayFileFromStore } from './loading';
import { VisZarrDataError } from '../errors';
import * as zarr from 'zarrita';
import { logger } from '@alleninstitute/vis-core';
import { ZarrFetchStore, type CachingMultithreadedFetchStoreOptions } from './cached-loading/store';

/**
 * Builds a slice decoder bound to a single Zarr store.
 *
 * One `ZarrFetchStore` is constructed from `url`, `workerModule` and `options`, and is reused by
 * every call to the returned `decoder`.
 *
 * @param url - URL the store is created for; `decoder` throws if handed metadata whose `url` differs
 * @param workerModule - passed straight through to the `ZarrFetchStore` constructor
 * @param options - passed straight through to the `ZarrFetchStore` constructor
 * @returns an object with `decoder` (async; resolves to `{ shape, data }` where `data` is a newly
 * constructed `Float32Array`) and `destroy` (calls `store.destroy()`)
 */
export function decoderFactory(url: string, workerModule: URL, options?: CachingMultithreadedFetchStoreOptions) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file is the only (interesting) non-cherry-picked change here - the rest is directly plucked from #203

const store = new ZarrFetchStore(url, workerModule, options);
// Loads the region described by `req` from the array at `level.path`; throws on any mismatch or malformed result.
const getSlice = async (
metadata: OmeZarrMetadata,
req: ZarrRequest,
level: OmeZarrShapedDataset,
signal?: AbortSignal,
) => {
// The store above was created for exactly one URL, so metadata from any other URL is rejected.
if (metadata.url !== url) {
throw new Error(
'trying to use a decoder from a different store - we cant do that yet, although we could build a map of url->stores here if we wanted later - TODO',
);
}
// Only the first multiscale entry is consulted for axes.
const scene = metadata.attrs.multiscales[0];
const { axes } = scene;
if (!level) {
const message = 'invalid Zarr data: no datasets found';
logger.error(message);
throw new VisZarrDataError(message);
}
// Match the requested level to one of the arrays listed in the metadata by path.
const arr = metadata.arrays.find((a) => a.path === level.path);
if (!arr) {
const message = `cannot load slice: no array found for path [${level.path}]`;
logger.error(message);
throw new VisZarrDataError(message);
}
const { raw } = await loadZarrArrayFileFromStore(store, arr.path, metadata.zarrVersion, false);
const result = await zarr.get(raw, buildQuery(req, axes, level.shape), { opts: { signal: signal ?? null } });
// A bare number is not a usable slice here, so it is treated as an error.
if (typeof result === 'number') {
throw new Error('oh noes, slice came back all weird');
}
const { shape, data } = result;
// Require an object exposing `buffer` before handing it to the Float32Array constructor.
if (typeof data !== 'object' || !('buffer' in data)) {
throw new Error('slice was malformed, array-buffer response required');
}
// biome-ignore lint/suspicious/noExplicitAny: <hard to prove - but the typeof check above is sufficient for this to be safe>
return { shape, data: new Float32Array(data as any) };
};
return {
decoder: getSlice,
destroy: () => {
store.destroy();
},
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import type { AbsolutePath, RangeQuery } from 'zarrita';
import z from 'zod';

/**
 * A `RequestInit` with `body`, `headers` and `signal` removed, then `body` and `headers`
 * re-added in restricted forms: `body` may only be a string, and `headers` may only be a list of
 * `[name, value]` pairs or a plain string-to-string record. `signal` is not re-added.
 *
 * NOTE(review): presumably restricted so the value survives being posted to a worker — confirm.
 */
export type TransferrableRequestInit = Omit<RequestInit, 'body' | 'headers' | 'signal'> & {
body?: string;
headers?: [string, string][] | Record<string, string>;
};

/**
 * Payload of a fetch request message.
 *
 * - `rootUrl`: URL handed to the store constructor on the receiving side.
 * - `path`: absolute path (zarrita `AbsolutePath`) of the entry to fetch.
 * - `range`: optional zarrita `RangeQuery`; when omitted the whole entry is requested.
 * - `options`: optional request options in their restricted, transferable form.
 */
export type FetchMessagePayload = {
rootUrl: string;
path: AbsolutePath;
range?: RangeQuery | undefined;
options?: TransferrableRequestInit | undefined;
};

// Literal `type` tags used to discriminate the three message shapes declared below.
export const FETCH_MESSAGE_TYPE = 'fetch' as const;
export const FETCH_RESPONSE_MESSAGE_TYPE = 'fetch-response' as const;
export const CANCEL_MESSAGE_TYPE = 'cancel' as const;

/** A request to fetch data, identified by `id` and described by `payload`. */
export type FetchMessage = {
type: typeof FETCH_MESSAGE_TYPE;
id: string;
payload: FetchMessagePayload;
};

/** The reply to a fetch request with the same `id`; `payload` is the fetched buffer, or `undefined` when there is none. */
export type FetchResponseMessage = {
type: typeof FETCH_RESPONSE_MESSAGE_TYPE;
id: string;
payload: ArrayBufferLike | undefined;
};

/** A request to cancel the fetch that was issued with the same `id`. */
export type CancelMessage = {
type: typeof CANCEL_MESSAGE_TYPE;
id: string;
};

// Runtime schema for a fetch payload: `rootUrl` must be a non-empty string, `path` a non-empty
// string starting with '/', and `range` (when present) either `{ offset, length }` or
// `{ suffixLength }`. `options` is accepted without inspection.
const FetchMessagePayloadSchema = z.object({
rootUrl: z.string().nonempty(),
path: z.string().nonempty().startsWith('/'),
range: z
.union([
z.object({
offset: z.number(),
length: z.number(),
}),
z.object({
suffixLength: z.number(),
}),
])
.optional(),
options: z.unknown().optional(), // being "lazy" for now; doing a full schema for this could be complex and fragile
});

// Runtime schema for a fetch request: the 'fetch' tag, a non-empty `id`, and a valid payload.
const FetchMessageSchema = z.object({
type: z.literal(FETCH_MESSAGE_TYPE),
id: z.string().nonempty(),
payload: FetchMessagePayloadSchema,
});

// Runtime schema for a fetch response: the 'fetch-response' tag and a non-empty `id`.
// NOTE(review): `payload` is validated as `unknown`, so this schema does not establish that the
// payload is actually an ArrayBufferLike even though the TypeScript type says so.
const FetchResponseMessageSchema = z.object({
type: z.literal(FETCH_RESPONSE_MESSAGE_TYPE),
id: z.string().nonempty(),
payload: z.unknown().optional(), // unclear if it's feasible/wise to define a schema for this one
});

// Runtime schema for a cancel request: the 'cancel' tag and a non-empty `id`.
const CancelMessageSchema = z.object({
type: z.literal(CANCEL_MESSAGE_TYPE),
id: z.string().nonempty(),
});

/**
 * Type guard for fetch request messages.
 *
 * @param val - any value, typically the `data` of a received message event
 * @returns `true` when `val` parses successfully against `FetchMessageSchema`
 */
export function isFetchMessage(val: unknown): val is FetchMessage {
    const { success } = FetchMessageSchema.safeParse(val);
    return success;
}

/**
 * Type guard for fetch response messages.
 *
 * @param val - any value, typically the `data` of a received message event
 * @returns `true` when `val` parses successfully against `FetchResponseMessageSchema`
 */
export function isFetchResponseMessage(val: unknown): val is FetchResponseMessage {
    const { success } = FetchResponseMessageSchema.safeParse(val);
    return success;
}

/**
 * Type guard for cancel messages.
 *
 * @param val - any value, typically the `data` of a received message event
 * @returns `true` when `val` parses successfully against `CancelMessageSchema`
 */
export function isCancelMessage(val: unknown): val is CancelMessage {
    const { success } = CancelMessageSchema.safeParse(val);
    return success;
}

/**
 * Reports whether a thrown value represents a cancelled request.
 *
 * A value counts as a cancellation when it is the string `'cancelled'`, or a non-null object
 * whose `name` is `'AbortError'` or whose `code` is `20`.
 *
 * @param err - the value caught from a rejected request
 * @returns `true` for cancellations, `false` for everything else
 */
export function isCancellationError(err: unknown): boolean {
    if (err === 'cancelled') {
        return true;
    }
    if (typeof err !== 'object' || err === null) {
        return false;
    }
    if ('name' in err && err.name === 'AbortError') {
        return true;
    }
    return 'code' in err && err.code === 20;
}
124 changes: 124 additions & 0 deletions packages/omezarr/src/zarr/cached-loading/fetch-data.worker-loader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// a web-worker which fetches slices of data, decodes them, and returns the result as a flat float32 array, using transferables

import { HEARTBEAT_RATE_MS, logger } from '@alleninstitute/vis-core';
import { type AbsolutePath, FetchStore, type RangeQuery } from 'zarrita';
import type { CancelMessage, FetchMessage, TransferrableRequestInit } from './fetch-data.interface';
import {
FETCH_RESPONSE_MESSAGE_TYPE,
isCancellationError,
isCancelMessage,
isFetchMessage,
} from './fetch-data.interface';

// Upper bound on getRange attempts made by fetchSlice: its loop runs NUM_RETRIES times in total,
// i.e. one initial attempt plus NUM_RETRIES - 1 retries.
const NUM_RETRIES = 2;
// Pause, in milliseconds, between a failed getRange attempt and the next one.
const RETRY_DELAY_MS = 500;

/**
 * Fetches the entire entry at `path` from a `FetchStore` rooted at `rootUrl`.
 *
 * @param rootUrl - URL the `FetchStore` is constructed with
 * @param path - absolute path of the entry to fetch
 * @param options - optional request options, spread into the options passed to `store.get`
 * @param abortController - optional controller whose signal is forwarded; `null` is sent when absent
 * @returns whatever `store.get` resolves to (bytes, or `undefined`)
 */
const fetchFile = async (
    rootUrl: string,
    path: AbsolutePath,
    options?: TransferrableRequestInit | undefined,
    abortController?: AbortController | undefined,
): Promise<Uint8Array | undefined> => {
    const signal = abortController?.signal ?? null;
    const requestOptions = { ...options, signal };
    return new FetchStore(rootUrl).get(path, requestOptions);
};

/**
 * Fetches a byte range of the entry at `path` from a `FetchStore` rooted at `rootUrl`, retrying
 * failed requests.
 *
 * Up to `NUM_RETRIES` attempts are made, with `RETRY_DELAY_MS` between attempts. A cancelled
 * request is never retried: the cancellation is rethrown immediately so the caller can tell it
 * apart from a fetch that simply produced no data.
 *
 * @param rootUrl - URL the `FetchStore` is constructed with
 * @param path - absolute path of the entry to read from
 * @param range - the byte range to request
 * @param options - optional request options, spread into the options passed to `store.getRange`
 * @param abortController - optional controller whose signal is forwarded; `null` is sent when absent
 * @returns the bytes `store.getRange` resolves to, or `undefined` once every attempt has failed
 * @throws the caught value, unchanged, when the request was cancelled
 */
const fetchSlice = async (
    rootUrl: string,
    path: AbsolutePath,
    range: RangeQuery,
    options?: TransferrableRequestInit | undefined,
    abortController?: AbortController | undefined,
): Promise<Uint8Array | undefined> => {
    const store = new FetchStore(rootUrl);
    const wait = (ms: number) =>
        new Promise<void>((resolve) => {
            setTimeout(resolve, ms);
        });
    for (let attempt = 0; attempt < NUM_RETRIES; attempt++) {
        try {
            return await store.getRange(path, range, { ...options, signal: abortController?.signal ?? null });
        } catch (e) {
            // A cancellation is deliberate, not a transient failure: retrying would only delay the
            // outcome and end in an empty response being reported for a request nobody wants.
            if (abortController?.signal.aborted || isCancellationError(e)) {
                throw e;
            }
            logger.error('getRange request failed:', e);
            const hasRetries = attempt < NUM_RETRIES - 1;
            const message = `getRange request ${hasRetries ? `will retry in ${RETRY_DELAY_MS}ms` : 'has no retries left'}`;
            logger.warn(message);
            if (hasRetries) {
                await wait(RETRY_DELAY_MS);
            }
        }
    }
    return undefined;
};

/**
 * Starts the fetch described by `message` and posts the result back via `self.postMessage`.
 *
 * An `AbortController` is registered in `abortControllers` under the message `id` for as long as
 * the request is in flight, so that a cancel message with the same `id` can abort it. The entry is
 * removed once the request settles, whether it succeeded, failed or was cancelled.
 *
 * @param message - the validated fetch request
 * @param abortControllers - registry of in-flight requests keyed by request id; mutated in place
 */
const handleFetch = (message: FetchMessage, abortControllers: Record<string, AbortController>) => {
    const { id, payload } = message;
    const { rootUrl, path, range, options } = payload;

    if (id in abortControllers) {
        logger.error('cannot send message: request ID already in use');
        return;
    }

    const abort = new AbortController();
    abortControllers[id] = abort;

    // A range selects a partial read (with retries); no range means the whole entry.
    const fetchFn =
        range !== undefined
            ? () => fetchSlice(rootUrl, path, range, options, abort)
            : () => fetchFile(rootUrl, path, options, abort);

    fetchFn()
        .then((result: Uint8Array | undefined) => {
            const buffer = result?.buffer;
            // Transfer the buffer instead of copying it whenever there is one to send.
            const postOptions = buffer !== undefined ? { transfer: [buffer] } : {};
            self.postMessage(
                {
                    type: FETCH_RESPONSE_MESSAGE_TYPE,
                    id,
                    payload: buffer,
                },
                postOptions,
            );
        })
        .catch((e) => {
            if (!isCancellationError(e)) {
                logger.error('error in slice fetch worker: ', e);
            }
            // can ignore if it is a cancellation error
        })
        .finally(() => {
            // Release the registry entry so the map does not grow without bound and a late cancel
            // cannot target a request that has already finished.
            delete abortControllers[id];
        });
};

/**
 * Aborts the in-flight request whose id matches `message.id`, using `'cancelled'` as the abort
 * reason. Logs a warning when no controller is registered under that id.
 *
 * @param message - the validated cancel request
 * @param abortControllers - registry of controllers keyed by request id
 */
const handleCancel = (message: CancelMessage, abortControllers: Record<string, AbortController>) => {
    const pending = abortControllers[message.id];
    if (pending) {
        pending.abort('cancelled');
        return;
    }
    logger.warn('attempted to cancel a non-existent request');
};

/**
 * Begins posting `{ type: 'heartbeat' }` messages every `HEARTBEAT_RATE_MS` milliseconds.
 *
 * @returns the handle returned by `setInterval`
 */
const startHeartbeat = () => {
    const sendHeartbeat = () => {
        self.postMessage({ type: 'heartbeat' });
    };
    return setInterval(sendHeartbeat, HEARTBEAT_RATE_MS);
};

/**
 * Creates a message handler that owns its own registry of abort controllers.
 *
 * The returned handler dispatches fetch messages to `handleFetch` and cancel messages to
 * `handleCancel`; any other message is ignored.
 *
 * @returns an async `onmessage`-compatible handler
 */
const setupOnMessage = () => {
    const abortControllers: Record<string, AbortController> = {};
    const onmessage = async (e: MessageEvent<unknown>) => {
        const message = e.data;

        if (isFetchMessage(message)) {
            handleFetch(message, abortControllers);
            return;
        }
        if (isCancelMessage(message)) {
            handleCancel(message, abortControllers);
        }
    };
    return onmessage;
};

/**
 * Installs the fetch/cancel message handler on `ctx`.
 *
 * The heartbeat is not started here; the caller starts it through the returned `startHeartbeat`.
 *
 * @param ctx - the global scope whose `onmessage` is assigned
 * @returns an object exposing `startHeartbeat`
 */
export const setupFetchDataWorker = (ctx: typeof self) => {
    const handler = setupOnMessage();
    ctx.onmessage = handler;
    return { startHeartbeat };
};
Loading