diff --git a/src/Lentille.hs b/src/Lentille.hs index 053fff5c9..c48545763 100644 --- a/src/Lentille.hs +++ b/src/Lentille.hs @@ -26,6 +26,7 @@ module Lentille ( swapDuration, -- * Stream helper + lentilleStreamNext, streamDropBefore, Changes, @@ -125,6 +126,12 @@ yieldStreamError e = do type LentilleStream es a = Stream (Of (Either LentilleError a)) (Eff es) () +-- | Consumes one element of the stream, returns: +-- - Left for the end of stream, +-- - Right for the next element, and the rest of the stream. +lentilleStreamNext :: LentilleStream es a -> Eff es (Either () (Either LentilleError a, LentilleStream es a)) +lentilleStreamNext = S.next + ------------------------------------------------------------------------------- -- Utility functions for crawlers ------------------------------------------------------------------------------- diff --git a/src/Lentille/GitLab/MergeRequests.hs b/src/Lentille/GitLab/MergeRequests.hs index 2621a21d6..e7770ce80 100644 --- a/src/Lentille/GitLab/MergeRequests.hs +++ b/src/Lentille/GitLab/MergeRequests.hs @@ -148,6 +148,12 @@ transformResponse host getIdentIdCB result = , NoErr , extract shortName fullName <$> catMaybes nodes ) + GetProjectMergeRequests Nothing -> + ( PageInfo False Nothing Nothing + , Nothing + , NoRepo + , [] + ) _anyOtherResponse -> ( PageInfo False Nothing Nothing , Nothing diff --git a/src/Macroscope/Worker.hs b/src/Macroscope/Worker.hs index a6779de93..4b09a04eb 100644 --- a/src/Macroscope/Worker.hs +++ b/src/Macroscope/Worker.hs @@ -99,18 +99,22 @@ processStream :: Eff es [Maybe ProcessError] processStream logFunc postFunc = go (0 :: Word) [] [] where - go count acc results stream = do - eDocument <- S.next stream + go count acc resultsAcc stream = do + eDocument <- lentilleStreamNext stream case eDocument of Left () -> do -- The end of the stream res <- processBatch acc - pure $ reverse (res : results) + pure $ reverse (res : resultsAcc) Right (edoc, rest) -> do -- We got a new document let doc = case edoc of Right x -> x Left err -> DTError $ toCrawlerError err + + -- Promote some 'LentilleError' as 'ProcessError' to terminate the stream. + -- This can cause a crawler to stop crawling, for example if an entity removal + -- is not correctly handled and if it produces a DecodeError. let addStreamError :: [Maybe ProcessError] -> [Maybe ProcessError] addStreamError = case edoc of Right _ -> id @@ -119,12 +123,17 @@ processStream logFunc postFunc = go (0 :: Word) [] [] Left (LentilleError _ EntityRemoved) -> id -- Every other 'LentilleError' are fatal Left err -> (Just (StreamError err) :) + let newAcc = doc : acc if count == 499 then do + -- After 500 elements, post the documents to the API res <- processBatch newAcc - go 0 [] (addStreamError (res : results)) rest - else go (count + 1) newAcc (addStreamError results) rest + -- And continue processing the stream + go 0 [] (addStreamError (res : resultsAcc)) rest + else do + -- Keep on processing the stream + go (count + 1) newAcc (addStreamError resultsAcc) rest toCrawlerError (LentilleError ts err) = CrawlerError {..} where @@ -197,7 +206,8 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre addUTCTime 1 oldestAge >= startTime -> do logInfo "Crawling entities completed" ["entity" .= entity, "age" .= oldestAge] pure [] - | otherwise -> goStream oldestAge entity + | otherwise -> withContext ("entity" .= entity) do + goStream oldestAge entity goStream oldestAge entity = do logInfo "Processing" ["entity" .= entity, "age" .= oldestAge] @@ -221,6 +231,7 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre xs -> pure xs -- Adapt the document stream to intermediate representation + getStream :: UTCTime -> Entity -> LentilleStream es DocumentType getStream oldestAge entity = case documentStream of Changes s -> let project = extractEntityValue _Project