Skip to content

Commit b35cfc4

Browse files
Cmdvkderme
authored andcommitted
stop blocking offchain queues & update query
1 parent f37b381 commit b35cfc4

File tree

3 files changed

+30
-20
lines changed

3 files changed

+30
-20
lines changed

cardano-db-sync/src/Cardano/DbSync/OffChain.hs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import Control.Concurrent.Class.MonadSTM.Strict (
2929
StrictTBQueue (..),
3030
flushTBQueue,
3131
isEmptyTBQueue,
32-
readTBQueue,
3332
writeTBQueue,
3433
)
3534
import Control.Monad.Trans.Control (MonadBaseControl)
@@ -84,8 +83,8 @@ loadOffChainWorkQueue ::
8483
loadOffChainWorkQueue _trce offChainWorkQueue = do
8584
whenM (liftIO $ atomically (isEmptyTBQueue (lQueue offChainWorkQueue))) $ do
8685
now <- liftIO Time.getPOSIXTime
87-
runnableOffChainPools <- filter (isRunnable now offChainWorkQueue) <$> lGetData offChainWorkQueue now 100
88-
liftIO $ mapM_ (queueInsert offChainWorkQueue) runnableOffChainPools
86+
runnableOffChainData <- filter (isRunnable now offChainWorkQueue) <$> lGetData offChainWorkQueue now 100
87+
liftIO $ mapM_ (queueInsert offChainWorkQueue) runnableOffChainData
8988
where
9089
isRunnable :: POSIXTime -> LoadOffChainWorkQueue a m -> a -> Bool
9190
isRunnable now oCWorkQueue locWq = retryRetryTime (lRetryTime oCWorkQueue locWq) <= now
@@ -172,7 +171,7 @@ runFetchOffChainPoolThread syncEnv = do
172171
tDelay
173172
-- load the offChain vote work queue using the db
174173
_ <- runReaderT (loadOffChainPoolWorkQueue trce (envOffChainPoolWorkQueue syncEnv)) (envBackend syncEnv)
175-
poolq <- blockingFlushTBQueue (envOffChainPoolWorkQueue syncEnv)
174+
poolq <- atomically $ flushTBQueue (envOffChainPoolWorkQueue syncEnv)
176175
manager <- Http.newManager tlsManagerSettings
177176
now <- liftIO Time.getPOSIXTime
178177
mapM_ (queuePoolInsert <=< fetchOffChainPoolData trce manager now) poolq
@@ -192,7 +191,7 @@ runFetchOffChainVoteThread syncEnv = do
192191
tDelay
193192
-- load the offChain pool work queue using the db
194193
_ <- runReaderT (loadOffChainVoteWorkQueue trce (envOffChainVoteWorkQueue syncEnv)) (envBackend syncEnv)
195-
voteq <- blockingFlushTBQueue (envOffChainVoteWorkQueue syncEnv)
194+
voteq <- atomically $ flushTBQueue (envOffChainVoteWorkQueue syncEnv)
196195
manager <- Http.newManager tlsManagerSettings
197196
now <- liftIO Time.getPOSIXTime
198197
mapM_ (queueVoteInsert <=< fetchOffChainVoteData trce manager now) voteq
@@ -207,16 +206,6 @@ runFetchOffChainVoteThread syncEnv = do
207206
tDelay :: IO ()
208207
tDelay = threadDelay 60_000_000
209208

210-
-- -------------------------------------------------------------------------------------------------
211-
212-
-- Blocks on an empty queue, but gets all elements in the queue if there is more than one.
213-
blockingFlushTBQueue :: StrictTBQueue IO a -> IO [a]
214-
blockingFlushTBQueue queue = do
215-
atomically $ do
216-
x <- readTBQueue queue
217-
xs <- flushTBQueue queue
218-
pure $ x : xs
219-
220209
---------------------------------------------------------------------------------------------------------------------------------
221210
-- Fetch OffChain data
222211
---------------------------------------------------------------------------------------------------------------------------------

