@@ -33,11 +33,12 @@ import Plutus.V1.Ledger.Api (toBuiltin)
3333import Control.Concurrent.STM (STM )
3434import qualified Control.Concurrent.STM as STM
3535import Control.Lens
36- import Control.Monad (forM_ , unless , void , when )
36+ import Control.Monad (forM_ , void , when )
3737import Data.Foldable (foldl' )
3838import Ledger.TimeSlot (SlotConfig )
3939import Plutus.ChainIndex (BlockNumber (.. ), ChainIndexTx (.. ), ChainIndexTxOutputs (.. ),
40- InsertUtxoSuccess (.. ), RollbackResult (.. ), Tip (.. ),
40+ InsertUtxoFailed (.. ), InsertUtxoSuccess (.. ),
41+ RollbackFailed (.. ), RollbackResult (.. ), Tip (.. ),
4142 TxConfirmedState (.. ), TxIdState (.. ), TxValidity (.. ),
4243 UtxoState (.. ), blockId , citxTxId , fromOnChainTx , insert )
4344import Plutus.ChainIndex.Compatibility (fromCardanoBlockHeader , fromCardanoPoint )
@@ -57,13 +58,18 @@ startNodeClient socket mode slotConfig networkId instancesState = do
5758 case mode of
5859 MockNode ->
5960 void $ MockClient. runChainSync socket slotConfig
60- (\ block slot -> STM. atomically $ processMockBlock instancesState env block slot)
61+ (\ block slot -> handleSyncAction $ processMockBlock instancesState env block slot)
6162 AlonzoNode -> do
6263 let resumePoints = []
6364 void $ Client. runChainSync socket slotConfig networkId resumePoints
64- (\ block slot -> STM. atomically $ processChainSyncEvent env block slot)
65+ (\ block slot -> handleSyncAction $ processChainSyncEvent env block slot)
6566 pure env
6667
68+ -- | Deal with sync action failures from running this STM action. For now, we
69+ -- deal with them by simply calling `error`; i.e. the application exits.
70+ handleSyncAction :: STM (Either SyncActionFailure () ) -> IO ()
71+ handleSyncAction action = STM. atomically action >>= either (error . show ) pure
72+
6773updateInstances :: IndexedBlock -> InstanceClientEnv -> STM ()
6874updateInstances IndexedBlock {ibUtxoSpent, ibUtxoProduced} InstanceClientEnv {ceUtxoSpentRequests, ceUtxoProducedRequests} = do
6975 forM_ (Map. intersectionWith (,) ibUtxoSpent ceUtxoSpentRequests) $ \ (onChainTx, requests) ->
@@ -72,23 +78,29 @@ updateInstances IndexedBlock{ibUtxoSpent, ibUtxoProduced} InstanceClientEnv{ceUt
7278 traverse (\ OpenTxOutProducedRequest {otxProducingTxns} -> STM. tryPutTMVar otxProducingTxns txns) requests
7379
7480-- | Process a chain sync event that we receive from the alonzo node client
75- processChainSyncEvent :: BlockchainEnv -> ChainSyncEvent -> Slot -> STM ()
76- processChainSyncEvent blockchainEnv event _slot = case event of
77- Resume _ -> pure () -- TODO: Handle resume
78- RollForward (BlockInMode (C. Block header transactions) era) -> processBlock header blockchainEnv transactions era
79- RollBackward chainPoint -> runRollback blockchainEnv chainPoint
81+ processChainSyncEvent :: BlockchainEnv -> ChainSyncEvent -> Slot -> STM (Either SyncActionFailure () )
82+ processChainSyncEvent blockchainEnv event _slot = do
83+ case event of
84+ Resume _ -> pure $ Right () -- TODO: Handle resume
85+ RollForward (BlockInMode (C. Block header transactions) era) -> processBlock header blockchainEnv transactions era
86+ RollBackward chainPoint -> runRollback blockchainEnv chainPoint
87+
88+ data SyncActionFailure
89+ = RollbackFailure RollbackFailed
90+ | InsertUtxoStateFailure InsertUtxoFailed
91+ deriving (Show )
8092
8193-- | Roll back the chain to the given ChainPoint and slot.
82- runRollback :: BlockchainEnv -> ChainPoint -> STM ()
94+ runRollback :: BlockchainEnv -> ChainPoint -> STM (Either SyncActionFailure () )
8395runRollback BlockchainEnv {beTxChanges} chainPoint = do
8496 txIdStateIndex <- STM. readTVar beTxChanges
8597
8698 let point = fromCardanoPoint chainPoint
8799 rs = rollback point txIdStateIndex
88100
89101 case rs of
90- Left e -> error $ " Rollback Failed: " <> show e
91- Right RollbackResult {rolledBackIndex} -> STM. writeTVar beTxChanges rolledBackIndex
102+ Left e -> pure $ Left ( RollbackFailure e)
103+ Right RollbackResult {rolledBackIndex} -> Right <$> STM. writeTVar beTxChanges rolledBackIndex
92104
93105-- | Get transaction ID and validity from a cardano transaction in any era
94106txEvent :: forall era . C. Tx era -> C. EraInMode era C. CardanoMode -> (TxId , TxValidity )
@@ -108,11 +120,17 @@ txMockEvent tx =
108120
109121-- | Update the blockchain env. with changes from a new block of cardano
110122-- transactions in any era
111- processBlock :: forall era . C. BlockHeader -> BlockchainEnv -> [C. Tx era ] -> C. EraInMode era C. CardanoMode -> STM ()
123+ processBlock :: forall era . C. BlockHeader
124+ -> BlockchainEnv
125+ -> [C. Tx era ]
126+ -> C. EraInMode era C. CardanoMode
127+ -> STM (Either SyncActionFailure () )
112128processBlock header env transactions era =
113- unless (null transactions) $ do
114- let tip = fromCardanoBlockHeader header
115- updateTransactionState tip env (flip txEvent era <$> transactions)
129+ if null transactions
130+ then pure $ Right ()
131+ else do
132+ let tip = fromCardanoBlockHeader header
133+ updateTransactionState tip env (flip txEvent era <$> transactions)
116134
117135-- | For the given transactions, perform the updates in the TxIdState, and
118136-- also record that a new block has been processed.
@@ -121,19 +139,19 @@ updateTransactionState
121139 => Tip
122140 -> BlockchainEnv
123141 -> t (TxId , TxValidity )
124- -> STM ()
142+ -> STM (Either SyncActionFailure () )
125143updateTransactionState tip BlockchainEnv {beTxChanges, beCurrentBlock} xs = do
126144 txIdStateIndex <- STM. readTVar beTxChanges
127145 let txIdState = _usTxUtxoData $ measure $ txIdStateIndex
128146 blockNumber <- STM. readTVar beCurrentBlock
129147 let txIdState' = foldl' (insertNewTx blockNumber) txIdState xs
130148 is = insert (UtxoState txIdState' tip) txIdStateIndex
131149 case is of
132- -- TODO: Proper error.
133- Left e -> error $ " Insert of new TxIdState failed. " <> show e
134- Right InsertUtxoSuccess {newIndex = newTxIdState} -> STM. writeTVar beTxChanges newTxIdState
135- STM. writeTVar beCurrentBlock ( succ blockNumber )
136-
150+ Right InsertUtxoSuccess {newIndex = newTxIdState} -> do
151+ STM. writeTVar beTxChanges newTxIdState
152+ STM. writeTVar beCurrentBlock ( succ blockNumber)
153+ pure $ Right ( )
154+ Left e -> pure $ Left $ InsertUtxoStateFailure e
137155
138156insertNewTx :: BlockNumber -> TxIdState -> (TxId , TxValidity ) -> TxIdState
139157insertNewTx blockNumber TxIdState {txnsConfirmed, txnsDeleted} (txi, txValidity) =
@@ -150,26 +168,30 @@ insertNewTx blockNumber TxIdState{txnsConfirmed, txnsDeleted} (txi, txValidity)
150168
151169-- | Go through the transactions in a block, updating the 'BlockchainEnv'
152170-- when any interesting addresses or transactions have changed.
153- processMockBlock :: InstancesState -> BlockchainEnv -> Block -> Slot -> STM ()
171+ processMockBlock :: InstancesState -> BlockchainEnv -> Block -> Slot -> STM (Either SyncActionFailure () )
154172processMockBlock instancesState env@ BlockchainEnv {beAddressMap, beCurrentSlot, beCurrentBlock} transactions slot = do
155173 lastSlot <- STM. readTVar beCurrentSlot
156174 when (slot > lastSlot) $ do
157175 STM. writeTVar beCurrentSlot slot
158- unless (null transactions) $ do
159- addressMap <- STM. readTVar beAddressMap
160- let addressMap' = foldl' (processTx slot) addressMap transactions
161- STM. writeTVar beAddressMap addressMap'
162- blockNumber <- STM. readTVar beCurrentBlock
163176
164- let tip = Tip { tipSlot = slot
165- , tipBlockId = blockId transactions
166- , tipBlockNo = blockNumber
167- }
177+ if null transactions
178+ then pure $ Right ()
179+ else do
180+ addressMap <- STM. readTVar beAddressMap
181+ let addressMap' = foldl' (processTx slot) addressMap transactions
182+ STM. writeTVar beAddressMap addressMap'
183+ blockNumber <- STM. readTVar beCurrentBlock
184+
185+ instEnv <- S. instancesClientEnv instancesState
186+ updateInstances (indexBlock $ fmap fromOnChainTx transactions) instEnv
187+
168188
169- updateTransactionState tip env (txMockEvent <$> fmap fromOnChainTx transactions)
189+ let tip = Tip { tipSlot = slot
190+ , tipBlockId = blockId transactions
191+ , tipBlockNo = blockNumber
192+ }
170193
171- instEnv <- S. instancesClientEnv instancesState
172- updateInstances (indexBlock $ fmap fromOnChainTx transactions) instEnv
194+ updateTransactionState tip env (txMockEvent <$> fmap fromOnChainTx transactions)
173195
174196processTx :: Slot -> AddressMap -> OnChainTx -> AddressMap
175197processTx _ addressMap tx = addressMap' where
0 commit comments