Skip to content

implement DelayQueue #99

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# News

## v1.4.0 - 2023-08-07

- Implement a `DelayQueue`, i.e. a `QueueStore` with latency between the store and take events.
- Bugfix to `QueueStore` and `StackStore` for take events on empty stores.

## v1.3.0 - 2023-08-07

- Implement ordered versions of `Store`, namely `QueueStore` and `StackStore`.
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ license = "MIT"
desc = "A discrete event process oriented simulation framework."
authors = ["Ben Lauwens and SimJulia and ConcurrentSim contributors"]
repo = "https://github.com/JuliaDynamics/ConcurrentSim.jl.git"
version = "1.3.0"
version = "1.4.0"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
2 changes: 2 additions & 0 deletions docs/src/examples/Latency.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

# Event Latency

There is a built-in [`DelayQueue`](@ref) if you need a store [`Store`](@ref) with latency between `put!` and `take!` events. However here, we show you how you could have built one for yourself. If you modify this in order to construct a particularly useful type of latency store, please contribute it to the library through a pull request.

## Description
In this example we show how to separate the time delay between processes from the processes themselves. We model a communications channel, called a `Cable`, where a sender sends messages regularly each `SEND_PERIOD` time units and a receiver listens each `RECEIVE_PERIOD`. The messages in the cable have a delay fo `DELAY_DURATION` until they reach the recevier.

Expand Down
4 changes: 3 additions & 1 deletion src/ConcurrentSim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ module ConcurrentSim
export @resumable, @yield
export AbstractProcess, Simulation, run, now, active_process, StopSimulation
export Process, @process, interrupt
export Container, Resource, Store, StackStore, QueueStore, put!, get, cancel, request, tryrequest, release
export Container, Resource, Store, StackStore, QueueStore, DelayQueue
export put!, get, cancel, request, tryrequest, release
export nowDatetime

include("base.jl")
Expand All @@ -29,6 +30,7 @@ module ConcurrentSim
include("resources/containers.jl")
include("resources/stores.jl")
include("resources/ordered_stores.jl")
include("resources/delayed_stores.jl")
include("utils/time.jl")
include("deprecated_aliased.jl")
end
58 changes: 58 additions & 0 deletions src/resources/delayed_stores.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
@doc raw"""
DelayQueue{T}

A queue in which items are stored in a FIFO order, but are only available after a delay.

```jldoctest
julia> sim = Simulation()
queue = DelayQueue{Symbol}(sim, 10)
@resumable function producer(env, queue)
for item in [:a,:b,:a,:c]
@info "putting $item at time $(now(env))"
put!(queue, item)
@yield timeout(env, 2)
end
end
@resumable function consumer(env, queue)
@yield timeout(env, 5)
while true
t = @yield take!(queue)
@info "taking $(t) at time $(now(env))"
end
end
@process producer(sim, queue)
@process consumer(sim, queue)
run(sim, 30)
[ Info: putting a at time 0.0
[ Info: putting b at time 2.0
[ Info: putting a at time 4.0
[ Info: putting c at time 6.0
[ Info: taking a at time 10.0
[ Info: taking b at time 12.0
[ Info: taking a at time 14.0
[ Info: taking c at time 16.0
```
"""
mutable struct DelayQueue{T}
store::QueueStore{T, Int}
delay::Float64
end
function DelayQueue(env::Environment, delay)
return DelayQueue(QueueStore{Any}(env), float(delay))
end
function DelayQueue{T}(env::Environment, delay) where T
return DelayQueue(QueueStore{T}(env), float(delay))
end

@resumable function latency(env::Environment, channel::DelayQueue, value)
@yield timeout(channel.store.env, channel.delay)
put!(channel.store, value)
end

function Base.put!(channel::DelayQueue, value)
@process latency(channel.store.env, channel, value) # start the process, but do not wait on it
end

function Base.take!(channel::DelayQueue)
get(channel.store)
end
2 changes: 2 additions & 0 deletions src/resources/ordered_stores.jl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ end

function do_get(sto::StackStore{N, T}, get_ev::Get, key::StoreGetKey{T}) where {N, T<:Number}
key.filter !== get_any_item && error("Filtering not supported for `StackStore`. Use an unordered store instead, or submit a feature request for implementing filtering to our issue tracker.")
isempty(sto.items) && return true
item = pop!(sto.items)
sto.load -= one(UInt)
schedule(get_ev; value=item)
Expand All @@ -96,6 +97,7 @@ end

function do_get(sto::QueueStore{N, T}, get_ev::Get, key::StoreGetKey{T}) where {N, T<:Number}
key.filter !== get_any_item && error("Filtering not supported for `QueueStore`. Use an unordered store instead, or submit a feature request for implementing filtering to our issue tracker.")
isempty(sto.items) && return true
item = dequeue!(sto.items)
sto.load -= one(UInt)
schedule(get_ev; value=item)
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ println("Starting tests with $(Threads.nthreads()) threads out of `Sys.CPU_THREA
@doset "resources_containers_deprecated"
@doset "resources_stores"
@doset "resources_stores_deprecated"
@doset "resources_fancy_stores"
@doset "resource_priorities"
@doset "utils_time"
VERSION >= v"1.9" && @doset "doctests"
Expand Down
29 changes: 29 additions & 0 deletions test/test_resources_fancy_stores.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using ConcurrentSim
using ResumableFunctions

@resumable function producer(env, queue)
for item in [:a,:b,:a,:c]
@info "putting $item at time $(now(env))"
put!(queue, item)
@yield timeout(env, 2)
end
end
@resumable function consumer(env, queue)
@yield timeout(env, 5)
while true
t = @yield take!(queue)
@info "taking $(t) at time $(now(env))"
end
end

function runsim(storeconstructor)
sim = Simulation()
queue = storeconstructor(sim)
@process producer(sim, queue)
@process consumer(sim, queue)
run(sim, 30)
end

runsim(sim->DelayQueue{Symbol}(sim, 10))
runsim(sim->QueueStore{Symbol}(sim))
runsim(sim->Store{Symbol}(sim))