Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,9 @@ const tasks = {

## Re-using ports

Each `send` and `receive` port pair only support **one** `ConcurrentTask.Pool` subscribed at a time.
**Weird** things can happen if you have **two or more** `ConcurrentTask.Pool`s using the same ports at the same time.
For most cases a single `ConcurrentTask.Pool` is paired a `send` and `receive` port pair.

Generally this should not be needed, but if you have a use-case, please leave an [issue](https://github.com/andrewMacmurray/elm-concurrent-task/issues).
If you need multiple `Pool`s subscribed to a port pair at the same time or need to continually instantiate new `Pool`s (e.g. switching between pages), make sure each pool has a unique id with `ConcurrentTask.withPoolId`. Otherwise `Pool`s may receive results for other pools which can lead to some confusing states!

## Develop Locally

Expand Down
110 changes: 39 additions & 71 deletions runner/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,10 @@ export * from "./http/index.js";
export * from "./browser/index.js";

export interface ElmPorts {
send: {
subscribe: (
callback: (defs: TaskDefinition[] | Command) => Promise<void>
) => void;
};
receive: { send: (result: TaskResult[] | PoolId) => void };
send: { subscribe: (callback: (defs: TaskDefinition[]) => Promise<void>) => void; };
receive: { send: (result: TaskResult[]) => void };
}

export type Command = { command: "identify-pool" };

export type PoolId = { poolId: number };

export type Tasks = { [fn: string]: (arg: any) => any };

export interface TaskDefinition {
Expand Down Expand Up @@ -128,69 +120,49 @@ export function register(options: Options): void {
const tasks = createTasks(options);
const subscribe = options.ports.send.subscribe;
const send = options.ports.receive.send;
let poolId = 0;

function nextPoolId() {
poolId = cycleInt({ max: 1000 }, poolId);
}

subscribe(async (payload) => {
if ("command" in payload) {
switch (payload.command) {
case "identify-pool": {
// The Promise.resolve wrapper here prevents a race condition in Elm where sometimes the Model is not updated in time before the poolId is received
return Promise.resolve().then(() => {
send({ poolId });
nextPoolId();
});
}
default: {
throw new Error(`Unrecognised internal command: ${payload}`);
}
}
} else {
const debouncedSend = debounce(send, debounceThreshold(payload));

for (const def of payload) {
if (!tasks[def.function]) {
return debouncedSend({
attemptId: def.attemptId,
taskId: def.taskId,
result: {
error: {
reason: "missing_function",
message: `${def.function} is not registered`,
},
const debouncedSend = debounce(send, debounceThreshold(payload));

for (const def of payload) {
if (!tasks[def.function]) {
return debouncedSend({
attemptId: def.attemptId,
taskId: def.taskId,
result: {
error: {
reason: "missing_function",
message: `${def.function} is not registered`,
},
});
}
},
});
}
}

payload.map(async (def) => {
try {
logTaskStart(def, options);
const result = await tasks[def.function]?.(def.args);
logTaskFinish(def, options);
debouncedSend({
attemptId: def.attemptId,
taskId: def.taskId,
result: { value: result },
});
} catch (e) {
debouncedSend({
attemptId: def.attemptId,
taskId: def.taskId,
result: {
error: {
reason: "js_exception",
message: `${e.name}: ${e.message}`,
raw: e,
},
payload.map(async (def) => {
try {
logTaskStart(def, options);
const result = await tasks[def.function]?.(def.args);
logTaskFinish(def, options);
debouncedSend({
attemptId: def.attemptId,
taskId: def.taskId,
result: { value: result },
});
} catch (e) {
debouncedSend({
attemptId: def.attemptId,
taskId: def.taskId,
result: {
error: {
reason: "js_exception",
message: `${e.name}: ${e.message}`,
raw: e,
},
});
}
});
}
},
});
}
});
});
}

Expand Down Expand Up @@ -271,7 +243,3 @@ function prefixWith(prefix: string, tasks: Tasks): Tasks {
Object.entries(tasks).map(([key, fn]) => [`${prefix}${key}`, fn])
);
}

function cycleInt(options: { max: number }, i: number): number {
return i >= options.max ? 0 : i + 1;
}
24 changes: 22 additions & 2 deletions src/ConcurrentTask.elm
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module ConcurrentTask exposing
, race
, batch, sequence
, map, andMap, map2, map3, map4, map5
, attempt, attemptEach, Response(..), UnexpectedError(..), onProgress, Pool, pool
, attempt, attemptEach, Response(..), UnexpectedError(..), onProgress, Pool, pool, withPoolId
)

{-| A Task similar to `elm/core`'s `Task` but:
Expand Down Expand Up @@ -194,7 +194,7 @@ Here's a minimal complete example:
, subscriptions = subscriptions
}

@docs attempt, attemptEach, Response, UnexpectedError, onProgress, Pool, pool
@docs attempt, attemptEach, Response, UnexpectedError, onProgress, Pool, pool, withPoolId

-}

Expand Down Expand Up @@ -1264,3 +1264,23 @@ Right now it doesn't expose any functionality, but it could be used in the futur
pool : Pool msg
pool =
Internal.pool


{-| Add an id to a `Pool`.

Why? Because Pools can be instantiated multiple times (think switching pages in a Single Page App),
without a unique identifier a ConcurrentTask Pool may end up receiving responses for a ConcurrentTask pool that was previously discarded.

One example is a user switching back and forth between two pages:

- Page one has a long running task on `init`
- The user switches to page 2, then switches back to page 1
- A new long running task is started
- But the Pool can receive the response from the first long running task (which is unexpected behaviour)

Adding a different id to the `Pool` allows these previous responses to be ignored.

-}
withPoolId : Int -> Pool msg -> Pool msg
withPoolId =
Internal.withPoolId
Loading