Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ changes.

- **BREAKING** Enable handling client recover in all head states.
- See [Issue #1812](https://github.com/cardano-scaling/hydra/issues/1812) and [PR #2217](https://github.com/cardano-scaling/hydra/pull/2217).
> This enables clients (e.g. the TUI) to fully recover after event-log rotation.
- The Checkpoint event, and consequently the EventLogRotated server output, now carry the full NodeState instead of just the HeadState.

- Optimistic approach to statefile corruption by just ignoring invalid JSON
lines [#2253](https://github.com/cardano-scaling/hydra/issues/2253)
Expand Down
1 change: 1 addition & 0 deletions hydra-node/hydra-node.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ library
Hydra.Node.Network
Hydra.Node.ParameterMismatch
Hydra.Node.Run
Hydra.Node.State
Hydra.Node.Util
Hydra.Options
Hydra.Persistence
Expand Down
2 changes: 1 addition & 1 deletion hydra-node/src/Hydra/API/HTTPServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import Hydra.Cardano.Api (Coin, LedgerEra, PolicyAssets, PolicyId, Tx)
import Hydra.Chain (Chain (..), PostTxError (..), draftCommitTx)
import Hydra.Chain.ChainState (IsChainState)
import Hydra.Chain.Direct.State ()
import Hydra.HeadLogic.State (NodeState (..))
import Hydra.Ledger (ValidationError (..))
import Hydra.Logging (Tracer, traceWith)
import Hydra.Node.ApiTransactionTimeout (ApiTransactionTimeout (..))
import Hydra.Node.DepositPeriod (toNominalDiffTime)
import Hydra.Node.Environment (Environment (..))
import Hydra.Node.State (NodeState (..))
import Hydra.Tx (CommitBlueprintTx (..), ConfirmedSnapshot, IsTx (..), Snapshot (..), UTxOType)
import Network.HTTP.Types (ResponseHeaders, hContentType, status200, status202, status400, status404, status500)
import Network.Wai (Application, Request (pathInfo, requestMethod), Response, consumeRequestBodyStrict, rawPathInfo, responseLBS)
Expand Down
4 changes: 1 addition & 3 deletions hydra-node/src/Hydra/API/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@ import Hydra.Chain.ChainState (IsChainState)
import Hydra.Chain.Direct.State ()
import Hydra.Events (EventSink (..), EventSource (..))
import Hydra.HeadLogic (
Deposit (..),
HeadState (..),
InitialState (..),
NodeState (..),
OpenState (..),
aggregateNodeState,
)
import Hydra.HeadLogic.Outcome qualified as StateChanged
import Hydra.HeadLogic.State (initNodeState)
import Hydra.HeadLogic.StateEvent (StateEvent (..))
import Hydra.Logging (Tracer, traceWith)
import Hydra.Network (IP, PortNumber)
import Hydra.Node.ApiTransactionTimeout (ApiTransactionTimeout)
import Hydra.Node.Environment (Environment)
import Hydra.Node.State (Deposit (..), NodeState (..), initNodeState)
import Hydra.Tx (IsTx (..), Party, txId)
import Network.HTTP.Types (status500)
import Network.Wai (responseLBS)
Expand Down
3 changes: 2 additions & 1 deletion hydra-node/src/Hydra/API/ServerOutput.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import Data.ByteString.Lazy qualified as LBS
import Hydra.API.ClientInput (ClientInput)
import Hydra.Chain (PostChainTx, PostTxError)
import Hydra.Chain.ChainState (ChainStateType, IsChainState)
import Hydra.HeadLogic.State (ClosedState (..), HeadState (..), InitialState (..), NodeState, OpenState (..), SeenSnapshot (..))
import Hydra.HeadLogic.State (ClosedState (..), HeadState (..), InitialState (..), OpenState (..), SeenSnapshot (..))
import Hydra.HeadLogic.State qualified as HeadState
import Hydra.Ledger (ValidationError)
import Hydra.Network (Host, ProtocolVersion)
import Hydra.Node.Environment (Environment (..))
import Hydra.Node.State (NodeState)
import Hydra.Prelude hiding (seq)
import Hydra.Tx (HeadId, Party, Snapshot, SnapshotNumber, getSnapshot)
import Hydra.Tx qualified as Tx
Expand Down
3 changes: 2 additions & 1 deletion hydra-node/src/Hydra/API/WSServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ import Hydra.Chain.ChainState (
IsChainState,
)
import Hydra.Chain.Direct.State ()
import Hydra.HeadLogic (ClosedState (ClosedState, readyToFanoutSent), HeadState, InitialState (..), NodeState (..), OpenState (..), StateChanged)
import Hydra.HeadLogic (ClosedState (ClosedState, readyToFanoutSent), HeadState, InitialState (..), OpenState (..), StateChanged)
import Hydra.HeadLogic.State qualified as HeadState
import Hydra.Logging (Tracer, traceWith)
import Hydra.NetworkVersions qualified as NetworkVersions
import Hydra.Node.Environment (Environment (..))
import Hydra.Node.State (NodeState (..))
import Hydra.Tx (HeadId, Party)
import Network.WebSockets (
PendingConnection (pendingRequest),
Expand Down
109 changes: 59 additions & 50 deletions hydra-node/src/Hydra/HeadLogic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,11 @@ import Hydra.HeadLogic.State (
ClosedState (..),
Committed,
CoordinatedHeadState (..),
Deposit (..),
DepositStatus (..),
HeadState (..),
IdleState (IdleState, chainState),
InitialState (..),
NodeState (..),
OpenState (..),
PendingCommits,
PendingDeposits,
SeenSnapshot (..),
getChainState,
seenSnapshotNumber,
Expand All @@ -83,6 +79,7 @@ import Hydra.Network qualified as Network
import Hydra.Network.Message (Message (..), NetworkEvent (..))
import Hydra.Node.DepositPeriod (DepositPeriod (..))
import Hydra.Node.Environment (Environment (..), mkHeadParameters)
import Hydra.Node.State (Deposit (..), DepositStatus (..), NodeState (..), PendingDeposits, depositsForHead)
import Hydra.Tx (
HeadId,
HeadSeed,
Expand Down Expand Up @@ -925,39 +922,18 @@ onOpenNetworkReqDec env ledger ttl currentSlot openState decommitTx =
, coordinatedHeadState
} = openState

-- | Process the chain (and time) advancing in an open head.
-- | Process the chain (and time) advancing in any head state.
--
-- __Transition__: 'OpenState' → 'OpenState'
-- __Transition__: No transition
--
-- This is primarily used to track deposits and either drop them or request
-- snapshots for inclusion.
onOpenChainTick :: IsTx tx => Environment -> PendingDeposits tx -> OpenState tx -> UTCTime -> Outcome tx
onOpenChainTick env pendingDeposits st chainTime =
-- This is primarily used to track deposits status changes.
onChainTick :: IsTx tx => Environment -> PendingDeposits tx -> UTCTime -> Outcome tx
onChainTick env pendingDeposits chainTime =
-- Determine new active and new expired
updateDeposits $ \newActive newExpired ->
-- Emit state change for both
-- XXX: This is a bit messy
((mkDepositActivated newActive <> mkDepositExpired newExpired) <>) $
-- Apply state changes and pick next active to request snapshot
-- XXX: This is smelly as we rely on Map <> to override entries (left
-- biased). This is also weird because we want to actually apply the state
-- change and also to determine the next active.
withNextActive (newActive <> newExpired <> pendingDeposits) $ \depositTxId ->
-- REVIEW: this is not really a wait, but discard?
-- TODO: Spec: wait tx𝜔 = ⊥ ∧ 𝑈𝛼 = ∅
if isNothing decommitTx
&& isNothing currentDepositTxId
&& not snapshotInFlight
&& isLeader parameters party nextSn
then
-- XXX: This state update has no equivalence in the
-- spec. Do we really need to store that we have
-- requested a snapshot? If yes, should update spec.
newState SnapshotRequestDecided{snapshotNumber = nextSn}
-- Spec: multicast (reqSn,̂ 𝑣,̄ 𝒮.𝑠 + 1,̂ 𝒯, 𝑈𝛼, ⊥)
<> cause (NetworkEffect $ ReqSn version nextSn (txId <$> localTxs) Nothing (Just depositTxId))
else
noop
mkDepositActivated newActive <> mkDepositExpired newExpired
where
updateDeposits cont =
uncurry cont $ Map.foldlWithKey updateDeposit (mempty, mempty) pendingDeposits
Expand All @@ -979,6 +955,43 @@ onOpenChainTick env pendingDeposits st chainTime =

plusTime = flip addUTCTime

mkDepositActivated m = changes . (`Map.foldMapWithKey` m) $ \depositTxId deposit ->
pure DepositActivated{depositTxId, chainTime, deposit}

mkDepositExpired m = changes . (`Map.foldMapWithKey` m) $ \depositTxId deposit ->
pure DepositExpired{depositTxId, chainTime, deposit}

Environment{depositPeriod} = env

-- | Process the chain (and time) advancing in an open head.
--
-- __Transition__: 'OpenState' → 'OpenState'
--
-- This is primarily used to track deposits and either drop them or request
-- snapshots for inclusion.
onOpenChainTick :: IsTx tx => Environment -> PendingDeposits tx -> OpenState tx -> Outcome tx
onOpenChainTick env pendingDeposits st =
-- Apply state changes and pick next active to request snapshot
-- XXX: This is smelly as we rely on Map <> to override entries (left
-- biased). This is also weird because we want to actually apply the state
-- change and also to determine the next active.
withNextActive pendingDeposits $ \depositTxId ->
-- REVIEW: this is not really a wait, but discard?
-- TODO: Spec: wait tx𝜔 = ⊥ ∧ 𝑈𝛼 = ∅
if isNothing decommitTx
&& isNothing currentDepositTxId
&& not snapshotInFlight
&& isLeader parameters party nextSn
then
-- XXX: This state update has no equivalence in the
-- spec. Do we really need to store that we have
-- requested a snapshot? If yes, should update spec.
newState SnapshotRequestDecided{snapshotNumber = nextSn}
-- Spec: multicast (reqSn,̂ 𝑣,̄ 𝒮.𝑠 + 1,̂ 𝒯, 𝑈𝛼, ⊥)
<> cause (NetworkEffect $ ReqSn version nextSn (txId <$> localTxs) Nothing (Just depositTxId))
else
noop
where
-- REVIEW! check what if there are more than 1 new active deposit
-- What is the sorting criteria to pick next?
withNextActive :: forall tx. (Eq (UTxOType tx), Monoid (UTxOType tx)) => Map (TxIdType tx) (Deposit tx) -> (TxIdType tx -> Outcome tx) -> Outcome tx
Expand All @@ -988,15 +1001,9 @@ onOpenChainTick env pendingDeposits st chainTime =
p (_, Deposit{deposited, status}) = deposited /= mempty && status == Active
maybe noop (cont . fst) . find p $ Map.toList deposits

mkDepositActivated m = changes . (`Map.foldMapWithKey` m) $ \depositTxId deposit ->
pure DepositActivated{depositTxId, chainTime, deposit}

mkDepositExpired m = changes . (`Map.foldMapWithKey` m) $ \depositTxId deposit ->
pure DepositExpired{depositTxId, chainTime, deposit}

nextSn = confirmedSn + 1

Environment{party, depositPeriod} = env
Environment{party} = env

CoordinatedHeadState
{ localTxs
Expand Down Expand Up @@ -1354,10 +1361,10 @@ update env ledger NodeState{headState = st, pendingDeposits, currentSlot} ev = c
onOpenClientNewTx tx
(Open openState, NetworkInput ttl (ReceivedMessage{msg = ReqTx tx})) ->
onOpenNetworkReqTx env ledger currentSlot openState ttl tx
(Open openState, NetworkInput _ (ReceivedMessage{sender, msg = ReqSn sv sn txIds decommitTx depositTxId})) ->
onOpenNetworkReqSn env ledger pendingDeposits currentSlot openState sender sv sn txIds decommitTx depositTxId
(Open openState, NetworkInput _ (ReceivedMessage{sender, msg = AckSn snapshotSignature sn})) ->
onOpenNetworkAckSn env pendingDeposits openState sender snapshotSignature sn
(Open openState@OpenState{headId = ourHeadId}, NetworkInput _ (ReceivedMessage{sender, msg = ReqSn sv sn txIds decommitTx depositTxId})) ->
onOpenNetworkReqSn env ledger (depositsForHead ourHeadId pendingDeposits) currentSlot openState sender sv sn txIds decommitTx depositTxId
(Open openState@OpenState{headId = ourHeadId}, NetworkInput _ (ReceivedMessage{sender, msg = AckSn snapshotSignature sn})) ->
onOpenNetworkAckSn env (depositsForHead ourHeadId pendingDeposits) openState sender snapshotSignature sn
( Open openState@OpenState{headId = ourHeadId}
, ChainInput Observation{observedTx = OnCloseTx{headId, snapshotNumber = closedSnapshotNumber, contestationDeadline}, newChainState}
)
Expand All @@ -1378,17 +1385,13 @@ update env ledger NodeState{headState = st, pendingDeposits, currentSlot} ev = c
onOpenClientDecommit headId ledger currentSlot coordinatedHeadState decommitTx
(Open openState, NetworkInput ttl (ReceivedMessage{msg = ReqDec{transaction}})) ->
onOpenNetworkReqDec env ledger ttl currentSlot openState transaction
(Open OpenState{headId = ourHeadId}, ChainInput Observation{observedTx = OnDepositTx{headId, depositTxId, deposited, created, deadline}, newChainState})
| ourHeadId == headId ->
newState DepositRecorded{chainState = newChainState, headId, depositTxId, deposited, created, deadline}
| otherwise ->
Error NotOurHead{ourHeadId, otherHeadId = headId}
(Open openState@OpenState{}, ChainInput Tick{chainTime, chainSlot}) ->
(Open openState@OpenState{headId = ourHeadId}, ChainInput Tick{chainTime, chainSlot}) ->
-- XXX: We originally forgot the normal TickObserved state event here and so
-- time did not advance in an open head anymore. This is a hint that we
-- should compose event handling better.
newState TickObserved{chainSlot}
<> onOpenChainTick env pendingDeposits openState chainTime
<> onChainTick env pendingDeposits chainTime
<> onOpenChainTick env (depositsForHead ourHeadId pendingDeposits) openState
(Open openState@OpenState{headId = ourHeadId}, ChainInput Observation{observedTx = OnIncrementTx{headId, newVersion, depositTxId}, newChainState})
| ourHeadId == headId ->
onOpenChainIncrementTx openState newChainState newVersion depositTxId
Expand All @@ -1409,6 +1412,7 @@ update env ledger NodeState{headState = st, pendingDeposits, currentSlot} ev = c
(Closed ClosedState{contestationDeadline, readyToFanoutSent, headId}, ChainInput Tick{chainTime, chainSlot})
| chainTime > contestationDeadline && not readyToFanoutSent ->
newState TickObserved{chainSlot}
<> onChainTick env pendingDeposits chainTime
<> newState HeadIsReadyToFanout{headId}
(Closed closedState, ClientInput Fanout) ->
onClosedClientFanout closedState
Expand All @@ -1418,15 +1422,18 @@ update env ledger NodeState{headState = st, pendingDeposits, currentSlot} ev = c
| otherwise ->
Error NotOurHead{ourHeadId, otherHeadId = headId}
-- Node-level
(_, ChainInput Observation{observedTx = OnDepositTx{headId, depositTxId, deposited, created, deadline}, newChainState}) ->
newState DepositRecorded{chainState = newChainState, headId, depositTxId, deposited, created, deadline}
(_, ClientInput Recover{recoverTxId}) -> do
onClientRecover currentSlot pendingDeposits recoverTxId
(_, ChainInput Observation{observedTx = OnRecoverTx{headId, recoveredTxId, recoveredUTxO}, newChainState}) ->
newState DepositRecovered{chainState = newChainState, headId, depositTxId = recoveredTxId, recovered = recoveredUTxO}
-- General
(_, ChainInput Rollback{rolledBackChainState}) ->
newState ChainRolledBack{chainState = rolledBackChainState}
(_, ChainInput Tick{chainSlot}) ->
(_, ChainInput Tick{chainTime, chainSlot}) ->
newState TickObserved{chainSlot}
<> onChainTick env pendingDeposits chainTime
(_, ChainInput PostTxError{postChainTx, postTxError}) ->
cause . ClientEffect $ ServerOutput.PostTxOnChainFailed{postChainTx, postTxError}
(_, ClientInput{clientInput}) ->
Expand Down Expand Up @@ -1482,6 +1489,8 @@ aggregateNodeState nodeState sc =
}
TickObserved{chainSlot} ->
ns{currentSlot = chainSlot}
ChainRolledBack{chainState} ->
ns{currentSlot = chainStateSlot chainState}
_ -> ns

-- * HeadState aggregate
Expand Down
2 changes: 1 addition & 1 deletion hydra-node/src/Hydra/HeadLogic/Outcome.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import Hydra.API.ServerOutput (ClientMessage, DecommitInvalidReason)
import Hydra.Chain (PostChainTx)
import Hydra.Chain.ChainState (ChainSlot, ChainStateType, IsChainState)
import Hydra.HeadLogic.Error (LogicError)
import Hydra.HeadLogic.State (Deposit, NodeState)
import Hydra.Ledger (ValidationError)
import Hydra.Network (Host, ProtocolVersion)
import Hydra.Network.Message (Message)
import Hydra.Node.Environment (Environment (..), mkHeadParameters)
import Hydra.Node.State (Deposit, NodeState)
import Hydra.Tx (
HeadId,
HeadParameters,
Expand Down
59 changes: 1 addition & 58 deletions hydra-node/src/Hydra/HeadLogic/State.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Hydra.HeadLogic.State where
import Hydra.Prelude

import Data.Map qualified as Map
import Hydra.Chain.ChainState (ChainSlot, IsChainState (..))
import Hydra.Chain.ChainState (IsChainState (..))
import Hydra.Tx (
HeadId,
HeadParameters,
Expand All @@ -23,36 +23,6 @@ import Hydra.Tx.Snapshot (
SnapshotNumber,
SnapshotVersion,
)
import Test.QuickCheck (recursivelyShrink)

type PendingDeposits tx = Map (TxIdType tx) (Deposit tx)

-- FIXME: move to a dedicated module (maybe with deposits too?)
data NodeState tx = NodeState
{ headState :: HeadState tx
, pendingDeposits :: PendingDeposits tx
-- ^ Pending deposits as observed on chain.
-- TODO: could even move the chain state here (also see todo below)
-- , chainState :: ChainStateType tx
, currentSlot :: ChainSlot
}
deriving stock (Generic)

instance (ArbitraryIsTx tx, Arbitrary (ChainStateType tx)) => Arbitrary (NodeState tx) where
arbitrary = genericArbitrary

deriving stock instance (IsTx tx, Eq (ChainStateType tx)) => Eq (NodeState tx)
deriving stock instance (IsTx tx, Show (ChainStateType tx)) => Show (NodeState tx)
deriving anyclass instance (IsTx tx, ToJSON (ChainStateType tx)) => ToJSON (NodeState tx)
deriving anyclass instance (IsTx tx, FromJSON (ChainStateType tx)) => FromJSON (NodeState tx)

initNodeState :: IsChainState tx => ChainStateType tx -> NodeState tx
initNodeState chainState =
NodeState
{ headState = Idle IdleState{chainState}
, pendingDeposits = mempty
, currentSlot = chainStateSlot chainState
}

-- | The main state of the Hydra protocol state machine. It holds both, the
-- overall protocol state, but also the off-chain 'CoordinatedHeadState'.
Expand Down Expand Up @@ -249,33 +219,6 @@ seenSnapshotNumber = \case
RequestedSnapshot{lastSeen} -> lastSeen
SeenSnapshot{snapshot = Snapshot{number}} -> number

-- | A deposit tracked by the protocol. The 'DepositStatus' determines whether
-- it may be used for an incremental commit or not.
data Deposit tx = Deposit
{ headId :: HeadId
, deposited :: UTxOType tx
, created :: UTCTime
, deadline :: UTCTime
, status :: DepositStatus
}
deriving (Generic)

deriving stock instance IsTx tx => Eq (Deposit tx)
deriving stock instance IsTx tx => Show (Deposit tx)
deriving anyclass instance IsTx tx => ToJSON (Deposit tx)
deriving anyclass instance IsTx tx => FromJSON (Deposit tx)

instance ArbitraryIsTx tx => Arbitrary (Deposit tx) where
arbitrary = genericArbitrary
shrink = recursivelyShrink

data DepositStatus = Inactive | Active | Expired
deriving (Generic, Eq, Show, ToJSON, FromJSON)

instance Arbitrary DepositStatus where
arbitrary = genericArbitrary
shrink = genericShrink

-- ** Closed

-- | An 'Closed' head with an current candidate 'ConfirmedSnapshot', which may
Expand Down
2 changes: 1 addition & 1 deletion hydra-node/src/Hydra/HeadLogic/StateEvent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ module Hydra.HeadLogic.StateEvent where

import Hydra.Chain.ChainState (IsChainState)
import Hydra.Events (EventId, HasEventId (..))
import Hydra.HeadLogic (NodeState)
import Hydra.HeadLogic.Outcome (StateChanged (Checkpoint))
import Hydra.Node.State (NodeState)
import Hydra.Prelude
import Hydra.Tx (ArbitraryIsTx)

Expand Down
Loading