1+ /**
2+ * Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+ */
4+ package com .reactivedesignpatterns .chapter12
5+
6+ import akka .actor ._
7+ import play .api .libs .json .JsValue
8+ import scala .collection .immutable .TreeMap
9+ import scala .concurrent .duration ._
10+
11+ object ActiveActive {
12+ import ReplicationProtocol ._
13+ import Persistence ._
14+
15+ private case class SeqCommand (seq : Int , cmd : Command , replyTo : ActorRef )
16+ private case class SeqResult (seq : Int , res : Result , replica : ActorRef , replyTo : ActorRef )
17+
18+ private case class SendInitialData (toReplica : ActorRef )
19+ private case class InitialData (map : Map [String , JsValue ])
20+
21+ class Replica extends Actor with Stash {
22+ var map = Map .empty[String , JsValue ]
23+
24+ def receive = {
25+ case InitialData (m) =>
26+ map = m
27+ context.become(initialized)
28+ unstashAll()
29+ case _ => stash()
30+ }
31+
32+ def initialized : Receive = {
33+ case SeqCommand (seq, cmd, replyTo) =>
34+ cmd match {
35+ case Put (key, value, r) =>
36+ map += key -> value
37+ replyTo ! SeqResult (seq, PutConfirmed (key), self, r)
38+ case Get (key, r) =>
39+ replyTo ! SeqResult (seq, GetResult (key, map get key), self, r)
40+ }
41+ case SendInitialData (toReplica) => toReplica ! InitialData (map)
42+ }
43+ }
44+
45+ object Replica {
46+ val props = Props (new Replica )
47+ }
48+
49+ private sealed trait ReplyState {
50+ def deadline : Deadline
51+ def missing : Set [ActorRef ]
52+ def add (res : SeqResult ): ReplyState
53+ def isKnown : Boolean
54+ def isFinished : Boolean = missing.isEmpty
55+ }
56+ private case class Unknown (deadline : Deadline , replies : Set [SeqResult ], missing : Set [ActorRef ]) extends ReplyState {
57+ override def add (res : SeqResult ): ReplyState = {
58+ val quorum = (missing.size + 1 ) / 2
59+ val nextReplies = replies + res
60+ if (nextReplies.size >= quorum) {
61+ val answer = replies.toSeq.groupBy(_.res).collectFirst { case (k, s) if s.size >= quorum => s.head }
62+ if (answer.isDefined) {
63+ val right = answer.get
64+ val wrong = replies.collect { case SeqResult (_, res, replica, _) if res != right => replica }
65+ Known (deadline, right, wrong, missing - res.replica)
66+ } else Unknown (deadline, nextReplies, missing - res.replica)
67+ } else Unknown (deadline, nextReplies, missing - res.replica)
68+ }
69+ override def isKnown = false
70+ }
71+ private case class Known (deadline : Deadline , reply : SeqResult , wrong : Set [ActorRef ], missing : Set [ActorRef ]) extends ReplyState {
72+ override def add (res : SeqResult ): ReplyState = {
73+ val nextWrong = if (res.res == reply.res) wrong else wrong + res.replica
74+ Known (deadline, reply, nextWrong, missing - res.replica)
75+ }
76+ override def isKnown = true
77+ }
78+
79+ class Coordinator (N : Int ) extends Actor {
80+ private var replicas = (1 to N ).map(_ => context.actorOf(Replica .props)).toSet
81+ private val seqNr = Iterator from 0
82+ private var replies = TreeMap .empty[Int , ReplyState ]
83+ private var nextReply = 0
84+
85+ context.setReceiveTimeout(1 .second)
86+
87+ def receive = {
88+ case cmd : Command =>
89+ val c = SeqCommand (seqNr.next, cmd, self)
90+ replicas foreach (_ ! c)
91+ replies += c.seq -> Unknown (5 seconds fromNow, Set .empty, replicas)
92+ doTimeouts()
93+ case res : SeqResult if replies.contains(res.seq) && replicas.contains(res.replica) =>
94+ val prevState = replies(res.seq)
95+ val nextState = prevState.add(res)
96+ // potentially send reply if quorum of replies has been received now
97+ nextState match {
98+ case Known (seq, reply, _, _) if ! prevState.isKnown && seq == nextReply =>
99+ reply.replyTo ! reply.res
100+ nextReply += 1
101+ case _ =>
102+ }
103+ // clean up state
104+ if (nextState.isFinished) {
105+ dispose(nextState)
106+ replies -= res.seq
107+ } else {
108+ replies += res.seq -> nextState
109+ doTimeouts()
110+ }
111+ case ReceiveTimeout => doTimeouts()
112+ }
113+
114+ private def doTimeouts (): Unit = {
115+ val now = Deadline .now
116+ val expired = replies.iterator.takeWhile(_._2.deadline <= now)
117+ expired.map(_._2).foreach(dispose)
118+ }
119+
120+ /**
121+ * The given reply state has been removed from the replies map and is now
122+ * being disposed of. This means that we need to act upon wrong replies
123+ * from replicas.
124+ *
125+ * If there are replicas for which no reply has been recorded yet, we
126+ * ignore them. If they reply incorrectly later they will be replaced then.
127+ * GC pauses are tolerated: do not kick out replicas just for being slow.
128+ */
129+ private def dispose (state : ReplyState ): Unit =
130+ state match {
131+ case Unknown (_, replies, _) =>
132+ // did not reach consensus on this one, pick simple majority
133+ val counts = replies.toList.groupBy(_.res)
134+ val biggest = counts.iterator.map(_._2.size).max
135+ val winners = counts.collectFirst {
136+ case (res, win) if win.size == biggest => win
137+ }.get
138+ val losers = replicas -- winners.iterator.map(_.replica).toSet
139+ losers foreach replaceReplica
140+ case Known (_, _, wrong, _) =>
141+ wrong foreach replaceReplica
142+ }
143+
144+ private def replaceReplica (r : ActorRef ): Unit = {
145+ replicas -= r
146+ r ! PoisonPill
147+ val newReplica = context.actorOf(Replica .props)
148+ replicas.head ! SendInitialData (newReplica)
149+ replicas += newReplica
150+ }
151+ }
152+
153+ }
0 commit comments