Skip to content

Bugfix in priority of put!/get of a resource #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
name = "ConcurrentSim"
uuid = "6ed1e86c-fcaf-46a9-97e0-2b26a2cdb499"
keywords = ["discrete-even simulation"]
keywords = ["discrete-event simulation"]
license = "MIT"
desc = "A discrete event process oriented simulation framework."
authors = ["Ben Lauwens and SimJulia and ConcurrentSim contributors"]
repo = "https://github.com/JuliaDynamics/ConcurrentSim.jl.git"
version = "1.4.0"
version = "1.4.1"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
6 changes: 6 additions & 0 deletions src/base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ mutable struct BaseEvent
end
end

struct HighPrioFirst <: Base.Ordering end
struct LowPrioFirst <: Base.Ordering end
const HighPrio = HighPrioFirst()
const LowPrio = LowPrioFirst()
pickorder( hpf::Bool ) = hpf ? HighPrio : LowPrio

function show(io::IO, ev::AbstractEvent)
print(io, "$(typeof(ev)) $(ev.bev.id)")
end
Expand Down
12 changes: 10 additions & 2 deletions src/resources/base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@ struct Get <: ResourceEvent
end
end

function Base.lt( ::HighPrioFirst, a::ResourceKey, b::ResourceKey )
(a.priority > b.priority) || (a.priority === b.priority && a.id < b.id)
end

function Base.lt( ::LowPrioFirst, a::ResourceKey, b::ResourceKey )
(a.priority < b.priority) || (a.priority === b.priority && a.id < b.id)
end

function isless(a::ResourceKey, b::ResourceKey)
(a.priority < b.priority) || (a.priority === b.priority && a.id < b.id)
end

function trigger_put(put_ev::ResourceEvent, res::AbstractResource)
queue = DataStructures.PriorityQueue(res.put_queue)
queue = DataStructures.PriorityQueue(res.put_queue.o, res.put_queue)
while length(queue) > 0
(put_ev, key) = DataStructures.peek(queue)
proceed = do_put(res, put_ev, key)
Expand All @@ -37,7 +45,7 @@ function trigger_put(put_ev::ResourceEvent, res::AbstractResource)
end

function trigger_get(get_ev::ResourceEvent, res::AbstractResource)
queue = DataStructures.PriorityQueue(res.get_queue)
queue = DataStructures.PriorityQueue(res.get_queue.o, res.get_queue)
while length(queue) > 0
(get_ev, key) = DataStructures.peek(queue)
proceed = do_get(res, get_ev, key)
Expand Down
16 changes: 9 additions & 7 deletions src/resources/containers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ struct ContainerKey{N<:Real, T<:Number} <: ResourceKey
end

