diff --git a/CHANGELOG.md b/CHANGELOG.md index eea6fc5..8eef07b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # News +## v1.4.0 - 2023-08-07 + +- Implement a `DelayQueue`, i.e. a `QueueStore` with latency between the store and take events. +- Bugfix to `QueueStore` and `StackStore` for take events on empty stores. + ## v1.3.0 - 2023-08-07 - Implement ordered versions of `Store`, namely `QueueStore` and `StackStore`. diff --git a/Project.toml b/Project.toml index f830d07..7a6eccd 100644 --- a/Project.toml +++ b/Project.toml @@ -5,7 +5,7 @@ license = "MIT" desc = "A discrete event process oriented simulation framework." authors = ["Ben Lauwens and SimJulia and ConcurrentSim contributors"] repo = "https://github.com/JuliaDynamics/ConcurrentSim.jl.git" -version = "1.3.0" +version = "1.4.0" [deps] DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" diff --git a/docs/src/examples/Latency.md b/docs/src/examples/Latency.md index 24657c6..3fbc7f2 100644 --- a/docs/src/examples/Latency.md +++ b/docs/src/examples/Latency.md @@ -1,6 +1,8 @@ # Event Latency +There is a built-in [`DelayQueue`](@ref) if you need a store [`Store`](@ref) with latency between `put!` and `take!` events. However here, we show you how you could have built one for yourself. If you modify this in order to construct a particularly useful type of latency store, please contribute it to the library through a pull request. + ## Description In this example we show how to separate the time delay between processes from the processes themselves. We model a communications channel, called a `Cable`, where a sender sends messages regularly each `SEND_PERIOD` time units and a receiver listens each `RECEIVE_PERIOD`. The messages in the cable have a delay fo `DELAY_DURATION` until they reach the recevier. diff --git a/src/ConcurrentSim.jl b/src/ConcurrentSim.jl index 940f282..5f318e0 100755 --- a/src/ConcurrentSim.jl +++ b/src/ConcurrentSim.jl @@ -17,7 +17,8 @@ module ConcurrentSim export @resumable, @yield export AbstractProcess, Simulation, run, now, active_process, StopSimulation export Process, @process, interrupt - export Container, Resource, Store, StackStore, QueueStore, put!, get, cancel, request, tryrequest, release + export Container, Resource, Store, StackStore, QueueStore, DelayQueue + export put!, get, cancel, request, tryrequest, release export nowDatetime include("base.jl") @@ -29,6 +30,7 @@ module ConcurrentSim include("resources/containers.jl") include("resources/stores.jl") include("resources/ordered_stores.jl") + include("resources/delayed_stores.jl") include("utils/time.jl") include("deprecated_aliased.jl") end diff --git a/src/resources/delayed_stores.jl b/src/resources/delayed_stores.jl new file mode 100755 index 0000000..bad21b1 --- /dev/null +++ b/src/resources/delayed_stores.jl @@ -0,0 +1,58 @@ +@doc raw""" + DelayQueue{T} + +A queue in which items are stored in a FIFO order, but are only available after a delay. + +```jldoctest +julia> sim = Simulation() + queue = DelayQueue{Symbol}(sim, 10) + @resumable function producer(env, queue) + for item in [:a,:b,:a,:c] + @info "putting $item at time $(now(env))" + put!(queue, item) + @yield timeout(env, 2) + end + end + @resumable function consumer(env, queue) + @yield timeout(env, 5) + while true + t = @yield take!(queue) + @info "taking $(t) at time $(now(env))" + end + end + @process producer(sim, queue) + @process consumer(sim, queue) + run(sim, 30) +[ Info: putting a at time 0.0 +[ Info: putting b at time 2.0 +[ Info: putting a at time 4.0 +[ Info: putting c at time 6.0 +[ Info: taking a at time 10.0 +[ Info: taking b at time 12.0 +[ Info: taking a at time 14.0 +[ Info: taking c at time 16.0 +``` +""" +mutable struct DelayQueue{T} + store::QueueStore{T, Int} + delay::Float64 +end +function DelayQueue(env::Environment, delay) + return DelayQueue(QueueStore{Any}(env), float(delay)) +end +function DelayQueue{T}(env::Environment, delay) where T + return DelayQueue(QueueStore{T}(env), float(delay)) +end + +@resumable function latency(env::Environment, channel::DelayQueue, value) + @yield timeout(channel.store.env, channel.delay) + put!(channel.store, value) +end + +function Base.put!(channel::DelayQueue, value) + @process latency(channel.store.env, channel, value) # start the process, but do not wait on it +end + +function Base.take!(channel::DelayQueue) + get(channel.store) +end diff --git a/src/resources/ordered_stores.jl b/src/resources/ordered_stores.jl index 33d0b64..5923520 100755 --- a/src/resources/ordered_stores.jl +++ b/src/resources/ordered_stores.jl @@ -79,6 +79,7 @@ end function do_get(sto::StackStore{N, T}, get_ev::Get, key::StoreGetKey{T}) where {N, T<:Number} key.filter !== get_any_item && error("Filtering not supported for `StackStore`. Use an unordered store instead, or submit a feature request for implementing filtering to our issue tracker.") + isempty(sto.items) && return true item = pop!(sto.items) sto.load -= one(UInt) schedule(get_ev; value=item) @@ -96,6 +97,7 @@ end function do_get(sto::QueueStore{N, T}, get_ev::Get, key::StoreGetKey{T}) where {N, T<:Number} key.filter !== get_any_item && error("Filtering not supported for `QueueStore`. Use an unordered store instead, or submit a feature request for implementing filtering to our issue tracker.") + isempty(sto.items) && return true item = dequeue!(sto.items) sto.load -= one(UInt) schedule(get_ev; value=item) diff --git a/test/runtests.jl b/test/runtests.jl index 2bb15ca..8d7ec0a 100755 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -35,6 +35,7 @@ println("Starting tests with $(Threads.nthreads()) threads out of `Sys.CPU_THREA @doset "resources_containers_deprecated" @doset "resources_stores" @doset "resources_stores_deprecated" +@doset "resources_fancy_stores" @doset "resource_priorities" @doset "utils_time" VERSION >= v"1.9" && @doset "doctests" diff --git a/test/test_resources_fancy_stores.jl b/test/test_resources_fancy_stores.jl new file mode 100755 index 0000000..0e06aeb --- /dev/null +++ b/test/test_resources_fancy_stores.jl @@ -0,0 +1,29 @@ +using ConcurrentSim +using ResumableFunctions + +@resumable function producer(env, queue) + for item in [:a,:b,:a,:c] + @info "putting $item at time $(now(env))" + put!(queue, item) + @yield timeout(env, 2) + end +end +@resumable function consumer(env, queue) + @yield timeout(env, 5) + while true + t = @yield take!(queue) + @info "taking $(t) at time $(now(env))" + end +end + +function runsim(storeconstructor) + sim = Simulation() + queue = storeconstructor(sim) + @process producer(sim, queue) + @process consumer(sim, queue) + run(sim, 30) +end + +runsim(sim->DelayQueue{Symbol}(sim, 10)) +runsim(sim->QueueStore{Symbol}(sim)) +runsim(sim->Store{Symbol}(sim))