Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ object QuantizedTime {

given Ordering[QuantizedFiniteDuration] with {
override def compare(self: QuantizedFiniteDuration, other: QuantizedFiniteDuration): Int = {
// Whether this "require" is needed is up to semantic interpretation.
// Whether this "required" is needed is up to semantic interpretation.
// I'm choosing to include it because in our particular case such a comparison would almost certainly be a
// programming error, and it is not a priori given what should happen if the instants being compared as "close"
// within their respective quantization window.
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/hydrozoa/multisig/ledger/DappLedgerM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ object DappLedgerM {
final case class State(
treasury: MultisigTreasuryUtxo,
// TODO: Queue[(EventId, DepositUtxo, RefundTx.PostDated)]
// Deposits according to the total order of events
// (specifically, NOT according to the deposit absorption start time)
deposits: Queue[(LedgerEventId, DepositUtxo)] = Queue()
) {
def appendToQueue(t: (LedgerEventId, DepositUtxo)): State =
Expand Down
37 changes: 34 additions & 3 deletions src/main/scala/hydrozoa/multisig/ledger/JointLedger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.suprnation.actor.Actor.{Actor, Receive}
import com.suprnation.actor.ActorRef.ActorRef
import com.suprnation.typelevel.actors.syntax.BroadcastOps
import hydrozoa.config.head.HeadConfig
import hydrozoa.config.head.multisig.timing.TxTiming
import hydrozoa.config.node.owninfo.OwnHeadPeerPrivate
import hydrozoa.lib.actor.*
import hydrozoa.lib.cardano.scalus.QuantizedTime.QuantizedInstant
Expand All @@ -26,7 +27,7 @@ import hydrozoa.multisig.ledger.joint.obligation.Payout
import hydrozoa.multisig.ledger.virtual.commitment.KzgCommitment.KzgCommitment
import hydrozoa.multisig.ledger.virtual.tx.{GenesisObligation, L2Genesis}
import monocle.Focus.focus
import scala.collection.immutable.Queue
import scala.collection.immutable.{Queue, TreeMap}
import scala.math.Ordered.orderingToOrdered
import scalus.cardano.ledger.{AssetName, TransactionHash, TransactionInput}
import scalus.uplc.builtin.{ByteString, platform}
Expand Down Expand Up @@ -255,7 +256,7 @@ final case class JointLedger(
*/

def partitionDeposits(
deposits: Queue[(LedgerEventId, DepositUtxo)],
depositStream: Queue[(LedgerEventId, DepositUtxo)],
blockStartTime: QuantizedInstant,
settlementValidityEnd: QuantizedInstant
): IO[
Expand All @@ -267,6 +268,8 @@ final case class JointLedger(
] = for {
_ <- IO.pure(())

deposits = sortDepositStream(depositStream, config.txTiming)

_ = logger.trace(
s"partitionDeposits: deposits: ${deposits.map(_._1)}, " +
s"blockStartTime=$blockStartTime, " +
Expand Down Expand Up @@ -485,7 +488,7 @@ final case class JointLedger(
producing <- unsafeGetProducing

ret <- partitionDeposits(
deposits = producing.dappLedgerState.deposits,
depositStream = producing.dappLedgerState.deposits,
blockStartTime = producing.startTime,
settlementValidityEnd =
txTiming.newSettlementEndTime(producing.competingFallbackValidityStart)
Expand Down Expand Up @@ -657,6 +660,33 @@ final case class JointLedger(
*/
object JointLedger {

/** @param depositStream
* The stream of deposits sorted according to the total ordering.
* @param txTiming
* @return
* The same stream, re-ordered according to absorption start time, with ties broken according
* to the ordering in which they appear in depositStream. The result is that if deposits are
* grouped into sequences [s1, s2, ...], according to identical absorption start time, each
* sequence s_i will be a (possibly non-contiguous) subsequence of depositStream.
*/
def sortDepositStream(
depositStream: Queue[(LedgerEventId, DepositUtxo)],
txTiming: TxTiming
): Queue[(LedgerEventId, DepositUtxo)] = {
val intermediateMap: TreeMap[QuantizedInstant, Queue[(LedgerEventId, DepositUtxo)]] =
depositStream
.foldLeft(TreeMap.empty[QuantizedInstant, Queue[(LedgerEventId, DepositUtxo)]])(
(acc, deposit) =>
acc.updatedWith(
txTiming.depositAbsorptionStartTime(deposit._2.submissionDeadline)
) {
case None => Some(Queue(deposit))
case Some(queue) => Some(queue.appended(deposit))
}
)
intermediateMap.foldLeft(Queue.empty) { case (q, (_, deposits)) => q ++ deposits }
}

type Handle = ActorRef[IO, Requests.Request]

type Config = HeadConfig.Section & OwnHeadPeerPrivate.Section
Expand Down Expand Up @@ -742,4 +772,5 @@ object JointLedger {
)
)
)

}
179 changes: 166 additions & 13 deletions src/test/scala/hydrozoa/multisig/ledger/JointLedgerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,45 @@ import cats.*
import cats.data.*
import cats.effect.*
import cats.effect.unsafe.implicits.*
import cats.syntax.all.*
import cats.implicits.*
import com.suprnation.actor.Actor.{Actor, Receive}
import com.suprnation.actor.ActorRef.ActorRef
import com.suprnation.actor.{ActorSystem, test as _}
import hydrozoa.config.head.HeadPeersSpec.Exact
import hydrozoa.config.head.multisig.timing.TxTiming
import hydrozoa.config.head.network.CardanoNetwork
import hydrozoa.config.head.peers.{TestPeers, generateTestPeers}
import hydrozoa.config.node.{NodeConfig, generateNodeConfig}
import hydrozoa.lib.cardano.scalus.QuantizedTime.*
import hydrozoa.lib.cardano.scalus.QuantizedTime.QuantizedInstant.realTimeQuantizedInstant
import hydrozoa.multisig.consensus.ConsensusActor
import hydrozoa.multisig.consensus.ConsensusActor.Request
import hydrozoa.multisig.consensus.peer.HeadPeerNumber
import hydrozoa.multisig.ledger.JointLedger.Requests.{CompleteBlockFinal, CompleteBlockRegular, StartBlock}
import hydrozoa.multisig.ledger.JointLedger.{Done, Producing}
import hydrozoa.multisig.ledger.JointLedger.{Done, Producing, sortDepositStream}
import hydrozoa.multisig.ledger.JointLedgerTestHelpers.*
import hydrozoa.multisig.ledger.JointLedgerTestHelpers.Requests.*
import hydrozoa.multisig.ledger.JointLedgerTestHelpers.Scenarios.*
import hydrozoa.multisig.ledger.block.{Block, BlockBrief, BlockNumber, BlockVersion}
import hydrozoa.multisig.ledger.dapp.tx.genDepositUtxo
import hydrozoa.multisig.ledger.dapp.txseq.DepositRefundTxSeq
import hydrozoa.multisig.ledger.dapp.utxo.DepositUtxo
import hydrozoa.multisig.ledger.event.LedgerEvent.DepositEvent
import hydrozoa.multisig.ledger.event.LedgerEventId
import hydrozoa.multisig.ledger.event.LedgerEventId.ValidityFlag.{Invalid, Valid}
import hydrozoa.multisig.ledger.event.{LedgerEventId, LedgerEventNumber}
import hydrozoa.multisig.ledger.virtual.commitment.KzgCommitment
import hydrozoa.multisig.ledger.virtual.tx.{GenesisObligation, L2Genesis}
import java.util.concurrent.TimeUnit
import org.scalacheck.*
import org.scalacheck.Prop.propBoolean
import org.scalacheck.PropertyM.monadForPropM
import org.scalacheck.rng.Seed
import org.scalacheck.util.Pretty
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.{Failure, Success, Try}
import scalus.cardano.address.ShelleyAddress
import scalus.cardano.ledger.{Block as _, BlockHeader as _, Coin, *}
import scalus.uplc.builtin.ByteString
import scalus.|>
Expand Down Expand Up @@ -323,14 +331,163 @@ object JointLedgerTestHelpers {
}
}

// Annoyingly, `Gen` doesn't have `Monad[Gen]` already. But I want to use `traverse` below, so I'm vendoring it here
implicit val genMonad: Monad[Gen] = new Monad[Gen] {
def pure[A](a: A): Gen[A] = Gen.const(a)
def flatMap[A, B](fa: Gen[A])(f: A => Gen[B]): Gen[B] = fa.flatMap(f)
def tailRecM[A, B](a: A)(f: A => Gen[Either[A, B]]): Gen[B] =
flatMap(f(a)) {
case Left(a1) => tailRecM(a1)(f)
case Right(b) => pure(b)
}
}

object JointLedgerTest extends Properties("Joint Ledger Test") {
override def overrideParameters(p: Test.Parameters): Test.Parameters =
p.withInitialSeed(Seed.fromBase64("_cO3yONWJeBMtw0XJ1i1kDMLpecMm_13gd0CRL2142J=").get)

import TestM.*

override def overrideParameters(p: Test.Parameters): Test.Parameters = {
p
.withMinSuccessfulTests(100)
}
// We can observe three test statistics:
// - Whether or not ties are occurring
// - Whether or not the sorting is trivial due to events being pre - sorted
// - Whether or not the sorting is trivial due < 2 events
//
// The properties we check:
// - Deposits are indeed sorted according to start time
// - Sorting does not change the number of elements
// - The sequences of deposits grouped by the same start times are all sub-sequences of the unsorted stream.
val _ = property("deposit sorting") = run(
// FIXME: initializer is too bulky, we can reduce it significantly
initializer = defaultInitializer,
testM = {
// FIXME: These generators are quite inefficient.

// monotonic sequence, duplicates may occur
val genMonotonic: Gen[List[Int]] = Gen.listOf(Gen.choose(0, 1000)).map(_.sorted)
val genStrictMonotonic: Gen[List[Int]] = genMonotonic.map(_.distinct)
def genLedgerEventIdsStrictMonotonic(
headPeerNum: HeadPeerNumber
): Gen[Queue[LedgerEventId]] =
for {
monotonic <- genStrictMonotonic
} yield Queue.from(
monotonic.map(i => LedgerEventId(headPeerNum, LedgerEventNumber(i)))
)

def genPeerDeposits(
headPeerNum: HeadPeerNumber,
config: CardanoNetwork.Section & TxTiming.Section,
headAddress: ShelleyAddress
): Gen[Queue[(LedgerEventId, DepositUtxo)]] =
for {
eventIds <- genLedgerEventIdsStrictMonotonic(headPeerNum)
deposits <- Gen.listOfN(
eventIds.length,
genDepositUtxo(
config = config,
headAddress = Some(headAddress)
)
)
} yield eventIds.zip(deposits)

def genInterleaved[A](queues: Vector[Queue[A]]): Gen[Queue[A]] =
Gen.tailRecM(queues -> Queue.empty[A]) { case (remaining, acc) =>
val nonEmpty = remaining.filter(_.nonEmpty)
if nonEmpty.isEmpty then Gen.const(Right(acc))
else
Gen.choose(0, nonEmpty.length - 1).map { idx =>
val (head, tail) = nonEmpty(idx).dequeue
Left(nonEmpty.updated(idx, tail) -> (acc :+ head))
}
}

def genEventStream(
config: CardanoNetwork.Section & TxTiming.Section,
headAddress: ShelleyAddress
): Gen[Queue[(LedgerEventId, DepositUtxo)]] =
for {
headPeerNumbers <- for {
n <- Gen.choose(2, 100)
} yield Vector.range(0, n).map(HeadPeerNumber(_))
peerDeposits <- headPeerNumbers.traverse(n =>
genPeerDeposits(n, config, headAddress)
)
eventStream <- genInterleaved(peerDeposits)
} yield eventStream

// order-preserving, non-contiguous matching;
// isSubsequenceOf( List(1, 2, 3), List(1, 6, 3, 2, 3, 7, 1)) == true
@tailrec
def isSubsequenceOf[A](sub: Seq[A], seq: Seq[A]): Boolean = {
if sub.isEmpty then true
else
Try(seq.dropWhile(_ != sub.head).tail) match {
case Success(nextSeq) => isSubsequenceOf(sub.tail, nextSeq)
case Failure(_) => false
}
}

for {
env <- ask[TestR]
eventStream <- pick(genEventStream(env.config, env.config.headMultisigAddress))
sorted = sortDepositStream(eventStream, env.config.txTiming)
startTimeGroups = sorted
.groupBy((id, deposit) =>
env.config.txTiming
.depositAbsorptionStartTime(deposit.submissionDeadline)
)
.values

// Test statistic: make sure that ties are actually occurring in some samples
_ <- lift(PropertyM.monitor[IO](Prop.collect {
if eventStream.length <= 1
then "events.length <= 1 (trivial case)"
else "events.length > 1 (non-trivial case)"
}))

// Test statistic: make sure that ties are actually occurring in some samples
_ <- lift(PropertyM.monitor[IO](Prop.collect {
val collectionSizes = startTimeGroups.map(_.length)
if collectionSizes.isEmpty then "no duplicate start times"
else "some duplicate start times"
}))

// Test statistic: the sorted and unsorted streams are different
_ <- lift(PropertyM.monitor[IO](Prop.collect {
if sorted == eventStream then "event stream pre-sorted"
else "event stream not pre-sorted"
}))

_ <- assertWith(
msg = "Deposits are sorted by absorption start time",
condition = {
val startTimes = sorted.map(deposit =>
env.config.txTiming.depositAbsorptionStartTime(
deposit._2.submissionDeadline
)
)
startTimes.sorted == startTimes
}
)

_ <- assertWith(
msg = "Sorting does not change the number of elements",
condition = sorted.length == eventStream.length
)

_ <- assertWith(
msg =
"If multiple deposits have the same absorption start time, order of the sorted deposits must be" +
" a subsequence of the event stream",
condition = {
startTimeGroups.forall(groupQueue => isSubsequenceOf(groupQueue, eventStream))
}
)
} yield true

}
)

val _ = property("Joint Ledger Happy Path") = run(
initializer = defaultInitializer,
Expand Down Expand Up @@ -545,9 +702,7 @@ object JointLedgerTest extends Properties("Joint Ledger Test") {

// TODO: This property is disabled by lazy, since a deposit cannot be registered and absorbed in
// in the same block; requires updating.
lazy val _ = property(
"Absorbs deposit: absoprtion starts at block start time"
) = run(
lazy val _ = property("Absorbs deposit: absoprtion starts at block start time") = run(
initializer = defaultInitializer,
testM = for {
env <- ask[TestR]
Expand Down Expand Up @@ -607,9 +762,7 @@ object JointLedgerTest extends Properties("Joint Ledger Test") {

// TODO: This property is disabled by lazy, since a deposit cannot be registered and absorbed in
// in the same block; requires updating.
lazy val _ = property(
"Absorbs deposit: absorprtion starts 1s after block start time"
) = run(
lazy val _ = property("Absorbs deposit: absorprtion starts 1s after block start time") = run(
initializer = defaultInitializer,
testM = for {
env <- ask[TestR]
Expand Down
2 changes: 2 additions & 0 deletions src/test/scala/test/TestM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,6 @@ object TestM {
def lift[R, A](e: IO[A]): TestM[R, A] =
TestM(Kleisli.liftF(PropertyM.run(e)))

def lift[R, A](propertyM: PropertyM[IO, A]): TestM[R, A] = TestM(Kleisli.liftF(propertyM))

}