"""
Container{N<:Real, T<:Number}(env::Environment, capacity::N=one(N); level::N=zero(N))
Container{N<:Real, T<:Number}(env::Environment, capacity::N=one(N); level::N=zero(N), highpriofirst::Bool=false)

A "Container" resource object, storing up to `capacity` units of a resource (of type `N`).

Expand All @@ -16,6 +16,8 @@ The [`lock`](@ref) and [`unlock`](@ref) functions are a convenient way to intera
in a way mostly compatible with other discrete event and concurrency frameworks.
The `request` and `release` aliases are also available for these two functions.

`highpriofirst` determines the order of handling requests that can be met at the same time.

See [`Store`](@ref) for a more channel-like resource.

Think of `Resource` and `Container` as locks and of `Store` as channels. They block only if empty (on taking) or full (on storing).
Expand All @@ -27,17 +29,17 @@ mutable struct Container{N<:Real, T<:Number} <: AbstractResource
seid :: UInt
put_queue :: DataStructures.PriorityQueue{Put, ContainerKey{N, T}}
get_queue :: DataStructures.PriorityQueue{Get, ContainerKey{N, T}}
function Container{N, T}(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real, T<:Number}
new(env, capacity, N(level), zero(UInt), DataStructures.PriorityQueue{Put, ContainerKey{N, T}}(), DataStructures.PriorityQueue{Get, ContainerKey{N, T}}())
function Container{N, T}(env::Environment, capacity::N=one(N); level=zero(N), highpriofirst::Bool=false) where {N<:Real, T<:Number}
new(env, capacity, N(level), zero(UInt), DataStructures.PriorityQueue{Put, ContainerKey{N, T}}( pickorder(highpriofirst) ), DataStructures.PriorityQueue{Get, ContainerKey{N, T}}( pickorder(highpriofirst) ))
end
end

function Container(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real}
Container{N, Int}(env, capacity; level=N(level))
function Container(env::Environment, capacity::N=one(N); level=zero(N), highpriofirst::Bool=false) where {N<:Real}
Container{N, Int}(env, capacity; level=N(level), highpriofirst=highpriofirst)
end

function Container{T}(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real, T<:Number}
Container{N, T}(env, capacity; level=N(level))
function Container{T}(env::Environment, capacity::N=one(N); level=zero(N), highpriofirst::Bool=false) where {N<:Real, T<:Number}
Container{N, T}(env, capacity; level=N(level), highpriofirst=highpriofirst)
end

const Resource = Container{Int, Int}
Expand Down
8 changes: 4 additions & 4 deletions src/resources/delayed_stores.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ mutable struct DelayQueue{T}
store::QueueStore{T, Int}
delay::Float64
end
function DelayQueue(env::Environment, delay)
return DelayQueue(QueueStore{Any}(env), float(delay))
function DelayQueue(env::Environment, delay; highpriofirst::Bool=false)
return DelayQueue(QueueStore{Any}(env, highpriofirst=highpriofirst), float(delay))
end
function DelayQueue{T}(env::Environment, delay) where T
return DelayQueue(QueueStore{T}(env), float(delay))
function DelayQueue{T}(env::Environment, delay; highpriofirst::Bool=false) where T
return DelayQueue(QueueStore{T}(env, highpriofirst=highpriofirst), float(delay))
end

@resumable function latency(env::Environment, channel::DelayQueue, value)
Expand Down
4 changes: 2 additions & 2 deletions src/resources/ordered_stores.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ julia> [value(take!(stack)) for _ in 1:length(items)]
See also: [`QueueStore`](@ref), [`Store`](@ref)
"""
const StackStore = Store{N, T, DataStructures.Stack{N}} where {N, T<:Number}
StackStore{N}(env::Environment; capacity=typemax(UInt)) where {N} = StackStore{N, Int}(env; capacity)
StackStore{N}(env::Environment; capacity=typemax(UInt), highpriofirst::Bool=false) where {N} = StackStore{N, Int}(env; capacity, highpriofirst=highpriofirst)

"""
QueueStore{N, T<:Number}
Expand Down Expand Up @@ -66,7 +66,7 @@ julia> [value(take!(queue)) for _ in 1:length(items)]
See also: [`StackStore`](@ref), [`Store`](@ref)
"""
const QueueStore = Store{N, T, DataStructures.Queue{N}} where {N, T<:Number}
QueueStore{N}(env::Environment; capacity=typemax(UInt)) where {N} = QueueStore{N, Int}(env; capacity)
QueueStore{N}(env::Environment; capacity=typemax(UInt), highpriofirst::Bool=false) where {N} = QueueStore{N, Int}(env; capacity, highpriofirst=highpriofirst)

function do_put(sto::StackStore{N, T}, put_ev::Put, key::StorePutKey{N, T}) where {N, T<:Number}
if sto.load < sto.capacity
Expand Down
16 changes: 9 additions & 7 deletions src/resources/stores.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ struct StoreGetKey{T<:Number} <: ResourceKey
end

