Skip to content

Commit 0e8b108

Browse files
committed
Permissive-reading of the state file
This change allows us to read the state file even in the presence of malformed JSON. This is a kind of "best-effort" hack; we don't ever _expect_ malformed JSON; but if we find it, the best thing we can try to do is ignore it and see if things still work correctly. That's what we do here; when we find an invalid parse we emit a warning.
1 parent 09ee2ba commit 0e8b108

File tree

7 files changed

+95
-41
lines changed

7 files changed

+95
-41
lines changed

hydra-node/hydra-node.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ library
102102
Hydra.Node.Util
103103
Hydra.Options
104104
Hydra.Persistence
105+
Hydra.PersistenceLog
105106
Hydra.Utils
106107

107108
other-modules: Paths_hydra_node

hydra-node/src/Hydra/Logging/Messages.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ import Hydra.Chain.Direct.Handlers (CardanoChainLog)
1515
import Hydra.Node (HydraNodeLog)
1616
import Hydra.Node.Network (NetworkLog)
1717
import Hydra.Options (RunOptions)
18+
import Hydra.PersistenceLog (PersistenceLog)
1819

1920
data HydraLog tx
2021
= DirectChain {directChain :: CardanoChainLog}
2122
| APIServer {api :: APIServerLog}
2223
| Network {network :: NetworkLog}
2324
| Node {node :: HydraNodeLog tx}
2425
| NodeOptions {runOptions :: RunOptions}
26+
| Persistence {persistenceLog :: PersistenceLog}
2527
| EnteringMainloop
2628
deriving stock (Generic)
2729

hydra-node/src/Hydra/Node/Run.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ run opts = do
9595
eventStore@EventStore{eventSource} <-
9696
prepareEventStore
9797
=<< mkFileBasedEventStore stateFile
98-
=<< createPersistenceIncremental stateFile
98+
=<< createPersistenceIncremental (contramap Persistence tracer) stateFile
9999
-- NOTE: Add any custom sinks here
100100
let eventSinks :: [EventSink (StateEvent Tx) IO] = []
101101
wetHydraNode <- hydrate (contramap Node tracer) env ledger initialChainState eventStore eventSinks

hydra-node/src/Hydra/Persistence.hs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import Conduit (
88
ConduitT,
99
MonadUnliftIO,
1010
ResourceT,
11+
concatC,
1112
linesUnboundedAsciiC,
1213
mapMC,
1314
runResourceT,
@@ -18,6 +19,8 @@ import Conduit (
1819
import Control.Monad.Trans.Resource (allocate)
1920
import Data.Aeson qualified as Aeson
2021
import Data.ByteString qualified as BS
22+
import Hydra.Logging (Tracer, traceWith)
23+
import Hydra.PersistenceLog (PersistenceLog (..))
2124
import System.Directory (createDirectoryIfMissing, doesFileExist)
2225
import System.FilePath (takeDirectory)
2326
import UnliftIO (MVar, newMVar, putMVar, takeMVar, withMVar)
@@ -82,9 +85,10 @@ createPersistenceIncremental ::
8285
, MonadThrow m
8386
, FromJSON a
8487
) =>
88+
Tracer IO PersistenceLog ->
8589
FilePath ->
8690
m (PersistenceIncremental a m)
87-
createPersistenceIncremental fp = do
91+
createPersistenceIncremental tracer fp = do
8892
liftIO . createDirectoryIfMissing True $ takeDirectory fp
8993
mutex <- newMVar ()
9094
pure $
@@ -97,6 +101,14 @@ createPersistenceIncremental fp = do
97101
, source = source mutex
98102
}
99103
where
104+
-- Maybe read the next item from persistence; or, if we failed to
105+
-- decode it, we will emit a warning.
106+
maybeDecode :: ByteString -> ResourceT m (Maybe a)
107+
maybeDecode bs = case Aeson.eitherDecodeStrict' bs of
108+
Left e -> do
109+
liftIO $ traceWith tracer $ FailedToDecodeJson{reason = show e, filepath = fp, contents = show bs}
110+
pure Nothing
111+
Right decoded -> pure (Just decoded)
100112
source :: forall i. MVar () -> ConduitT i a (ResourceT m) ()
101113
source mutex = do
102114
liftIO (doesFileExist fp) >>= \case
@@ -108,12 +120,5 @@ createPersistenceIncremental fp = do
108120
-- NOTE: Read, decode and yield values line by line.
109121
sourceFileBS fp
110122
.| linesUnboundedAsciiC
111-
.| mapMC
112-
( \bs ->
113-
case Aeson.eitherDecodeStrict' bs of
114-
Left e ->
115-
lift . throwIO $
116-
PersistenceException $
117-
"Error when decoding from file " <> fp <> ": " <> show e <> "\n" <> show bs
118-
Right decoded -> pure decoded
119-
)
123+
.| mapMC maybeDecode
124+
.| concatC
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
module Hydra.PersistenceLog where
2+
3+
import Data.Aeson (defaultOptions, genericParseJSON, genericToJSON, tagSingleConstructors)
4+
import Hydra.Prelude
5+
6+
data PersistenceLog
7+
= FailedToDecodeJson {reason :: String, filepath :: FilePath, contents :: String}
8+
deriving stock (Eq, Show, Generic)
9+
10+
-- Note: Specific Aeson instances, so that we don't hide the tags when emitting
11+
-- this log.
12+
instance ToJSON PersistenceLog where
13+
toJSON =
14+
genericToJSON
15+
defaultOptions
16+
{ tagSingleConstructors = True
17+
}
18+
19+
instance FromJSON PersistenceLog where
20+
parseJSON =
21+
genericParseJSON
22+
defaultOptions
23+
{ tagSingleConstructors = True
24+
}

