@@ -11,6 +11,7 @@ const { FETCH_AMOUNT, APIS } = require('./config')
1111const { logger } = require ( './utils/logger' )
1212const { chunk } = require ( 'lodash' )
1313const { queueEvents, queue, Job } = require ( './utils/queue' )
14+ const { timed } = require ( './utils/time' )
1415
1516const API_MAPPING = {
1617 [ APIS . ori ] : oriRequest ,
@@ -31,17 +32,19 @@ queueEvents.on('failed', async ({ jobId }) => {
3132 parts . push ( entities . splice ( 0 , half ) )
3233 parts . push ( entities . splice ( half - 1 , entities . length ) )
3334 logger . info ( 'Splitting the job and trying again, ' , {
34- orignal : entities . length ,
35+ original : entities . length ,
3536 part1 : parts [ 0 ] . length ,
3637 part2 : parts [ 1 ] . length
3738 } )
3839 } else {
3940 return
4041 }
4142
42- for ( const part in parts ) {
43- await queue . add ( job . name , part )
44- }
43+ const { duration } = await timed ( Promise . all ( parts . map ( part => queue . add ( job . name , part ) ) ) )
44+ logger . info ( {
45+ message : 'Requeued two jobs' ,
46+ timeMs : duration
47+ } )
4548} )
4649
4750const fetchBy = async ( api , url , ordinal , customRequest , limit = 1000 , query ) => {
@@ -59,30 +62,45 @@ const scheduleBMQ = async serviceId => {
5962
6063 if ( ONETIME && latestOrdinal > 0 ) return
6164
62- const { hasMore, entities, greatestOrdinal } = await fetchBy (
63- API ,
64- API_URL ,
65- latestOrdinal ,
66- customRequest ,
67- FETCH_AMOUNT ,
68- { QUERY , GRAPHQL_KEY }
69- )
70- if ( ! entities || ! entities . length ) return
71-
72- logger . info ( `Creating BMQ jobs for ${ CHANNEL } ` )
73- await queue . addBulk (
74- chunk ( entities , 1000 ) . map ( entityChunk => ( {
75- name : CHANNEL ,
76- data : entityChunk
77- } ) )
65+ const {
66+ result : { hasMore, entities, greatestOrdinal } ,
67+ duration : fetchDuration
68+ } = await timed (
69+ fetchBy ( API , API_URL , latestOrdinal , customRequest , FETCH_AMOUNT , {
70+ QUERY ,
71+ GRAPHQL_KEY
72+ } )
7873 )
79- logger . info ( `Created BMQ jobs for ${ CHANNEL } ` )
74+ if ( ! entities || ! entities . length ) {
75+ logger . info ( `No entities fetched from ${ API_URL } ` , {
76+ serviceId,
77+ channel : CHANNEL ,
78+ timeMs : fetchDuration
79+ } )
80+ return
81+ }
82+
83+ logger . info ( `Fetched ${ entities . length } entities from ${ API_URL } ` , {
84+ serviceId,
85+ channel : CHANNEL ,
86+ timeMs : fetchDuration
87+ } )
88+
89+ const jobs = chunk ( entities , 1000 ) . map ( entityChunk => ( {
90+ name : CHANNEL ,
91+ data : entityChunk
92+ } ) )
8093
81- const amountScheduled = entities . length
94+ logger . info ( `Queuing BMQ jobs for ${ CHANNEL } ` , {
95+ count : jobs . length
96+ } )
97+ const { duration : queueDuration } = await timed ( queue . addBulk ( jobs ) )
8298
8399 logger . info ( {
84- message : `Queued ${ serviceId } ${ amountScheduled } ` ,
85- count : amountScheduled ,
100+ message : `Queued ${ serviceId } ${ entities . length } ` ,
101+ count : entities . length ,
102+ channel : CHANNEL ,
103+ timeMs : queueDuration ,
86104 serviceId
87105 } )
88106
0 commit comments