"""
Store{N, T<:Number}(env::Environment; capacity::UInt=typemax(UInt))
Store{N, T<:Number}(env::Environment; capacity::UInt=typemax(UInt), highpriofirst::Bool=false)

A store is a resource that can hold a number of items of type `N`. It is similar to a `Base.Channel` with a finite capacity ([`put!`](@ref) blocks after reaching capacity).
The [`put!`](@ref) and [`take!`](@ref) functions are a convenient way to interact with such a "channel" in a way mostly compatible with other discrete event and concurrency frameworks.

`highpriofirst` determines the order of handling requests that can be met at the same time.

See [`Container`](@ref) for a more lock-like resource.

Think of `Resource` and `Container` as locks and of `Store` as channels/stacks. They block only if empty (on taking) or full (on storing).
Expand All @@ -42,17 +44,17 @@ mutable struct Store{N, T<:Number, D} <: AbstractResource
seid :: UInt
put_queue :: DataStructures.PriorityQueue{Put, StorePutKey{N, T}}
get_queue :: DataStructures.PriorityQueue{Get, StoreGetKey{T}}
function Store{N, T, D}(env::Environment; capacity=typemax(UInt)) where {N, T<:Number, D}
new(env, UInt(capacity), zero(UInt), D(), zero(UInt), DataStructures.PriorityQueue{Put, StorePutKey{N, T}}(), DataStructures.PriorityQueue{Get, StoreGetKey{T}}())
function Store{N, T, D}(env::Environment; capacity=typemax(UInt), highpriofirst::Bool=false) where {N, T<:Number, D}
new(env, UInt(capacity), zero(UInt), D(), zero(UInt), DataStructures.PriorityQueue{Put, StorePutKey{N, T}}( pickorder(highpriofirst) ), DataStructures.PriorityQueue{Get, StoreGetKey{T}}( pickorder(highpriofirst) ))
end
end

function Store{N, T}(env::Environment; capacity=typemax(UInt)) where {N, T<:Number}
Store{N, T, Dict{N, UInt}}(env; capacity=UInt(capacity))
function Store{N, T}(env::Environment; capacity=typemax(UInt), highpriofirst::Bool=false) where {N, T<:Number}
Store{N, T, Dict{N, UInt}}(env; capacity=UInt(capacity), highpriofirst=highpriofirst)
end

function Store{N}(env::Environment; capacity=typemax(UInt)) where {N}
Store{N, Int}(env; capacity=UInt(capacity))
function Store{N}(env::Environment; capacity=typemax(UInt), highpriofirst::Bool=false) where {N}
Store{N, Int}(env; capacity=UInt(capacity), highpriofirst=highpriofirst)
end

"""
Expand Down
10 changes: 7 additions & 3 deletions src/simulations.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ struct EventKey
id :: UInt
end

function isless(a::EventKey, b::EventKey) :: Bool
function Base.lt( ::HighPrioFirst, a::EventKey, b::EventKey ) :: Bool
(a.time < b.time) || (a.time === b.time && a.priority > b.priority) || (a.time === b.time && a.priority === b.priority && a.id < b.id)
end

function Base.lt( ::LowPrioFirst, a::EventKey, b::EventKey ) :: Bool
(a.time < b.time) || (a.time === b.time && a.priority < b.priority) || (a.time === b.time && a.priority === b.priority && a.id < b.id)
end

mutable struct Simulation <: Environment
time :: Float64
heap :: DataStructures.PriorityQueue{BaseEvent, EventKey}
eid :: UInt
sid :: UInt
active_proc :: Union{AbstractProcess, Nothing}
function Simulation(initial_time::Number=zero(Float64))
new(initial_time, DataStructures.PriorityQueue{BaseEvent, EventKey}(), zero(UInt), zero(UInt), nothing)
function Simulation(initial_time::Number=zero(Float64), highpriofirst::Bool=true)
new(initial_time, DataStructures.PriorityQueue{BaseEvent, EventKey}( pickorder(highpriofirst) ), zero(UInt), zero(UInt), nothing)
end
end

Expand Down
19 changes: 18 additions & 1 deletion test/test_resource_priorities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ put!(sto, :b; priority = typemin(UInt))
@testset "Resource priority evaluation" begin
using ResumableFunctions

println( "Resource request 1 with priority 5, resource request 2 with priority 0" )

let sim = Simulation()
@resumable function f(env, res)
@yield lock(res)
Expand All @@ -71,6 +73,21 @@ put!(sto, :b; priority = typemin(UInt))
ev2 = unlock(res)
@process f(sim, res)
run(sim)
@test state(ev1) === ConcurrentSim.processed

println( "Default request order (low prio first): request ", state(ev1) === ConcurrentSim.processed ? 1 : 2, " served first." )
end

let sim = Simulation()
@resumable function f(env, res)
@yield lock(res)
end

res = Resource(sim, highpriofirst=true)
ev1 = unlock(res, priority=5)
ev2 = unlock(res)
@process f(sim, res)
run(sim)

println( "Alternate request order (high prio first): request ", state(ev1) === ConcurrentSim.processed ? 1 : 2, " served first." )
end
end