hydra-node/test/Hydra/Events/FileBasedSpec.hs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import Hydra.HeadLogic (StateChanged)
1616
import Hydra.HeadLogic.StateEvent (StateEvent (..))
1717
import Hydra.Ledger.Cardano (Tx)
1818
import Hydra.Ledger.Simple (SimpleTx)
19+
import Hydra.Logging (Verbosity (Verbose), withTracer)
1920
import Hydra.Persistence (PersistenceIncremental (..), createPersistenceIncremental)
2021
import Test.Aeson.GenericSpecs (
2122
defaultSettings,
@@ -80,17 +81,18 @@ spec = do
8081
forAllShrink genContinuousEvents shrink $ \events -> do
8182
ioProperty $ do
8283
withTempDir "hydra-persistence" $ \tmpDir -> do
83-
let stateDir = tmpDir <> "/data"
84-
PersistenceIncremental{append} <- createPersistenceIncremental stateDir
85-
forM_ events append
86-
-- Load and store events through the event source interface
87-
EventStore{eventSource = src, eventSink = EventSink{putEvent}} <-
88-
mkFileBasedEventStore stateDir =<< createPersistenceIncremental stateDir
89-
loadedEvents <- getEvents src
90-
-- Store all loaded events like the node would do
91-
forM_ loadedEvents putEvent
92-
pure $
93-
loadedEvents === events
84+
withTracer (Verbose "hydra-persistence") $ \tracer -> do
85+
let stateDir = tmpDir <> "/data"
86+
PersistenceIncremental{append} <- createPersistenceIncremental tracer stateDir
87+
forM_ events append
88+
-- Load and store events through the event source interface
89+
EventStore{eventSource = src, eventSink = EventSink{putEvent}} <-
90+
mkFileBasedEventStore stateDir =<< createPersistenceIncremental tracer stateDir
91+
loadedEvents <- getEvents src
92+
-- Store all loaded events like the node would do
93+
forM_ loadedEvents putEvent
94+
pure $
95+
loadedEvents === events
9496

9597
genContinuousEvents :: Gen [StateEvent SimpleTx]
9698
genContinuousEvents =
@@ -99,6 +101,7 @@ genContinuousEvents =
99101
withEventSourceAndSink :: (EventSource (StateEvent SimpleTx) IO -> EventSink (StateEvent SimpleTx) IO -> IO b) -> IO b
100102
withEventSourceAndSink action =
101103
withTempDir "hydra-persistence" $ \tmpDir -> do
102-
let stateDir = tmpDir <> "/data"
103-
EventStore{eventSource, eventSink} <- mkFileBasedEventStore stateDir =<< createPersistenceIncremental stateDir
104-
action eventSource eventSink
104+
withTracer (Verbose "hydra-persistence") $ \tracer -> do
105+
let stateDir = tmpDir <> "/data"
106+
EventStore{eventSource, eventSink} <- mkFileBasedEventStore stateDir =<< createPersistenceIncremental tracer stateDir
107+
action eventSource eventSink

hydra-node/test/Hydra/PersistenceSpec.hs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ import Test.Hydra.Prelude
77

88
import Data.Aeson (Value (..))
99
import Data.Aeson qualified as Aeson
10+
import Data.ByteString qualified as BS
1011
import Data.Text qualified as Text
12+
import Hydra.Logging (Verbosity (Verbose), traceWith, withTracer, withTracerOutputTo)
1113
import Hydra.Persistence (Persistence (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental, loadAll)
14+
import Hydra.PersistenceLog
1215
import Test.QuickCheck (checkCoverage, cover, elements, oneof, suchThat, (===))
1316
import Test.QuickCheck.Gen (listOf)
1417
import Test.QuickCheck.Monadic (monadicIO, monitor, pick, run)
@@ -35,12 +38,26 @@ spec = do
3538
pure $ actualResult === Just item
3639

3740
describe "PersistenceIncremental" $ do
38-
it "can handle empty files" $ do
41+
it "can ignore invalid lines and emits warning" $ do
3942
withTempDir "hydra-persistence" $ \tmpDir -> do
40-
let fp = tmpDir <> "/data"
41-
writeFileBS fp ""
42-
p <- createPersistenceIncremental fp
43-
loadAll p `shouldReturn` ([] :: [Aeson.Value])
43+
let logFile = tmpDir <> "/tracer.log"
44+
withFile logFile WriteMode $ \hdl -> do
45+
withTracerOutputTo hdl "persistence-incremental" $ \tracer -> do
46+
let fp = tmpDir <> "/data"
47+
writeFileBS fp "\"abc\"\n{\"xyz\": "
48+
-- traceWith tracer $ FailedToDecodeJson{reason = "show e", filepath = "fp", contents = "show bs"}
49+
p <- createPersistenceIncremental tracer fp
50+
loadAll p `shouldReturn` ([Aeson.String "abc"] :: [Aeson.Value])
51+
logs <- readFileBS logFile
52+
logs `shouldSatisfy` BS.isInfixOf "FailedToDecodeJson"
53+
54+
it "can handle empty files" $ do
55+
withTracer (Verbose "persistence-incremental") $ \tracer -> do
56+
withTempDir "hydra-persistence" $ \tmpDir -> do
57+
let fp = tmpDir <> "/data"
58+
writeFileBS fp ""
59+
p <- createPersistenceIncremental tracer fp
60+
loadAll p `shouldReturn` ([] :: [Aeson.Value])
4461

4562
it "is consistent after multiple append calls in presence of new-lines" $
4663
checkCoverage $
@@ -50,24 +67,26 @@ spec = do
5067
monitor (cover 10 (containsNewLine items) "some item contains a new line")
5168

5269
actualResult <- run $
53-
withTempDir "hydra-persistence" $ \tmpDir -> do
54-
p <- createPersistenceIncremental $ tmpDir <> "/data"
55-
forM_ items $ append p
56-
loadAll p
70+
withTracer (Verbose "persistence-incremental") $ \tracer -> do
71+
withTempDir "hydra-persistence" $ \tmpDir -> do
72+
p <- createPersistenceIncremental tracer $ tmpDir <> "/data"
73+
forM_ items $ append p
74+
loadAll p
5775
pure $ actualResult === items
5876

5977
it "it cannot load from a different thread once having started appending" $
6078
monadicIO $ do
6179
items <- pick $ listOf genPersistenceItem
6280
moreItems <- pick $ listOf genPersistenceItem `suchThat` ((> 2) . length)
6381
pure $
64-
withTempDir "hydra-persistence" $ \tmpDir -> do
65-
p <- createPersistenceIncremental $ tmpDir <> "/data"
66-
forM_ items $ append p
67-
loadAll p `shouldReturn` items
68-
raceLabelled_
69-
("forever-load-all", forever $ threadDelay 0.01 >> loadAll p)
70-
("append-more-items", forM_ moreItems $ \item -> append p item >> threadDelay 0.01)
82+
withTracer (Verbose "persistence-incremental") $ \tracer -> do
83+
withTempDir "hydra-persistence" $ \tmpDir -> do
84+
p <- createPersistenceIncremental tracer $ tmpDir <> "/data"
85+
forM_ items $ append p
86+
loadAll p `shouldReturn` items
87+
raceLabelled_
88+
("forever-load-all", forever $ threadDelay 0.01 >> loadAll p)
89+
("append-more-items", forM_ moreItems $ \item -> append p item >> threadDelay 0.01)
7190

7291
genPersistenceItem :: Gen Aeson.Value
7392
genPersistenceItem =

0 commit comments

Comments
 (0)