diff --git a/README.md b/README.md index 2f0ec55..3f8c8d5 100644 --- a/README.md +++ b/README.md @@ -368,10 +368,9 @@ const tasks = { ## Re-using ports -Each `send` and `receive` port pair only support **one** `ConcurrentTask.Pool` subscribed at a time. -**Weird** things can happen if you have **two or more** `ConcurrentTask.Pool`s using the same ports at the same time. +For most cases a single `ConcurrentTask.Pool` is paired a `send` and `receive` port pair. -Generally this should not be needed, but if you have a use-case, please leave an [issue](https://github.com/andrewMacmurray/elm-concurrent-task/issues). +If you need multiple `Pool`s subscribed to a port pair at the same time or need to continually instantiate new `Pool`s (e.g. switching between pages), make sure each pool has a unique id with `ConcurrentTask.withPoolId`. Otherwise `Pool`s may receive results for other pools which can lead to some confusing states! ## Develop Locally diff --git a/runner/index.ts b/runner/index.ts index d171bc6..3580b8f 100644 --- a/runner/index.ts +++ b/runner/index.ts @@ -13,18 +13,10 @@ export * from "./http/index.js"; export * from "./browser/index.js"; export interface ElmPorts { - send: { - subscribe: ( - callback: (defs: TaskDefinition[] | Command) => Promise - ) => void; - }; - receive: { send: (result: TaskResult[] | PoolId) => void }; + send: { subscribe: (callback: (defs: TaskDefinition[]) => Promise) => void; }; + receive: { send: (result: TaskResult[]) => void }; } -export type Command = { command: "identify-pool" }; - -export type PoolId = { poolId: number }; - export type Tasks = { [fn: string]: (arg: any) => any }; export interface TaskDefinition { @@ -128,69 +120,49 @@ export function register(options: Options): void { const tasks = createTasks(options); const subscribe = options.ports.send.subscribe; const send = options.ports.receive.send; - let poolId = 0; - - function nextPoolId() { - poolId = cycleInt({ max: 1000 }, poolId); - } subscribe(async (payload) => { - if ("command" in payload) { - switch (payload.command) { - case "identify-pool": { - // The Promise.resolve wrapper here prevents a race condition in Elm where sometimes the Model is not updated in time before the poolId is received - return Promise.resolve().then(() => { - send({ poolId }); - nextPoolId(); - }); - } - default: { - throw new Error(`Unrecognised internal command: ${payload}`); - } - } - } else { - const debouncedSend = debounce(send, debounceThreshold(payload)); - - for (const def of payload) { - if (!tasks[def.function]) { - return debouncedSend({ - attemptId: def.attemptId, - taskId: def.taskId, - result: { - error: { - reason: "missing_function", - message: `${def.function} is not registered`, - }, + const debouncedSend = debounce(send, debounceThreshold(payload)); + + for (const def of payload) { + if (!tasks[def.function]) { + return debouncedSend({ + attemptId: def.attemptId, + taskId: def.taskId, + result: { + error: { + reason: "missing_function", + message: `${def.function} is not registered`, }, - }); - } + }, + }); } + } - payload.map(async (def) => { - try { - logTaskStart(def, options); - const result = await tasks[def.function]?.(def.args); - logTaskFinish(def, options); - debouncedSend({ - attemptId: def.attemptId, - taskId: def.taskId, - result: { value: result }, - }); - } catch (e) { - debouncedSend({ - attemptId: def.attemptId, - taskId: def.taskId, - result: { - error: { - reason: "js_exception", - message: `${e.name}: ${e.message}`, - raw: e, - }, + payload.map(async (def) => { + try { + logTaskStart(def, options); + const result = await tasks[def.function]?.(def.args); + logTaskFinish(def, options); + debouncedSend({ + attemptId: def.attemptId, + taskId: def.taskId, + result: { value: result }, + }); + } catch (e) { + debouncedSend({ + attemptId: def.attemptId, + taskId: def.taskId, + result: { + error: { + reason: "js_exception", + message: `${e.name}: ${e.message}`, + raw: e, }, - }); - } - }); - } + }, + }); + } + }); }); } @@ -271,7 +243,3 @@ function prefixWith(prefix: string, tasks: Tasks): Tasks { Object.entries(tasks).map(([key, fn]) => [`${prefix}${key}`, fn]) ); } - -function cycleInt(options: { max: number }, i: number): number { - return i >= options.max ? 0 : i + 1; -} diff --git a/src/ConcurrentTask.elm b/src/ConcurrentTask.elm index 1a9a258..7c644ff 100644 --- a/src/ConcurrentTask.elm +++ b/src/ConcurrentTask.elm @@ -9,7 +9,7 @@ module ConcurrentTask exposing , race , batch, sequence , map, andMap, map2, map3, map4, map5 - , attempt, attemptEach, Response(..), UnexpectedError(..), onProgress, Pool, pool + , attempt, attemptEach, Response(..), UnexpectedError(..), onProgress, Pool, pool, withPoolId ) {-| A Task similar to `elm/core`'s `Task` but: @@ -194,7 +194,7 @@ Here's a minimal complete example: , subscriptions = subscriptions } -@docs attempt, attemptEach, Response, UnexpectedError, onProgress, Pool, pool +@docs attempt, attemptEach, Response, UnexpectedError, onProgress, Pool, pool, withPoolId -} @@ -1264,3 +1264,23 @@ Right now it doesn't expose any functionality, but it could be used in the futur pool : Pool msg pool = Internal.pool + + +{-| Add an id to a `Pool`. + +Why? Because Pools can be instantiated multiple times (think switching pages in a Single Page App), +without a unique identifier a ConcurrentTask Pool may end up receiving responses for a ConcurrentTask pool that was previously discarded. + +One example is a user switching back and forth between two pages: + + - Page one has a long running task on `init` + - The user switches to page 2, then switches back to page 1 + - A new long running task is started + - But the Pool can receive the response from the first long running task (which is unexpected behaviour) + +Adding a different id to the `Pool` allows these previous responses to be ignored. + +-} +withPoolId : Int -> Pool msg -> Pool msg +withPoolId = + Internal.withPoolId diff --git a/src/ConcurrentTask/Internal.elm b/src/ConcurrentTask/Internal.elm index 1e2cb81..2965eb4 100644 --- a/src/ConcurrentTask/Internal.elm +++ b/src/ConcurrentTask/Internal.elm @@ -23,6 +23,7 @@ module ConcurrentTask.Internal exposing , onResponseDecoderFailure , pool , succeed + , withPoolId ) import Array exposing (Array) @@ -408,34 +409,12 @@ type Pool msg type alias Pool_ msg = - { poolId : PoolId - , queued : List ( Array Todo, Progress msg ) + { poolId : Maybe Int , attempts : Dict AttemptId (Progress msg) , attemptIds : Ids } -{-| Because Pools can be instantiated multiple times (think switching pages in a Single Page App), -without a unique identifier a Task Pool may end up receiving responses for a Task pool that was previously discarded. - -One example is a user switching back and forth between two pages: - - - Page one has a long running task on `init` - - The user switches to page 2, then switches back to page 1 - - A new long running task is started - - But the Task Pool can receive the response from the first long running task (which is unexpected behaviour) - -Adding a `PoolId` is a bit fiddly (internally there needs to be a random / external value present before any tasks start), but it solves this problem. - -The JS runner externally keeps track of the number of Pools that have been instantiated and sends that number back, so each new pool is unique. - --} -type PoolId - = Unidentified - | Identifying - | Identified String - - type alias Progress msg = { inFlight : Set TaskId , task : ( Ids, ConcurrentTask msg msg ) @@ -489,101 +468,33 @@ attempt attempt_ task = , onComplete = attempt_.onComplete } in - case poolId attempt_.pool of - Unidentified -> - ( withPoolId Identifying attempt_.pool |> queueTask ( defs, progress ) - , attempt_.send identifyPoolCmd - ) - - Identifying -> - ( attempt_.pool |> queueTask ( defs, progress ) - , Cmd.none - ) - - Identified _ -> - startTask - { progress = progress - , pool = attempt_.pool - , send = attempt_.send - , defs = defs - } - - -startTask : - { progress : Progress msg - , pool : Pool msg - , send : Encode.Value -> Cmd msg - , defs : Array Todo - } - -> ( Pool msg, Cmd msg ) -startTask options = - ( startAttempt options.progress options.pool - , options.send (encodeDefinitions (currentAttemptId options.pool) options.defs) - ) - - -identifyPoolCmd : Encode.Value -identifyPoolCmd = - Encode.object [ ( "command", Encode.string "identify-pool" ) ] - - -decodeIdentifyResponse : Decode.Value -> Result Decode.Error PoolId -decodeIdentifyResponse = - Decode.decodeValue - (Decode.map (String.fromInt >> Identified) - (Decode.field "poolId" Decode.int) - ) + ( startAttempt progress attempt_.pool + , attempt_.send (encodeDefinitions (currentAttemptId attempt_.pool) defs) + ) onProgress : OnProgress msg -> Pool msg -> Sub msg onProgress options pool_ = options.receive (\rawResults -> - case decodeIdentifyResponse rawResults of - Ok id -> - options.onProgress - (startQueuedTasks - { pool = withPoolId id pool_ - , send = options.send - } - ) - - Err _ -> - toBatchResults rawResults - |> Dict.toList - |> List.foldl - (\( attempt_, results ) ( p, cmd ) -> - case findAttempt attempt_ p of - Nothing -> - ( p, cmd ) - - Just progress -> - progress - |> updateAttempt options p ( attempt_, results ) - |> withCmd cmd - ) - ( pool_, Cmd.none ) - |> options.onProgress + toBatchResults rawResults + |> Dict.toList + |> List.foldl + (\( attempt_, results ) ( p, cmd ) -> + case findAttempt attempt_ p of + Nothing -> + ( p, cmd ) + + Just progress -> + progress + |> updateAttempt options p ( attempt_, results ) + |> withCmd cmd + ) + ( pool_, Cmd.none ) + |> options.onProgress ) -startQueuedTasks : { send : Encode.Value -> Cmd msg, pool : Pool msg } -> ( Pool msg, Cmd msg ) -startQueuedTasks options = - queuedTasks options.pool - |> List.foldl - (\( defs, progress ) ( pool_, cmd ) -> - startTask - { progress = progress - , defs = defs - , pool = pool_ - , send = options.send - } - |> withCmd cmd - ) - ( options.pool, Cmd.none ) - |> Tuple.mapFirst clearQueue - - updateAttempt : OnProgress msg -> Pool msg -> ( AttemptId, Results ) -> Progress msg -> ( Pool msg, Cmd msg ) updateAttempt options pool_ ( attemptId, results ) progress = case stepTask results progress.task of @@ -592,6 +503,10 @@ updateAttempt options pool_ ( attemptId, results ) progress = nextProgress : ( Ids, ConcurrentTask msg msg ) nextProgress = ( ids_, next_ ) + + notStarted : Todo -> Bool + notStarted def = + not (Set.member def.taskId progress.inFlight) in case stepTask results nextProgress of ( _, Done res ) -> @@ -610,7 +525,7 @@ updateAttempt options pool_ ( attemptId, results ) progress = } pool_ , defs - |> Array.filter (notStarted progress) + |> Array.filter notStarted |> encodeDefinitions attemptId |> options.send ) @@ -621,7 +536,14 @@ updateAttempt options pool_ ( attemptId, results ) progress = recordSent : Array Todo -> Set TaskId -> Set TaskId recordSent defs inFlight = - Set.union inFlight (toSentIds defs) + let + sentIds : Set TaskId + sentIds = + Array.map .taskId defs + |> Array.toList + |> Set.fromList + in + Set.union inFlight sentIds removeCompleted : Results -> Set TaskId -> Set TaskId @@ -629,48 +551,36 @@ removeCompleted res inFlight = Set.diff inFlight (Set.fromList (Dict.keys res)) -toSentIds : Array Todo -> Set TaskId -toSentIds defs = - Array.map .taskId defs - |> Array.toList - |> Set.fromList - - sendResult : (Response x a -> msg) -> Response x a -> Cmd msg sendResult onComplete res = CoreTask.succeed res |> CoreTask.perform onComplete -notStarted : Progress msg -> Todo -> Bool -notStarted model def = - not (Set.member def.taskId model.inFlight) - - toBatchResults : Decode.Value -> BatchResults toBatchResults = + let + toBatchResults_ : List RawResult -> BatchResults + toBatchResults_ = + List.foldl + (\result batch_ -> + Dict.update result.attemptId + (\attempt_ -> + case attempt_ of + Nothing -> + Just (Dict.singleton result.taskId result.result) + + Just attempt__ -> + Just (Dict.insert result.taskId result.result attempt__) + ) + batch_ + ) + Dict.empty + in Decode.decodeValue (Decode.list decodeRawResult) >> Result.map toBatchResults_ >> Result.withDefault Dict.empty -toBatchResults_ : List RawResult -> BatchResults -toBatchResults_ = - List.foldl - (\result batch_ -> - Dict.update result.attemptId - (\attempt_ -> - case attempt_ of - Nothing -> - Just (Dict.singleton result.taskId result.result) - - Just attempt__ -> - Just (Dict.insert result.taskId result.result attempt__) - ) - batch_ - ) - Dict.empty - - -- Encode / Decode @@ -685,150 +595,144 @@ decodeRawResult = decodeResponse : Definition x a -> Decode.Value -> Response x a decodeResponse def val = - case def.errors of - ExpectThrows catch -> - decodeExpectThrows catch def val - - ExpectErrors expect -> - decodeExpectErrors expect def val - - ExpectNoErrors -> - decodeExpectNoErrors def val - - -decodeExpectNoErrors : Definition x a -> Decode.Value -> Response b a -decodeExpectNoErrors def val = - case Decode.decodeValue (decodeRunnerError def) val of - Ok err -> - UnexpectedError err - - Err _ -> - case Decode.decodeValue (decodeRunnerSuccess def) val of - Ok a -> - Success a - - Err e -> - UnexpectedError - (ResponseDecoderFailure - { function = def.function - , error = e - } + let + decodeRunnerSuccess : Decoder a + decodeRunnerSuccess = + case def.expect of + ExpectJson expect -> + Decode.field "value" expect + + decodeRunnerError : Decoder UnexpectedError + decodeRunnerError = + Decode.field "error" + (Decode.field "reason" Decode.string + |> Decode.andThen + (\reason -> + case reason of + "js_exception" -> + Decode.map2 + (\msg raw -> + UnhandledJsException + { function = def.function + , message = msg + , raw = raw + } + ) + (Decode.field "message" Decode.string) + (Decode.field "raw" Decode.value) + + "missing_function" -> + Decode.field "message" (Decode.map MissingFunction Decode.string) + + _ -> + Decode.succeed (InternalError ("Unknown runner error reason: " ++ reason)) ) + ) - -decodeExpectThrows : (String -> x) -> Definition a b -> Decode.Value -> Response x b -decodeExpectThrows catch def val = - case Decode.decodeValue (decodeRunnerError def) val of - Ok err -> - case err of - UnhandledJsException e -> - Error (catch e.message) - - _ -> + decodeExpectErrors : Decoder x -> Response x a + decodeExpectErrors expect = + let + decodeExpectErrorField : Decoder val -> Decoder val + decodeExpectErrorField decoder = + Decode.field "value" (Decode.field "error" decoder) + in + case Decode.decodeValue decodeRunnerError val of + Ok err -> UnexpectedError err - Err _ -> - case Decode.decodeValue (decodeRunnerSuccess def) val of - Ok a -> - Success a - - Err e -> - UnexpectedError - (ResponseDecoderFailure - { function = def.function - , error = e - } - ) - + Err _ -> + case Decode.decodeValue (decodeExpectErrorField Decode.value) val of + Ok _ -> + case Decode.decodeValue (decodeExpectErrorField expect) val of + Ok err_ -> + Error err_ + + Err e_ -> + UnexpectedError + (ErrorsDecoderFailure + { function = def.function + , error = e_ + } + ) + + Err _ -> + case Decode.decodeValue decodeRunnerSuccess val of + Ok a -> + Success a + + Err e_ -> + UnexpectedError + (ResponseDecoderFailure + { function = def.function + , error = e_ + } + ) + + decodeExpectThrows : (String -> x) -> Response x a + decodeExpectThrows catch = + case Decode.decodeValue decodeRunnerError val of + Ok err -> + case err of + UnhandledJsException e -> + Error (catch e.message) -decodeExpectErrors : Decoder x -> Definition a b -> Decode.Value -> Response x b -decodeExpectErrors expect def val = - case Decode.decodeValue (decodeRunnerError def) val of - Ok err -> - UnexpectedError err + _ -> + UnexpectedError err - Err _ -> - case Decode.decodeValue (decodeExpectErrorField Decode.value) val of - Ok _ -> - case Decode.decodeValue (decodeExpectErrorField expect) val of - Ok err_ -> - Error err_ + Err _ -> + case Decode.decodeValue decodeRunnerSuccess val of + Ok a -> + Success a - Err e_ -> + Err e -> UnexpectedError - (ErrorsDecoderFailure + (ResponseDecoderFailure { function = def.function - , error = e_ + , error = e } ) + decodeExpectNoErrors : () -> Response x a + decodeExpectNoErrors _ = + case Decode.decodeValue decodeRunnerError val of + Ok err -> + UnexpectedError err + Err _ -> - case Decode.decodeValue (decodeRunnerSuccess def) val of + case Decode.decodeValue decodeRunnerSuccess val of Ok a -> Success a - Err e_ -> + Err e -> UnexpectedError (ResponseDecoderFailure { function = def.function - , error = e_ + , error = e } ) + in + case def.errors of + ExpectThrows catch -> + decodeExpectThrows catch + ExpectErrors expect -> + decodeExpectErrors expect -decodeExpectErrorField : Decoder a -> Decoder a -decodeExpectErrorField decoder = - Decode.field "value" (Decode.field "error" decoder) - - -decodeRunnerSuccess : Definition x a -> Decoder a -decodeRunnerSuccess def = - case def.expect of - ExpectJson expect -> - Decode.field "value" expect - - -decodeRunnerError : Definition x a -> Decoder UnexpectedError -decodeRunnerError def = - Decode.field "error" - (Decode.field "reason" Decode.string - |> Decode.andThen - (\reason -> - case reason of - "js_exception" -> - Decode.map2 - (\msg raw -> - UnhandledJsException - { function = def.function - , message = msg - , raw = raw - } - ) - (Decode.field "message" Decode.string) - (Decode.field "raw" Decode.value) - - "missing_function" -> - Decode.field "message" (Decode.map MissingFunction Decode.string) - - _ -> - Decode.succeed (InternalError ("Unknown runner error reason: " ++ reason)) - ) - ) + ExpectNoErrors -> + decodeExpectNoErrors () encodeDefinitions : AttemptId -> Array Todo -> Encode.Value encodeDefinitions attemptId = - Encode.array (encodeDefinition attemptId) - - -encodeDefinition : AttemptId -> Todo -> Encode.Value -encodeDefinition attemptId def = - Encode.object - [ ( "attemptId", Encode.string attemptId ) - , ( "taskId", Encode.string def.taskId ) - , ( "function", Encode.string def.function ) - , ( "args", def.args ) - ] + Encode.array + (\def -> + Encode.object + [ ( "attemptId", Encode.string attemptId ) + , ( "taskId", Encode.string def.taskId ) + , ( "function", Encode.string def.function ) + , ( "args", def.args ) + ] + ) @@ -838,13 +742,17 @@ encodeDefinition attemptId def = pool : Pool msg pool = Pool - { poolId = Unidentified - , queued = [] + { poolId = Nothing , attempts = Dict.empty , attemptIds = Ids.init } +withPoolId : Int -> Pool msg -> Pool msg +withPoolId id = + mapPool (\pool_ -> { pool_ | poolId = Just id }) + + startAttempt : Progress msg -> Pool msg -> Pool msg startAttempt progress p = mapPool @@ -860,31 +768,13 @@ startAttempt progress p = currentAttemptId : Pool msg -> AttemptId currentAttemptId (Pool pool_) = case pool_.poolId of - Identified id -> - id ++ ":" ++ Ids.get pool_.attemptIds - - Unidentified -> - Ids.get pool_.attemptIds + Just id -> + String.fromInt id ++ ":" ++ Ids.get pool_.attemptIds - Identifying -> + Nothing -> Ids.get pool_.attemptIds -poolId : Pool msg -> PoolId -poolId (Pool pool_) = - pool_.poolId - - -withPoolId : PoolId -> Pool msg -> Pool msg -withPoolId id = - mapPool (\pool_ -> { pool_ | poolId = id }) - - -queueTask : ( Array Todo, Progress msg ) -> Pool msg -> Pool msg -queueTask progress = - mapPool (\pool_ -> { pool_ | queued = progress :: pool_.queued }) - - updateProgressFor : AttemptId -> Progress msg -> Pool msg -> Pool msg updateProgressFor attemptId progress_ = mapPool (\pool_ -> { pool_ | attempts = Dict.update attemptId (Maybe.map (always progress_)) pool_.attempts }) @@ -895,16 +785,6 @@ removeFromPool attemptId = mapPool (\pool_ -> { pool_ | attempts = Dict.remove attemptId pool_.attempts }) -queuedTasks : Pool msg -> List ( Array Todo, Progress msg ) -queuedTasks (Pool p) = - p.queued - - -clearQueue : Pool msg -> Pool msg -clearQueue = - mapPool (\pool_ -> { pool_ | queued = [] }) - - findAttempt : AttemptId -> Pool msg -> Maybe (Progress msg) findAttempt attemptId (Pool p) = Dict.get attemptId p.attempts