1
1
import Apify from 'apify' ;
2
2
import { inspect } from 'util' ;
3
3
import { convertInputToActorConfigs } from './lib/configs.js' ;
4
- import { revivePendingConfigs } from './lib/revivePendingConfigs.js' ;
5
- import { waitForRunToFinish , startRun } from './lib/startRunAndPool.js' ;
4
+ import { waitForRunToFinishAndPushData , startRun } from './lib/startRunAndPool.js' ;
6
5
7
6
const { log, sleep } = Apify . utils ;
8
7
const env = Apify . getEnv ( ) ;
@@ -11,6 +10,8 @@ Apify.main(async () => {
11
10
/** @type {import('../../common/types').ActorInputData } */
12
11
// @ts -ignore It's not null
13
12
const input = await Apify . getInput ( ) ;
13
+ log . debug ( 'Provided inputs:' ) ;
14
+ log . debug ( inspect ( input ) ) ;
14
15
15
16
const { maxConcurrentDomainsChecked, urlsToCheck } = input ;
16
17
@@ -21,70 +22,82 @@ Apify.main(async () => {
21
22
/** @type {import('./types').FrontendActorState } */
22
23
// @ts -expect-error It's an object
23
24
const state = await Apify . getValue ( 'STATE' ) ?? {
24
- preparedConfigs : [ ] ,
25
- pendingConfigs : [ ] ,
26
- totalUrls : 0 ,
25
+ runConfigurations : [ ] ,
26
+ totalUrls : urlsToCheck . length ,
27
+ checkerFinished : false ,
27
28
} ;
28
29
29
30
Apify . events . on ( 'migrating' , async ( ) => {
30
31
await Apify . setValue ( 'STATE' , state ) ;
31
32
} ) ;
32
33
34
+ Apify . events . on ( 'persistState' , async ( ) => {
35
+ await Apify . setValue ( 'STATE' , state ) ;
36
+ } ) ;
37
+
33
38
setInterval ( async ( ) => {
34
39
await Apify . setValue ( 'STATE' , state ) ;
35
40
36
41
log . debug ( 'Internal state:' ) ;
37
42
log . debug ( inspect ( state , false , 3 ) ) ;
38
43
} , 10_000 ) ;
39
44
40
- state . preparedConfigs = convertInputToActorConfigs ( input ) ;
41
- state . totalUrls = urlsToCheck . length ;
42
-
43
- log . info ( `Preparing to process ${ state . totalUrls } URLs...\n` ) ;
44
-
45
- // Check for revivals first, in the event the actor crashed, and handle those to the end
46
- await revivePendingConfigs ( state ) ;
47
-
48
- while ( true ) {
49
- // Each element of domainsToCheck represents a URL with its own run configurations
50
- const domainsToCheck = state . preparedConfigs . splice ( 0 , maxConcurrentDomainsChecked ) ;
51
- // If we got no more URLs to run, exit the loop
52
- if ( domainsToCheck . length === 0 ) break ;
53
-
54
- log . info ( `Starting a batch of ${ domainsToCheck . length } URLs to check` ) ;
45
+ // If we haven't initialized the state yet, do it now
46
+ if ( state . runConfigurations . length === 0 && ! state . checkerFinished ) {
47
+ state . runConfigurations = convertInputToActorConfigs ( input ) ;
48
+ }
55
49
56
- state . pendingConfigs = domainsToCheck ;
57
- // Save the state right off the bat, in the event the actor dies right after
58
- await Apify . setValue ( 'STATE' , state ) ;
50
+ // Sort state based on started runs
51
+ state . runConfigurations = state . runConfigurations . sort ( ( _ , b ) => Number ( Boolean ( b . runId ) ) ) ;
52
+ await Apify . setValue ( 'STATE' , state ) ;
59
53
60
- const promises = [ ] ;
54
+ log . info ( `Preparing to process ${ state . totalUrls } URLs...\n` ) ;
61
55
62
- for ( const domainRunConfigs of domainsToCheck ) {
63
- for ( const run of domainRunConfigs ) {
64
- const result = await startRun ( run ) ;
56
+ /** @type {import('apify').RequestOptions[] } */
57
+ const sources = state . runConfigurations . map ( ( actorInput , index ) => ( {
58
+ url : 'https://localhost' ,
59
+ uniqueKey : index . toString ( ) ,
60
+ userData : { actorInput } ,
61
+ } ) ) ;
62
+
63
+ const requestList = await Apify . openRequestList ( null , sources ) ;
64
+
65
+ const runner = new Apify . BasicCrawler ( {
66
+ maxConcurrency : maxConcurrentDomainsChecked ,
67
+ requestList,
68
+ handleRequestFunction : async ( { request } ) => {
69
+ const { uniqueKey, userData } = request ;
70
+ /** @type {{ actorInput: import('../../common/types').PreparedActorConfig } } */
71
+ // @ts -expect-error JS-style casting
72
+ const { actorInput } = userData ;
73
+
74
+ if ( actorInput . runId ) {
75
+ log . info ( `Found run ${ actorInput . runId } with actor ${ actorInput . actorId } for URL "${ actorInput . url } " - waiting for it to finish.` ) ;
76
+ log . info ( `You can monitor the status of the run by going to https://console.apify.com/actors/runs/${ actorInput . runId } ` ) ;
77
+ } else {
78
+ const result = await startRun ( actorInput ) ;
65
79
log . info (
66
- `Starting run for "${ run . url } " with actor ${ run . actorId } and ${
67
- run . input . proxyConfiguration . useApifyProxy ? `proxy ${ run . proxyUsed ?? 'auto' } ` : 'no proxy'
80
+ `Starting run for "${ actorInput . url } " with actor ${ actorInput . actorId } and ${
81
+ actorInput . input . proxyConfiguration . useApifyProxy ? `proxy ${ actorInput . proxyUsed ?? 'auto' } ` : 'no proxy'
68
82
} .`,
69
83
) ;
70
84
log . info ( `You can monitor the status of the run by going to https://console.apify.com/actors/runs/${ result . id } ` ) ;
71
- run . runId = result . id ;
72
- await Apify . setValue ( 'STATE' , state ) ;
73
-
74
- // Start pooling the run for its results
75
- promises . push ( waitForRunToFinish ( run , result . id ) ) ;
76
-
77
- // Wait a second to not overload the platform
78
- await sleep ( 1000 ) ;
85
+ actorInput . runId = result . id ;
86
+ // TODO(vladfrangu): remove this once I confirm the value is updated, so we don't restart runs for no reason
87
+ console . log ( state . runConfigurations [ Number ( uniqueKey ) ] ) ;
79
88
}
80
- }
81
89
82
- // Await all runs to finish before continuing
83
- await Promise . allSettled ( promises ) ;
84
- }
90
+ // Wait for the run to finish
91
+ await waitForRunToFinishAndPushData ( actorInput ) ;
92
+ } ,
93
+ } ) ;
94
+
95
+ // Run the checker
96
+ await runner . run ( ) ;
85
97
86
98
// Save the state as done, to prevent resurrection doing requests it doesn't have to do
87
- state . preparedConfigs = [ ] ;
99
+ state . runConfigurations = [ ] ;
100
+ state . checkerFinished = true ;
88
101
await Apify . setValue ( 'STATE' , state ) ;
89
102
90
103
log . info ( `\nChecking ${ state . totalUrls } URLs completed!` ) ;
0 commit comments