Skip to content

Commit d143612

Browse files
committed
add strange pipepaxos hack …
1 parent e14d41f commit d143612

File tree

7 files changed

+96
-67
lines changed

7 files changed

+96
-67
lines changed

Modules/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import rdts.base.LocalUid.replicaId
1111
import rdts.base.{Lattice, LocalUid, Uid}
1212
import rdts.datatypes.LastWriterWins
1313
import rdts.protocols.Participants
14-
import rdts.protocols.paper.{MultiPaxos, MultipaxosPhase}
14+
import rdts.protocols.paper.PipePaxos
1515
import rdts.time.Time
1616
import replication.DeltaStorage.Type.*
1717
import replication.ProtocolMessage.Payload
@@ -61,7 +61,7 @@ class KeyValueReplica(
6161
}
6262

6363
class Cluster {
64-
@volatile var state: ClusterState = MultiPaxos.empty
64+
@volatile var state: ClusterState = PipePaxos.empty
6565

6666
given Lattice[Payload[ClusterState]] =
6767
given Lattice[Int] = Lattice.fromOrdering
@@ -90,7 +90,7 @@ class KeyValueReplica(
9090
assert(changed == state)
9191
// else log(s"upkept: ${pprint(upkept)}")
9292
val newState = publish(upkept)
93-
maybeAnswerClient(old.rounds.counter)
93+
maybeAnswerClient(old.nextDecisionRound)
9494
// try to propose a new value in case voting is decided
9595
maybeProposeNewValue(client.state)
9696
}
@@ -102,7 +102,7 @@ class KeyValueReplica(
102102
val oldstate = state
103103
state = state.merge(delta)
104104
dataManager.applyDelta(delta)
105-
maybeAnswerClient(oldstate.rounds.counter)
105+
maybeAnswerClient(oldstate.nextDecisionRound)
106106
} else {
107107
log("skip")
108108
}
@@ -125,20 +125,23 @@ class KeyValueReplica(
125125
peers.minOption match
126126
case Some(id) if id == uid =>
127127
log(s"Proposing election of $uid")
128-
publish(state.startLeaderElection): Unit
128+
if state.openRounds.isEmpty then
129+
publish(state.addRound())
130+
()
131+
publish(state.startLeaderElection)
132+
()
129133
case _ => ()
130134
}
131135

132136
def maybeProposeNewValue(client: ClientState)(using LocalUid): Unit = {
133137
// check if we are the leader and ready to handle a request
134-
if state.leader.contains(replicaId) && state.phase == MultipaxosPhase.Idle then
138+
if state.leader.contains(replicaId) then
135139
// ready to propose value
136-
client.firstUnansweredRequest match
137-
case Some(req) =>
138-
log(s"Proposing new value $req.")
139-
val _ = publish(state.proposeIfLeader(req))
140-
case None =>
141-
log("I am the leader but request queue is empty.")
140+
for req <- client.sortedUnansweredRequests do {
141+
log(s"Proposing new value $req.")
142+
publish(state.proposeIfLeader(req))
143+
}
144+
142145
}
143146

144147
private def maybeAnswerClient(previousRound: Time): Unit = {

Modules/Protocol Benchmarks/src/main/scala/probench/data/Data.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import probench.data.RequestResponseQueue.Req
44
import rdts.base.{Lattice, LocalUid}
55
import rdts.datatypes.LastWriterWins
66
import rdts.protocols.Participants
7-
import rdts.protocols.paper.MultiPaxos
7+
import rdts.protocols.paper.{MultiPaxos, PipePaxos}
88

99
enum KVOperation[Key, Value] {
1010
def key: Key
@@ -14,7 +14,7 @@ enum KVOperation[Key, Value] {
1414
}
1515

1616
type ConnInformation = Map[LocalUid, LastWriterWins[Long]]
17-
type ClusterState = MultiPaxos[Req[KVOperation[String, String]]]
17+
type ClusterState = PipePaxos[Req[KVOperation[String, String]]]
1818
type ClientState = RequestResponseQueue[KVOperation[String, String], String]
1919

2020
case class KVState(

Modules/Protocol Benchmarks/src/main/scala/probench/data/RequestResponseQueue.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ case class RequestResponseQueue[S, T](
4949
def firstUnansweredRequest: Option[Req[S]] =
5050
timestampsSorted.headOption.flatMap(requests.get)
5151

52+
def sortedUnansweredRequests: Seq[Req[S]] = timestampsSorted.flatMap(requests.get)
53+
5254
private def timestampsSorted: List[Timestamp] =
5355
requests.keySet.toList.sorted
5456

Modules/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,9 @@ class ClusterConsensus extends munit.FunSuite {
9595

9696
nodes.foreach(noUpkeep)
9797

98-
assertEquals(nodes(0).cluster.state.log(3).value, KVOperation.Read("test2"))
99-
assertEquals(nodes(2).cluster.state.log.size, 2)
98+
99+
assertEquals(nodes(0).cluster.state.closedRounds(3).value, KVOperation.Read("test2"))
100+
assertEquals(nodes(2).cluster.state.closedRounds.size, 2)
100101

101102
}
102103
test("consensus with one node") {
@@ -131,12 +132,12 @@ class ClusterConsensus extends munit.FunSuite {
131132
client.read("test")
132133

133134
assertEquals(primary.client.state.requestsSorted, List.empty)
134-
assertEquals(primary.cluster.state.read.length, 2)
135+
assertEquals(primary.cluster.state.closedRounds.size, 2)
135136

136137
for n <- Range(0, 1000) do
137138
client.write(n.toString, "value")
138139

139-
assertEquals(primary.cluster.state.read.length, 1002)
140+
assertEquals(primary.cluster.state.closedRounds.size, 1002)
140141

141142
def investigateUpkeep(state: ClusterState)(using LocalUid) = {
142143
val delta = state.upkeep

Modules/RDTs/src/main/scala/rdts/protocols/paper/Paxos.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ case class Paxos[A](
2020
rounds: Map[BallotNum, PaxosRound[A]] =
2121
Map.empty[BallotNum, PaxosRound[A]]
2222
) {
23+
24+
def phase(using Participants): MultipaxosPhase = currentRound match
25+
case None => MultipaxosPhase.LeaderElection
26+
case Some(PaxosRound(leaderElection, _)) if leaderElection.result.isEmpty => MultipaxosPhase.LeaderElection
27+
case Some(PaxosRound(leaderElection, proposals))
28+
if leaderElection.result.nonEmpty && proposals.votes.nonEmpty => MultipaxosPhase.Voting
29+
case Some(PaxosRound(leaderElection, proposals))
30+
if leaderElection.result.nonEmpty && proposals.votes.isEmpty => MultipaxosPhase.Idle
31+
case _ => throw new Error("Inconsistent Paxos State")
32+
2333
// voting
2434
def voteLeader(leader: Uid)(using
2535
LocalUid

Modules/RDTs/src/main/scala/rdts/protocols/paper/PipePaxos.scala

Lines changed: 58 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,65 +23,78 @@ case class PipePaxos[A](
2323
):
2424

2525
// private helper functions
26-
private lazy val openRounds = log.collect { case (k, ClosingCons.Open(paxos)) => (k, paxos) }
27-
private lazy val maxPaxos: (round: Time, paxos: Paxos[A]) = openRounds.maxBy(_._1)
28-
lazy val nextDecisionRound: Long = openRounds.minBy(_._1)._1
26+
lazy val openRounds = log.collect { case (k, ClosingCons.Open(paxos)) => (k, paxos) }
27+
private lazy val maxPaxos: (round: Time, paxos: Paxos[A]) = openRounds.maxByOption(_._1).getOrElse((-1, Paxos[A]()))
28+
lazy val nextDecisionRound: Long = openRounds.minByOption(_._1).map(_._1).getOrElse(-1)
29+
lazy val maxRound = log.keys.maxOption.getOrElse(-1L)
30+
def nextIdleRound(using Participants): Option[(round: Time, paxos: Paxos[A])] =
31+
openRounds.find((_, p) => p.phase == MultipaxosPhase.Idle)
32+
33+
lazy val closedRounds: Map[Long, A] = log.collect { case (k, ClosingCons.Done(value)) => (k, value) }
34+
35+
def phase(using Participants) = maxPaxos.paxos.phase
2936

3037
// public API
3138
def leader(using Participants): Option[Uid] = maxPaxos.paxos.currentRound match
3239
case Some(PaxosRound(leaderElection, _)) => leaderElection.result
3340
case None => None
3441

35-
def phase(using Participants): MultipaxosPhase =
36-
maxPaxos.paxos.currentRound match
37-
case None => MultipaxosPhase.LeaderElection
38-
case Some(PaxosRound(leaderElection, _)) if leaderElection.result.isEmpty => MultipaxosPhase.LeaderElection
39-
case Some(PaxosRound(leaderElection, proposals))
40-
if leaderElection.result.nonEmpty && proposals.votes.nonEmpty => MultipaxosPhase.Voting
41-
case Some(PaxosRound(leaderElection, proposals))
42-
if leaderElection.result.nonEmpty && proposals.votes.isEmpty => MultipaxosPhase.Idle
43-
case _ => throw new Error("Inconsistent Paxos State")
44-
4542
def readDecisionsSince(time: Time): Iterable[A] =
46-
NumericRange(time, nextDecisionRound, 1L).view.flatMap(log.get).collect{case Done(value) => value}
43+
NumericRange(time, nextDecisionRound, 1L).view.flatMap(log.get).collect { case Done(value) => value }
4744

4845
def startLeaderElection(using LocalUid): PipePaxos[A] =
4946
PipePaxos(openRounds.view.mapValues(v => ClosingCons.Open(v.phase1a)).toMap)
5047

5148
def proposeIfLeader(value: A)(using LocalUid, Participants): PipePaxos[A] =
52-
PipePaxos(
53-
Map(maxPaxos.round -> Open(maxPaxos.paxos.phase2a(value)))
54-
) // phase 2a already checks if I am the leader
55-
56-
def upkeep(using LocalUid, Participants): PipePaxos[A] = PipePaxos {
57-
openRounds.map { (roundNumber, paxos) =>
58-
// perform upkeep in Paxos
59-
val deltaPaxos = paxos.upkeep()
60-
val newPaxos = paxos.merge(deltaPaxos)
61-
62-
(newPaxos.result, newPaxos.newestBallotWithLeader) match
63-
case (Some(decision), Some((ballotNum, PaxosRound(leaderElection, _)))) =>
64-
// we are voting on proposals and there is a decision
65-
66-
val newDecision = Map(roundNumber -> Done(decision))
67-
if roundNumber == maxPaxos.round then
68-
// create new Paxos where leader is already elected
69-
val newPaxos = Paxos(rounds =
70-
Map(ballotNum -> PaxosRound(
71-
leaderElection = leaderElection,
72-
proposals = Voting[A]()
73-
))
74-
)
75-
newDecision.updated(roundNumber + 1, Open(newPaxos))
76-
else newDecision
77-
case _ =>
78-
// nothing to do, return upkeep result
79-
Map(roundNumber -> Open(deltaPaxos))
80-
}.foldLeft(Map.empty[Long, ClosingCons[A]])(Lattice.merge[Map[Long, ClosingCons[A]]])
49+
nextIdleRound match {
50+
case Some(round) =>
51+
PipePaxos(
52+
Map(round.round -> Open(round.paxos.phase2a(value)))
53+
) // phase 2a already checks if I am the leader
54+
case None =>
55+
val rounded = addRound()
56+
val proposed = rounded.maxPaxos.paxos.phase2a(value)
57+
rounded `merge` PipePaxos(Map(rounded.maxRound -> Open(proposed)))
58+
59+
}
60+
61+
def addRound()(using Participants): PipePaxos[A] = {
62+
val maybeshortcut = maxPaxos.paxos.newestBallotWithLeader match {
63+
case Some((ballotNum, PaxosRound(leaderElection, _))) =>
64+
Paxos(rounds =
65+
Map(ballotNum -> PaxosRound(
66+
leaderElection = leaderElection,
67+
proposals = Voting[A]()
68+
))
69+
)
70+
case _ => Paxos[A]()
71+
}
72+
PipePaxos(Map((maxPaxos.round + 1) -> Open(maybeshortcut)))
8173
}
8274

75+
def upkeep(using LocalUid, Participants): PipePaxos[A] =
76+
val res = PipePaxos {
77+
openRounds.map { (roundNumber, paxos) =>
78+
// perform upkeep in Paxos
79+
val deltaPaxos = paxos.upkeep()
80+
val newPaxos = paxos.merge(deltaPaxos)
81+
82+
newPaxos.result match
83+
case Some(decision) =>
84+
// we are voting on proposals and there is a decision
85+
Map(roundNumber -> Done(decision))
86+
case _ =>
87+
// nothing to do, return upkeep result
88+
Map(roundNumber -> Open(deltaPaxos))
89+
}.foldLeft(Map.empty[Long, ClosingCons[A]])(Lattice.merge[Map[Long, ClosingCons[A]]])
90+
}
91+
// keep an open round to remember leaderelection
92+
if res.openRounds.nonEmpty
93+
then res
94+
else
95+
res `merge` addRound()
96+
8397
object PipePaxos:
84-
def empty[A]: PipePaxos[A] = PipePaxos[A](Map(0L -> Open(Paxos())))
98+
def empty[A]: PipePaxos[A] = PipePaxos[A](Map.empty)
8599

86100
given [A] => Lattice[PipePaxos[A]] = Lattice.derived
87-

Modules/RDTs/src/test/scala/test/rdts/protocols/PipePaxosTest.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ class PipePaxosTest extends munit.FunSuite {
1919
var testPaxosObject = emptyPaxosObject
2020

2121
assertEquals(testPaxosObject.leader, None)
22-
assertEquals(testPaxosObject.phase, MultipaxosPhase.LeaderElection, "multipaxos starts in leader election phase")
2322

2423
val proposeValue = 1
2524
// replica 1 tries to become leader
25+
testPaxosObject = testPaxosObject.merge(testPaxosObject.addRound())
2626
testPaxosObject = testPaxosObject.merge(testPaxosObject.startLeaderElection(using id1))
2727

2828
// testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep(using id1))
@@ -70,11 +70,11 @@ class PipePaxosTest extends munit.FunSuite {
7070
}
7171

7272
test("conflicting proposals") {
73-
var testPaxosObject = emptyPaxosObject
73+
var testPaxosObject = emptyPaxosObject.addRound()
7474

7575
// replicas 1 and 2 try to become leader
76-
var rep1 = emptyPaxosObject.merge(emptyPaxosObject.startLeaderElection(using id1))
77-
var rep2 = emptyPaxosObject.merge(emptyPaxosObject.startLeaderElection(using id2))
76+
var rep1 = testPaxosObject.merge(testPaxosObject.startLeaderElection(using id1))
77+
var rep2 = testPaxosObject.merge(testPaxosObject.startLeaderElection(using id2))
7878

7979
// sync
8080
testPaxosObject = testPaxosObject.merge(rep1).merge(rep2)

0 commit comments

Comments
 (0)