11import pLimit from "p-limit" ;
22import { mainLog , mongo , redis } from "../index.js" ;
33
4- const limit = pLimit ( 1 ) ;
4+ const limit = pLimit ( Number . parseInt ( process . env . LIMIT || "5" ) ) ;
5+ const BATCH_SIZE = Number . parseInt ( process . env . BATCH_SIZE || "1000" ) ;
56
6- const BATCH_SIZE = Number . parseInt ( process . env . BATCH_SIZE || "10000" ) ;
77export default async function ( ) {
8- limit ( async ( ) => {
9- let count = 0 ;
10- const log = mainLog . extend ( "updateHeartbeats" ) ;
11-
12- let cursor = "" ;
13- do {
14- const result = await redis . hscan (
15- "pmd-api.heartbeatUpdates" ,
16- cursor ,
17- "COUNT" ,
18- BATCH_SIZE
19- ) ;
20- cursor = result [ 0 ] ;
8+ const log = mainLog . extend ( "updateHeartbeats" ) ;
9+ let count = 0 ;
10+
11+ const processBatch = async ( cursor : string ) => {
12+ const result = await redis . hscan (
13+ "pmd-api.heartbeatUpdates" ,
14+ cursor ,
15+ "COUNT" ,
16+ BATCH_SIZE
17+ ) ;
18+ const newCursor = result [ 0 ] ;
2119
22- let batchToDelete = [ ] ;
23- let data = [ ] ;
24- for ( let i = 0 ; i < result [ 1 ] . length ; i += 2 ) {
25- const identifier = result [ 1 ] [ i ] ;
26- data . push ( JSON . parse ( result [ 1 ] [ i + 1 ] ) ) ;
20+ let batchToDelete = [ ] ;
21+ let data = [ ] ;
22+ for ( let i = 0 ; i < result [ 1 ] . length ; i += 2 ) {
23+ const identifier = result [ 1 ] [ i ] ;
24+ data . push ( JSON . parse ( result [ 1 ] [ i + 1 ] ) ) ;
2725
28- batchToDelete . push ( identifier ) ;
29- count ++ ;
30- }
26+ batchToDelete . push ( identifier ) ;
27+ count ++ ;
28+ }
3129
32- if ( ! batchToDelete . length ) continue ;
30+ if ( batchToDelete . length ) {
3331 await redis . hdel ( "pmd-api.heartbeatUpdates" , ...batchToDelete ) ;
3432
3533 const res = await mongo
@@ -40,7 +38,7 @@ export default async function () {
4038 updateOne : {
4139 filter : { identifier : d . identifier } ,
4240 update : {
43- $set : { ...d , updated : new Date ( ) }
41+ $set : { ...d , updated : new Date ( d . updated ) }
4442 } ,
4543 upsert : true
4644 }
@@ -52,8 +50,15 @@ export default async function () {
5250 res . upsertedCount ,
5351 res . modifiedCount
5452 ) ;
55- } while ( cursor !== "0" ) ;
53+ }
54+
55+ return newCursor ;
56+ } ;
57+
58+ let cursor = "0" ;
59+ do {
60+ cursor = await limit ( ( ) => processBatch ( cursor ) ) ;
61+ } while ( cursor !== "0" ) ;
5662
57- if ( count > 0 ) log ( "Updated %s entries" , count ) ;
58- } ) ;
63+ if ( count > 0 ) log ( "Updated %s entries" , count ) ;
5964}
0 commit comments