1+
2+ export ConsensusActor, NoConsensusActor, AveragingConsensusAlgorithm, AveragingConsensusMessage, create_averaging_consensus_participant, gradient_term
3+
4+ abstract type ConsensusActor end
5+
6+ function gradient_term(actor:: ConsensusActor , λ:: Vector{<:Real} , data:: Any )
7+ return 0
8+ end
9+
10+ mutable struct NoConsensusActor <: ConsensusActor
11+ end
12+
13+ struct AveragingConsensusMessage
14+ λ:: Vector{Float64}
15+ k:: Int
16+ data:: Any
17+ end
18+
19+ struct ConsensusFinishedMessage
20+ λ:: Vector{Float64}
21+ k:: Int
22+ actor:: ConsensusActor
23+ end
24+
25+ @kwdef mutable struct AveragingConsensusAlgorithm <: DistributedAlgorithm
26+ message_queue:: Dict{Int,Vector{AveragingConsensusMessage}} = Dict{Int,Vector{AveragingConsensusMessage}}()
27+ first_message = true
28+ k:: Int = 0
29+ max_iter:: Int = 50
30+ λ:: Vector{Float64} = Vector{Real}()
31+
32+ initial_λ:: Real
33+ α:: Real
34+ actor:: ConsensusActor
35+
36+ finish_callback:: Function
37+ end
38+
39+ function on_exchange_message(algorithm_data:: AveragingConsensusAlgorithm , carrier:: Carrier , message:: AveragingConsensusMessage , meta:: Any )
40+ if message. k >= algorithm_data. max_iter
41+ # abort if iteration count is reached
42+ algorithm_data. finish_callback(algorithm_data, carrier)
43+ return
44+ end
45+
46+ if algorithm_data. first_message
47+ algorithm_data. first_message = false
48+ algorithm_data. λ = ones(length(message. λ)) .* algorithm_data. initial_λ
49+
50+ for addr in others(carrier, " " )
51+ send_to_other(carrier, AveragingConsensusMessage(algorithm_data. λ, 0 , message. data), addr)
52+ end
53+ end
54+ queue = get!(algorithm_data. message_queue, message. k, [])
55+
56+ push!(queue, message)
57+
58+ if length(queue) == length(others(carrier, " " ))
59+ avgλ = sum(m. λ for m in queue) ./ length(queue)
60+ algorithm_data. λ .+ = algorithm_data. α .* (avgλ .- algorithm_data. λ) .+ gradient_term(algorithm_data. actor, algorithm_data. λ, message. data)
61+
62+ algorithm_data. k += message. k + 1
63+ delete!(algorithm_data. message_queue, message. k)
64+
65+ for addr in others(carrier, " " )
66+ send_to_other(carrier, AveragingConsensusMessage(algorithm_data. λ, algorithm_data. k, message. data), addr)
67+ end
68+ end
69+ end
70+
71+ function create_averaging_consensus_participant(finish_callback:: Function , consensus_actor:: ConsensusActor ; initial_λ:: Real = 10 , α:: Real = 0.3 , max_iter:: Int = 50 )
72+ appl_consensus_actor = isnothing(consensus_actor) ? NoConsensusActor() : consensus_actor
73+
74+ return AveragingConsensusAlgorithm(finish_callback= finish_callback, initial_λ= initial_λ, α= α, actor= appl_consensus_actor, max_iter= max_iter)
75+ end
0 commit comments