Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Lentille.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module Lentille (
swapDuration,

-- * Stream helper
lentilleStreamNext,
streamDropBefore,
Changes,

Expand Down Expand Up @@ -125,6 +126,12 @@ yieldStreamError e = do

-- | A stream whose elements are either a 'LentilleError' or a value of type @a@,
-- running in the @Eff es@ monad and returning @()@ when it ends.
type LentilleStream es a = Stream (Of (Either LentilleError a)) (Eff es) ()

-- | Pull a single element off the front of a 'LentilleStream'.
--
-- The result is:
--
-- * @Left ()@ when the stream has ended;
-- * @Right (element, rest)@ otherwise, where @element@ is the next
--   @Either LentilleError a@ item and @rest@ is the remainder of the stream.
lentilleStreamNext :: LentilleStream es a -> Eff es (Either () (Either LentilleError a, LentilleStream es a))
lentilleStreamNext stream = S.next stream

-------------------------------------------------------------------------------
-- Utility functions for crawlers
-------------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions src/Lentille/GitLab/MergeRequests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ transformResponse host getIdentIdCB result =
, NoErr
, extract shortName fullName <$> catMaybes nodes
)
GetProjectMergeRequests Nothing ->
( PageInfo False Nothing Nothing
, Nothing
, NoRepo
, []
)
_anyOtherResponse ->
( PageInfo False Nothing Nothing
, Nothing
Expand Down
23 changes: 17 additions & 6 deletions src/Macroscope/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,22 @@ processStream ::
Eff es [Maybe ProcessError]
processStream logFunc postFunc = go (0 :: Word) [] []
where
go count acc results stream = do
eDocument <- S.next stream
go count acc resultsAcc stream = do
eDocument <- lentilleStreamNext stream
case eDocument of
Left () -> do
-- The end of the stream
res <- processBatch acc
pure $ reverse (res : results)
pure $ reverse (res : resultsAcc)
Right (edoc, rest) -> do
-- We got a new document
let doc = case edoc of
Right x -> x
Left err -> DTError $ toCrawlerError err

-- Promote some 'LentilleError' values to 'ProcessError' to terminate the stream.
-- This can cause a crawler to stop crawling, for example if an entity removal
-- is not correctly handled and produces a DecodeError.
let addStreamError :: [Maybe ProcessError] -> [Maybe ProcessError]
addStreamError = case edoc of
Right _ -> id
Expand All @@ -119,12 +123,17 @@ processStream logFunc postFunc = go (0 :: Word) [] []
Left (LentilleError _ EntityRemoved) -> id
-- Every other 'LentilleError' is fatal
Left err -> (Just (StreamError err) :)

let newAcc = doc : acc
if count == 499
then do
-- After 500 elements, post the documents to the API
res <- processBatch newAcc
go 0 [] (addStreamError (res : results)) rest
else go (count + 1) newAcc (addStreamError results) rest
-- And continue processing the stream
go 0 [] (addStreamError (res : resultsAcc)) rest
else do
-- Keep on processing the stream
go (count + 1) newAcc (addStreamError resultsAcc) rest

toCrawlerError (LentilleError ts err) = CrawlerError {..}
where
Expand Down Expand Up @@ -197,7 +206,8 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre
addUTCTime 1 oldestAge >= startTime -> do
logInfo "Crawling entities completed" ["entity" .= entity, "age" .= oldestAge]
pure []
| otherwise -> goStream oldestAge entity
| otherwise -> withContext ("entity" .= entity) do
goStream oldestAge entity

goStream oldestAge entity = do
logInfo "Processing" ["entity" .= entity, "age" .= oldestAge]
Expand All @@ -221,6 +231,7 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre
xs -> pure xs

-- Adapt the document stream to intermediate representation
getStream :: UTCTime -> Entity -> LentilleStream es DocumentType
getStream oldestAge entity = case documentStream of
Changes s ->
let project = extractEntityValue _Project
Expand Down
Loading