Skip to content

Commit f578304

Browse files
Add more documentation to processStream
This change tries to further clarify the processStream implementation.
1 parent 32984f2 commit f578304

File tree

2 files changed

+22
-5
lines changed

2 files changed

+22
-5
lines changed

src/Lentille.hs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ module Lentille (
2626
swapDuration,
2727

2828
-- * Stream helper
29+
lentilleStreamNext,
2930
streamDropBefore,
3031
Changes,
3132

@@ -125,6 +126,12 @@ yieldStreamError e = do
125126

126127
type LentilleStream es a = Stream (Of (Either LentilleError a)) (Eff es) ()
127128

129+
-- | Consumes one element of the stream, returns:
130+
-- - Left for the end of stream,
131+
-- - Right for the next element, and the rest of the stream.
132+
lentilleStreamNext :: LentilleStream es a -> Eff es (Either () (Either LentilleError a, (LentilleStream es a)))
133+
lentilleStreamNext = S.next
134+
128135
-------------------------------------------------------------------------------
129136
-- Utility functions for crawlers
130137
-------------------------------------------------------------------------------

src/Macroscope/Worker.hs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,18 +99,22 @@ processStream ::
9999
Eff es [Maybe ProcessError]
100100
processStream logFunc postFunc = go (0 :: Word) [] []
101101
where
102-
go count acc results stream = do
103-
eDocument <- S.next stream
102+
go count acc resultsAcc stream = do
103+
eDocument <- lentilleStreamNext stream
104104
case eDocument of
105105
Left () -> do
106106
-- The end of the stream
107107
res <- processBatch acc
108-
pure $ reverse (res : results)
108+
pure $ reverse (res : resultsAcc)
109109
Right (edoc, rest) -> do
110110
-- We got a new document
111111
let doc = case edoc of
112112
Right x -> x
113113
Left err -> DTError $ toCrawlerError err
114+
115+
-- Promote some 'LentilleError' values to 'ProcessError' to terminate the stream.
116+
-- This can cause a crawler to stop crawling, for example if an entity removal
117+
-- is not correctly handled and produces a DecodeError.
114118
let addStreamError :: [Maybe ProcessError] -> [Maybe ProcessError]
115119
addStreamError = case edoc of
116120
Right _ -> id
@@ -119,12 +123,17 @@ processStream logFunc postFunc = go (0 :: Word) [] []
119123
Left (LentilleError _ EntityRemoved) -> id
120124
-- Every other 'LentilleError' is fatal
121125
Left err -> (Just (StreamError err) :)
126+
122127
let newAcc = doc : acc
123128
if count == 499
124129
then do
130+
-- After 500 elements, post the documents to the API
125131
res <- processBatch newAcc
126-
go 0 [] (addStreamError (res : results)) rest
127-
else go (count + 1) newAcc (addStreamError results) rest
132+
-- And continue processing the stream
133+
go 0 [] (addStreamError (res : resultsAcc)) rest
134+
else do
135+
-- Keep on processing the stream
136+
go (count + 1) newAcc (addStreamError resultsAcc) rest
128137

129138
toCrawlerError (LentilleError ts err) = CrawlerError {..}
130139
where
@@ -222,6 +231,7 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre
222231
xs -> pure xs
223232

224233
-- Adapt the document stream to intermediate representation
234+
getStream :: UTCTime -> Entity -> LentilleStream es DocumentType
225235
getStream oldestAge entity = case documentStream of
226236
Changes s ->
227237
let project = extractEntityValue _Project

0 commit comments

Comments
 (0)