@@ -14,6 +14,7 @@ import Cardano.Db (
14
14
OffChainPoolFetchErrorId ,
15
15
OffChainVoteData ,
16
16
OffChainVoteFetchError ,
17
+ OffChainVoteFetchErrorId ,
17
18
PoolHash ,
18
19
PoolHashId ,
19
20
PoolMetaHash (PoolMetaHash ),
@@ -36,12 +37,13 @@ import Database.Esqueleto.Experimental (
36
37
SqlExpr ,
37
38
Value (.. ),
38
39
ValueList ,
39
- desc ,
40
+ asc ,
40
41
from ,
41
42
groupBy ,
42
43
in_ ,
43
44
innerJoin ,
44
45
just ,
46
+ limit ,
45
47
max_ ,
46
48
notExists ,
47
49
on ,
@@ -63,16 +65,16 @@ import System.Random.Shuffle (shuffleM)
63
65
---------------------------------------------------------------------------------------------------------------------------------
64
66
getOffChainVoteData :: MonadIO m => POSIXTime -> Int -> ReaderT SqlBackend m [OffChainVoteWorkQueue ]
65
67
getOffChainVoteData now maxCount = do
66
- xs <- queryNewVoteWorkQueue now
68
+ xs <- queryNewVoteWorkQueue now maxCount
67
69
if length xs >= maxCount
68
70
then take maxCount <$> liftIO (shuffleM xs)
69
71
else do
70
- ys <- queryOffChainVoteWorkQueue (Time. posixSecondsToUTCTime now)
72
+ ys <- queryOffChainVoteWorkQueue (Time. posixSecondsToUTCTime now) maxCount
71
73
take maxCount . (xs ++ ) <$> liftIO (shuffleM ys)
72
74
73
75
-- get all the voting anchors that don't already exist in OffChainVoteData or OffChainVoteFetchError
74
- queryNewVoteWorkQueue :: MonadIO m => POSIXTime -> ReaderT SqlBackend m [OffChainVoteWorkQueue ]
75
- queryNewVoteWorkQueue now = do
76
+ queryNewVoteWorkQueue :: MonadIO m => POSIXTime -> Int -> ReaderT SqlBackend m [OffChainVoteWorkQueue ]
77
+ queryNewVoteWorkQueue now maxCount = do
76
78
res <- select $ do
77
79
va <- from $ table @ VotingAnchor
78
80
where_
@@ -85,6 +87,7 @@ queryNewVoteWorkQueue now = do
85
87
from (table @ OffChainVoteFetchError ) >>= \ ocvfe ->
86
88
where_ (ocvfe ^. OffChainVoteFetchErrorVotingAnchorId ==. va ^. VotingAnchorId )
87
89
)
90
+ limit $ fromIntegral maxCount
88
91
pure
89
92
( va ^. VotingAnchorId
90
93
, va ^. VotingAnchorDataHash
@@ -101,20 +104,17 @@ queryNewVoteWorkQueue now = do
101
104
, oVoteWqUrl = url
102
105
}
103
106
104
- queryOffChainVoteWorkQueue :: MonadIO m => UTCTime -> ReaderT SqlBackend m [OffChainVoteWorkQueue ]
105
- queryOffChainVoteWorkQueue _now = do
107
+ queryOffChainVoteWorkQueue :: MonadIO m => UTCTime -> Int -> ReaderT SqlBackend m [OffChainVoteWorkQueue ]
108
+ queryOffChainVoteWorkQueue _now maxCount = do
106
109
res <- select $ do
107
110
(va :& ocpfe) <-
108
111
from
109
112
$ table @ VotingAnchor
110
113
`innerJoin` table @ OffChainVoteFetchError
111
114
`on` (\ (va :& ocpfe) -> ocpfe ^. OffChainVoteFetchErrorVotingAnchorId ==. va ^. VotingAnchorId )
112
- where_
113
- ( notExists $
114
- from (table @ OffChainVoteData ) >>= \ ocvd ->
115
- where_ (ocvd ^. OffChainVoteDataVotingAnchorId ==. ocpfe ^. OffChainVoteFetchErrorVotingAnchorId )
116
- )
117
- orderBy [desc (ocpfe ^. OffChainVoteFetchErrorFetchTime )]
115
+ orderBy [asc (ocpfe ^. OffChainVoteFetchErrorId )]
116
+ where_ (just (ocpfe ^. OffChainVoteFetchErrorId ) `in_` latestRefs)
117
+ limit $ fromIntegral maxCount
118
118
pure
119
119
( ocpfe ^. OffChainVoteFetchErrorFetchTime
120
120
, va ^. VotingAnchorId
@@ -133,23 +133,35 @@ queryOffChainVoteWorkQueue _now = do
133
133
, oVoteWqUrl = url
134
134
}
135
135
136
+ latestRefs :: SqlExpr (ValueList (Maybe OffChainVoteFetchErrorId ))
137
+ latestRefs =
138
+ subList_select $ do
139
+ ocvfe <- from (table @ OffChainVoteFetchError )
140
+ groupBy (ocvfe ^. OffChainVoteFetchErrorVotingAnchorId )
141
+ where_
142
+ ( notExists $
143
+ from (table @ OffChainVoteData ) >>= \ ocvd ->
144
+ where_ (ocvd ^. OffChainVoteDataVotingAnchorId ==. ocvfe ^. OffChainVoteFetchErrorVotingAnchorId )
145
+ )
146
+ pure $ max_ (ocvfe ^. OffChainVoteFetchErrorId )
147
+
136
148
---------------------------------------------------------------------------------------------------------------------------------
137
149
-- Query OffChain PoolData
138
150
---------------------------------------------------------------------------------------------------------------------------------
139
151
getOffChainPoolData :: MonadIO m => POSIXTime -> Int -> ReaderT SqlBackend m [OffChainPoolWorkQueue ]
140
152
getOffChainPoolData now maxCount = do
141
153
-- Results from the query are shuffles so we don't continuously get the same entries.
142
- xs <- queryNewPoolWorkQueue now
154
+ xs <- queryNewPoolWorkQueue now maxCount
143
155
if length xs >= maxCount
144
156
then take maxCount <$> liftIO (shuffleM xs)
145
157
else do
146
- ys <- queryOffChainPoolWorkQueue (Time. posixSecondsToUTCTime now)
158
+ ys <- queryOffChainPoolWorkQueue (Time. posixSecondsToUTCTime now) maxCount
147
159
take maxCount . (xs ++ ) <$> liftIO (shuffleM ys)
148
160
149
161
-- Get pool work queue data for new pools (ie pools that had OffChainPoolData entry and no
150
162
-- OffChainPoolFetchError).
151
- queryNewPoolWorkQueue :: MonadIO m => POSIXTime -> ReaderT SqlBackend m [OffChainPoolWorkQueue ]
152
- queryNewPoolWorkQueue now = do
163
+ queryNewPoolWorkQueue :: MonadIO m => POSIXTime -> Int -> ReaderT SqlBackend m [OffChainPoolWorkQueue ]
164
+ queryNewPoolWorkQueue now maxCount = do
153
165
res <- select $ do
154
166
(ph :& pmr) <-
155
167
from
@@ -167,6 +179,7 @@ queryNewPoolWorkQueue now = do
167
179
from (table @ OffChainPoolFetchError ) >>= \ pofe ->
168
180
where_ (pofe ^. OffChainPoolFetchErrorPmrId ==. pmr ^. PoolMetadataRefId )
169
181
)
182
+ limit $ fromIntegral maxCount
170
183
pure
171
184
( ph ^. PoolHashId
172
185
, pmr ^. PoolMetadataRefId
@@ -198,8 +211,8 @@ queryNewPoolWorkQueue now = do
198
211
}
199
212
200
213
-- Get pool fetch data for pools that have previously errored.
201
- queryOffChainPoolWorkQueue :: MonadIO m => UTCTime -> ReaderT SqlBackend m [OffChainPoolWorkQueue ]
202
- queryOffChainPoolWorkQueue _now = do
214
+ queryOffChainPoolWorkQueue :: MonadIO m => UTCTime -> Int -> ReaderT SqlBackend m [OffChainPoolWorkQueue ]
215
+ queryOffChainPoolWorkQueue _now maxCount = do
203
216
res <- select $ do
204
217
(ph :& pmr :& pofe) <-
205
218
from
@@ -209,8 +222,8 @@ queryOffChainPoolWorkQueue _now = do
209
222
`innerJoin` table @ OffChainPoolFetchError
210
223
`on` (\ (_ph :& pmr :& pofe) -> pofe ^. OffChainPoolFetchErrorPmrId ==. pmr ^. PoolMetadataRefId )
211
224
where_ (just (pofe ^. OffChainPoolFetchErrorId ) `in_` latestRefs)
212
- where_ (notExists $ from (table @ OffChainPoolData ) >>= \ pod -> where_ (pod ^. OffChainPoolDataPmrId ==. pofe ^. OffChainPoolFetchErrorPmrId ))
213
- orderBy [desc (pofe ^. OffChainPoolFetchErrorFetchTime )]
225
+ orderBy [asc ( pofe ^. OffChainPoolFetchErrorId )]
226
+ limit $ fromIntegral maxCount
214
227
pure
215
228
( pofe ^. OffChainPoolFetchErrorFetchTime
216
229
, pofe ^. OffChainPoolFetchErrorPmrId
@@ -228,6 +241,7 @@ queryOffChainPoolWorkQueue _now = do
228
241
latestRefs =
229
242
subList_select $ do
230
243
pofe <- from (table @ OffChainPoolFetchError )
244
+ where_ (notExists $ from (table @ OffChainPoolData ) >>= \ pod -> where_ (pod ^. OffChainPoolDataPmrId ==. pofe ^. OffChainPoolFetchErrorPmrId ))
231
245
groupBy (pofe ^. OffChainPoolFetchErrorPoolId )
232
246
pure $ max_ (pofe ^. OffChainPoolFetchErrorId )
233
247
0 commit comments