cardano-db-sync/src/Cardano/DbSync/OffChain/Http.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ httpGetOffChain manager request mHash url =
172172
{ sovaHash = metadataHash
173173
, sovaBytes = respBS
174174
, -- TODO: no json format decided for vote metadata yet so we are leaving it blank for now
175-
sovaJson = ""
175+
sovaJson = "{}"
176176
, sovaContentType = mContentType
177177
}
178178

cardano-db-sync/src/Cardano/DbSync/OffChain/Query.hs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,16 @@ queryNewVoteWorkQueue :: MonadIO m => POSIXTime -> ReaderT SqlBackend m [OffChai
7575
queryNewVoteWorkQueue now = do
7676
res <- select $ do
7777
va <- from $ table @VotingAnchor
78-
where_ (notExists $ from (table @OffChainVoteData) >>= \ocvd -> where_ (ocvd ^. OffChainVoteDataVotingAnchorId ==. va ^. VotingAnchorId))
79-
where_ (notExists $ from (table @OffChainVoteFetchError) >>= \ocvfe -> where_ (ocvfe ^. OffChainVoteFetchErrorVotingAnchorId ==. va ^. VotingAnchorId))
78+
where_
79+
( notExists $
80+
from (table @OffChainVoteData) >>= \ocvd ->
81+
where_ (ocvd ^. OffChainVoteDataVotingAnchorId ==. va ^. VotingAnchorId)
82+
)
83+
where_
84+
( notExists $
85+
from (table @OffChainVoteFetchError) >>= \ocvfe ->
86+
where_ (ocvfe ^. OffChainVoteFetchErrorVotingAnchorId ==. va ^. VotingAnchorId)
87+
)
8088
pure
8189
( va ^. VotingAnchorId
8290
, va ^. VotingAnchorDataHash
@@ -101,6 +109,11 @@ queryOffChainVoteWorkQueue _now = do
101109
$ table @VotingAnchor
102110
`innerJoin` table @OffChainVoteFetchError
103111
`on` (\(va :& ocpfe) -> ocpfe ^. OffChainVoteFetchErrorVotingAnchorId ==. va ^. VotingAnchorId)
112+
where_
113+
( notExists $
114+
from (table @OffChainVoteData) >>= \ocvd ->
115+
where_ (ocvd ^. OffChainVoteDataVotingAnchorId ==. ocpfe ^. OffChainVoteFetchErrorVotingAnchorId)
116+
)
104117
orderBy [desc (ocpfe ^. OffChainVoteFetchErrorFetchTime)]
105118
pure
106119
( ocpfe ^. OffChainVoteFetchErrorFetchTime
@@ -144,8 +157,16 @@ queryNewPoolWorkQueue now = do
144157
`innerJoin` table @PoolMetadataRef
145158
`on` (\(ph :& pmr) -> ph ^. PoolHashId ==. pmr ^. PoolMetadataRefPoolId)
146159
where_ (just (pmr ^. PoolMetadataRefId) `in_` latestRefs)
147-
where_ (notExists $ from (table @OffChainPoolData) >>= \pod -> where_ (pod ^. OffChainPoolDataPmrId ==. pmr ^. PoolMetadataRefId))
148-
where_ (notExists $ from (table @OffChainPoolFetchError) >>= \pofe -> where_ (pofe ^. OffChainPoolFetchErrorPmrId ==. pmr ^. PoolMetadataRefId))
160+
where_
161+
( notExists $
162+
from (table @OffChainPoolData) >>= \pod ->
163+
where_ (pod ^. OffChainPoolDataPmrId ==. pmr ^. PoolMetadataRefId)
164+
)
165+
where_
166+
( notExists $
167+
from (table @OffChainPoolFetchError) >>= \pofe ->
168+
where_ (pofe ^. OffChainPoolFetchErrorPmrId ==. pmr ^. PoolMetadataRefId)
169+
)
149170
pure
150171
( ph ^. PoolHashId
151172
, pmr ^. PoolMetadataRefId

0 commit comments

Comments
 (0)