Skip to content

Commit b16b3f8

Browse files
authored
Permissive-reading of the state file (#2255)
This change allows us to read the state file even in the presence of malformed JSON. This is a kind of "best-effort" hack: we don't ever _expect_ malformed JSON, but if we find it, the best thing we can do is ignore it and see if things still work correctly. That's what we do here: when we find a line that fails to parse, we emit a warning. Fixes #2253 **Todo** - [x] Maybe add a test to check that we get the expected log message - [x] Changelog --- <!-- Consider each and tick it off one way or the other --> * [x] CHANGELOG updated or not needed * [x] Documentation updated or not needed * [x] Haddocks updated or not needed * [x] No new TODOs introduced or explained hereafter
2 parents 09ee2ba + 81d729e commit b16b3f8

File tree

9 files changed

+107
-40
lines changed

9 files changed

+107
-40
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ changes.
4242
- **BREAKING** Enable handling client recover in all head states.
4343
- See [Issue #1812](https://github.com/cardano-scaling/hydra/issues/1812) and [PR #2217](https://github.com/cardano-scaling/hydra/pull/2217).
4444

45+
- Optimistic approach to state file corruption: ignore (and warn about) invalid JSON
46+
lines [#2253](https://github.com/cardano-scaling/hydra/issues/2253)
47+
4548
## [0.22.4] - 2025-08-05
4649

4750
- Fix API not correctly handling event log rotation. This was evident in not

hydra-node/hydra-node.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ library
102102
Hydra.Node.Util
103103
Hydra.Options
104104
Hydra.Persistence
105+
Hydra.PersistenceLog
105106
Hydra.Utils
106107

107108
other-modules: Paths_hydra_node

hydra-node/src/Hydra/Logging/Messages.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ import Hydra.Chain.Direct.Handlers (CardanoChainLog)
1515
import Hydra.Node (HydraNodeLog)
1616
import Hydra.Node.Network (NetworkLog)
1717
import Hydra.Options (RunOptions)
18+
import Hydra.PersistenceLog (PersistenceLog)
1819

1920
data HydraLog tx
2021
= DirectChain {directChain :: CardanoChainLog}
2122
| APIServer {api :: APIServerLog}
2223
| Network {network :: NetworkLog}
2324
| Node {node :: HydraNodeLog tx}
2425
| NodeOptions {runOptions :: RunOptions}
26+
| Persistence {persistenceLog :: PersistenceLog}
2527
| EnteringMainloop
2628
deriving stock (Generic)
2729

hydra-node/src/Hydra/Node/Run.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ run opts = do
9595
eventStore@EventStore{eventSource} <-
9696
prepareEventStore
9797
=<< mkFileBasedEventStore stateFile
98-
=<< createPersistenceIncremental stateFile
98+
=<< createPersistenceIncremental (contramap Persistence tracer) stateFile
9999
-- NOTE: Add any custom sinks here
100100
let eventSinks :: [EventSink (StateEvent Tx) IO] = []
101101
wetHydraNode <- hydrate (contramap Node tracer) env ledger initialChainState eventStore eventSinks

hydra-node/src/Hydra/Persistence.hs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import Conduit (
88
ConduitT,
99
MonadUnliftIO,
1010
ResourceT,
11+
concatC,
1112
linesUnboundedAsciiC,
1213
mapMC,
1314
runResourceT,
@@ -18,6 +19,8 @@ import Conduit (
1819
import Control.Monad.Trans.Resource (allocate)
1920
import Data.Aeson qualified as Aeson
2021
import Data.ByteString qualified as BS
22+
import Hydra.Logging (Tracer, traceWith)
23+
import Hydra.PersistenceLog (PersistenceLog (..))
2124
import System.Directory (createDirectoryIfMissing, doesFileExist)
2225
import System.FilePath (takeDirectory)
2326
import UnliftIO (MVar, newMVar, putMVar, takeMVar, withMVar)
@@ -82,9 +85,10 @@ createPersistenceIncremental ::
8285
, MonadThrow m
8386
, FromJSON a
8487
) =>
88+
Tracer IO PersistenceLog ->
8589
FilePath ->
8690
m (PersistenceIncremental a m)
87-
createPersistenceIncremental fp = do
91+
createPersistenceIncremental tracer fp = do
8892
liftIO . createDirectoryIfMissing True $ takeDirectory fp
8993
mutex <- newMVar ()
9094
pure $
@@ -97,6 +101,14 @@ createPersistenceIncremental fp = do
97101
, source = source mutex
98102
}
99103
where
104+
-- Try to decode the next item read from persistence. If decoding fails,
105+
-- emit a warning and return Nothing so that the item is skipped.
106+
maybeDecode :: ByteString -> ResourceT m (Maybe a)
107+
maybeDecode bs = case Aeson.eitherDecodeStrict' bs of
108+
Left e -> do
109+
liftIO $ traceWith tracer $ FailedToDecodeJson{reason = show e, filepath = fp, contents = show bs}
110+
pure Nothing
111+
Right decoded -> pure (Just decoded)
100112
source :: forall i. MVar () -> ConduitT i a (ResourceT m) ()
101113
source mutex = do
102114
liftIO (doesFileExist fp) >>= \case
@@ -108,12 +120,5 @@ createPersistenceIncremental fp = do
108120
-- NOTE: Read, decode and yield values line by line.
109121
sourceFileBS fp
110122
.| linesUnboundedAsciiC
111-
.| mapMC
112-
( \bs ->
113-
case Aeson.eitherDecodeStrict' bs of
114-
Left e ->
115-
lift . throwIO $
116-
PersistenceException $
117-
"Error when decoding from file " <> fp <> ": " <> show e <> "\n" <> show bs
118-
Right decoded -> pure decoded
119-
)
123+
.| mapMC maybeDecode
124+
.| concatC
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
module Hydra.PersistenceLog where
2+
3+
import Data.Aeson (defaultOptions, genericParseJSON, genericToJSON, tagSingleConstructors)
4+
import Hydra.Prelude
5+
6+
data PersistenceLog
7+
= FailedToDecodeJson {reason :: String, filepath :: FilePath, contents :: String}
8+
deriving stock (Eq, Show, Generic)
9+
10+
-- Note: Specific Aeson instances, so that we don't hide the tags when emitting
11+
-- this log.
12+
instance ToJSON PersistenceLog where
13+
toJSON =
14+
genericToJSON
15+
defaultOptions
16+
{ tagSingleConstructors = True
17+
}
18+
19+
instance FromJSON PersistenceLog where
20+
parseJSON =
21+
genericParseJSON
22+
defaultOptions
23+
{ tagSingleConstructors = True
24+
}

hydra-node/test/Hydra/Events/FileBasedSpec.hs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import Hydra.HeadLogic (StateChanged)
1616
import Hydra.HeadLogic.StateEvent (StateEvent (..))
1717
import Hydra.Ledger.Cardano (Tx)
1818
import Hydra.Ledger.Simple (SimpleTx)
19+
import Hydra.Logging (Verbosity (Verbose), withTracer)
1920
import Hydra.Persistence (PersistenceIncremental (..), createPersistenceIncremental)
2021
import Test.Aeson.GenericSpecs (
2122
defaultSettings,
@@ -80,17 +81,18 @@ spec = do
8081
forAllShrink genContinuousEvents shrink $ \events -> do
8182
ioProperty $ do
8283
withTempDir "hydra-persistence" $ \tmpDir -> do
83-
let stateDir = tmpDir <> "/data"
84-
PersistenceIncremental{append} <- createPersistenceIncremental stateDir
85-
forM_ events append
86-
-- Load and store events through the event source interface
87-
EventStore{eventSource = src, eventSink = EventSink{putEvent}} <-
88-
mkFileBasedEventStore stateDir =<< createPersistenceIncremental stateDir
89-
loadedEvents <- getEvents src
90-
-- Store all loaded events like the node would do
91-
forM_ loadedEvents putEvent
92-
pure $
93-
loadedEvents === events
84+
withTracer (Verbose "hydra-persistence") $ \tracer -> do
85+
let stateDir = tmpDir <> "/data"
86+
PersistenceIncremental{append} <- createPersistenceIncremental tracer stateDir
87+
forM_ events append
88+
-- Load and store events through the event source interface
89+
EventStore{eventSource = src, eventSink = EventSink{putEvent}} <-
90+
mkFileBasedEventStore stateDir =<< createPersistenceIncremental tracer stateDir
91+
loadedEvents <- getEvents src
92+
-- Store all loaded events like the node would do
93+
forM_ loadedEvents putEvent
94+
pure $
95+
loadedEvents === events
9496

9597
genContinuousEvents :: Gen [StateEvent SimpleTx]
9698
genContinuousEvents =
@@ -99,6 +101,7 @@ genContinuousEvents =
99101
withEventSourceAndSink :: (EventSource (StateEvent SimpleTx) IO -> EventSink (StateEvent SimpleTx) IO -> IO b) -> IO b
100102
withEventSourceAndSink action =
101103
withTempDir "hydra-persistence" $ \tmpDir -> do
102-
let stateDir = tmpDir <> "/data"
103-
EventStore{eventSource, eventSink} <- mkFileBasedEventStore stateDir =<< createPersistenceIncremental stateDir
104-
action eventSource eventSink
104+
withTracer (Verbose "hydra-persistence") $ \tracer -> do
105+
let stateDir = tmpDir <> "/data"
106+
EventStore{eventSource, eventSink} <- mkFileBasedEventStore stateDir =<< createPersistenceIncremental tracer stateDir
107+
action eventSource eventSink

hydra-node/test/Hydra/PersistenceSpec.hs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@ import Test.Hydra.Prelude
88
import Data.Aeson (Value (..))
99
import Data.Aeson qualified as Aeson
1010
import Data.Text qualified as Text
11+
import Hydra.Logging (Envelope (..), Verbosity (Verbose), withTracer)
1112
import Hydra.Persistence (Persistence (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental, loadAll)
13+
import Hydra.PersistenceLog
1214
import Test.QuickCheck (checkCoverage, cover, elements, oneof, suchThat, (===))
1315
import Test.QuickCheck.Gen (listOf)
1416
import Test.QuickCheck.Monadic (monadicIO, monitor, pick, run)
17+
import Test.Util (captureTracer)
1518

1619
spec :: Spec
1720
spec = do
@@ -35,12 +38,25 @@ spec = do
3538
pure $ actualResult === Just item
3639

3740
describe "PersistenceIncremental" $ do
38-
it "can handle empty files" $ do
41+
it "can ignore invalid lines and emits warning" $ do
3942
withTempDir "hydra-persistence" $ \tmpDir -> do
43+
(tracer, getTraces) <- captureTracer "persistence-incremental"
4044
let fp = tmpDir <> "/data"
41-
writeFileBS fp ""
42-
p <- createPersistenceIncremental fp
43-
loadAll p `shouldReturn` ([] :: [Aeson.Value])
45+
writeFileBS fp "\"abc\"\n{\"xyz\": "
46+
p <- createPersistenceIncremental tracer fp
47+
loadAll p `shouldReturn` ([Aeson.String "abc"] :: [Aeson.Value])
48+
traces <- getTraces
49+
let rightMsg [Envelope{message = FailedToDecodeJson{}}] = True
50+
rightMsg _ = False
51+
traces `shouldSatisfy` rightMsg
52+
53+
it "can handle empty files" $ do
54+
withTracer (Verbose "persistence-incremental") $ \tracer -> do
55+
withTempDir "hydra-persistence" $ \tmpDir -> do
56+
let fp = tmpDir <> "/data"
57+
writeFileBS fp ""
58+
p <- createPersistenceIncremental tracer fp
59+
loadAll p `shouldReturn` ([] :: [Aeson.Value])
4460

4561
it "is consistent after multiple append calls in presence of new-lines" $
4662
checkCoverage $
@@ -50,24 +66,26 @@ spec = do
5066
monitor (cover 10 (containsNewLine items) "some item contains a new line")
5167

5268
actualResult <- run $
53-
withTempDir "hydra-persistence" $ \tmpDir -> do
54-
p <- createPersistenceIncremental $ tmpDir <> "/data"
55-
forM_ items $ append p
56-
loadAll p
69+
withTracer (Verbose "persistence-incremental") $ \tracer -> do
70+
withTempDir "hydra-persistence" $ \tmpDir -> do
71+
p <- createPersistenceIncremental tracer $ tmpDir <> "/data"
72+
forM_ items $ append p
73+
loadAll p
5774
pure $ actualResult === items
5875

5976
it "it cannot load from a different thread once having started appending" $
6077
monadicIO $ do
6178
items <- pick $ listOf genPersistenceItem
6279
moreItems <- pick $ listOf genPersistenceItem `suchThat` ((> 2) . length)
6380
pure $
64-
withTempDir "hydra-persistence" $ \tmpDir -> do
65-
p <- createPersistenceIncremental $ tmpDir <> "/data"
66-
forM_ items $ append p
67-
loadAll p `shouldReturn` items
68-
raceLabelled_
69-
("forever-load-all", forever $ threadDelay 0.01 >> loadAll p)
70-
("append-more-items", forM_ moreItems $ \item -> append p item >> threadDelay 0.01)
81+
withTracer (Verbose "persistence-incremental") $ \tracer -> do
82+
withTempDir "hydra-persistence" $ \tmpDir -> do
83+
p <- createPersistenceIncremental tracer $ tmpDir <> "/data"
84+
forM_ items $ append p
85+
loadAll p `shouldReturn` items
86+
raceLabelled_
87+
("forever-load-all", forever $ threadDelay 0.01 >> loadAll p)
88+
("append-more-items", forM_ moreItems $ \item -> append p item >> threadDelay 0.01)
7189

7290
genPersistenceItem :: Gen Aeson.Value
7391
genPersistenceItem =

hydra-node/test/Test/Util.hs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Data.Aeson (encode)
2121
import Data.Aeson qualified as Aeson
2222
import Data.Text qualified as Text
2323
import Hydra.Ledger.Simple (SimpleTx)
24+
import Hydra.Logging (Envelope (..), traceInTVar)
2425
import Hydra.Network (NetworkCallback (..))
2526
import Hydra.Node (HydraNodeLog)
2627
import System.IO.Temp (writeSystemTempFile)
@@ -175,3 +176,13 @@ waitMatch waitNext delay match = do
175176

176177
align _ [] = []
177178
align n (h : q) = h : fmap (Text.replicate n " " <>) q
179+
180+
-- | Create a tracer that captures all messages and a function to retrieve all
181+
-- traces captured.
182+
-- XXX: This is duplicated in MithrilSpec in hydra-cluster, but can't (easily)
183+
-- be moved to the Test Prelude because of the dependency on Hydra.Logging.
184+
captureTracer :: Text -> IO (Tracer IO a, IO [Envelope a])
185+
captureTracer namespace = do
186+
traces <- newLabelledTVarIO "capture-tracer" []
187+
let tracer = traceInTVar traces namespace
188+
pure (tracer, readTVarIO traces)

0 commit comments

Comments
 (0)