@@ -23,9 +23,11 @@ import { kebabCase } from "change-case";
2323import { validateDirectCallRequest } from "./validation" ;
2424import { sleep } from "botasaurus/utils" ;
2525import { DirectCallCacheService } from "./task-results" ;
26- import { cleanBasePath , isNotEmptyObject } from "./utils" ;
26+ import { cleanBasePath , isNotEmptyObject , isObject } from "./utils" ;
2727import { isDontCache } from "botasaurus/dontcache" ;
2828import { JsonHTTPResponseWithMessage } from "./errors" ;
29+ import { cleanDataInPlace } from "botasaurus/output" ;
30+ import { removeDuplicatesByKey } from "./models" ;
2931
3032function addCorsHeaders ( reply : any ) {
3133 reply . header ( "Access-Control-Allow-Origin" , "*" ) ;
@@ -69,71 +71,179 @@ function addScraperRoutes(app: FastifyInstance, apiBasePath: string) {
6971 params
7072 ) ;
7173
72- let cacheKey : string | undefined ;
73- if ( Server . cache ) {
74- // Check cache
75- cacheKey = DirectCallCacheService . createCacheKey (
76- scraper . scraper_name ,
77- validatedData
78- ) ;
74+ // Check if scraper has split_task
75+ const splitTask = scraper . split_task ;
76+ const scraperName = scraper . scraper_name ;
7977
80- if ( DirectCallCacheService . has ( cacheKey ) ) {
81- try {
82- const cachedResult =
83- DirectCallCacheService . get ( cacheKey ) ;
84- return cachedResult ?? { result : null } ;
85- } catch ( error ) {
86- console . error ( error ) ;
87- // Continue with normal execution if cache fails
88- }
89- }
78+ // If split_task exists, split the data
79+ let dataItems : any [ ] ;
80+ if ( splitTask ) {
81+ dataItems = splitTask ( validatedData ) ;
82+ } else {
83+ dataItems = [ validatedData ] ;
9084 }
9185
92- // Wait for capacity if needed
93- while ( ! executor . hasCapacity ( key ) ) {
94- await sleep ( 0.1 ) ;
86+ const mt = isNotEmptyObject ( metadata ) ? { metadata } : { } ;
87+ let shouldDecrementCapacity = false ;
88+
89+ function restoreCapacity ( ) {
90+ if ( shouldDecrementCapacity ) {
91+ executor . decrementCapacity ( key ) ;
92+ shouldDecrementCapacity = false ;
93+ }
9594 }
9695
97- // Execute scraper
98- executor . incrementCapacity ( key ) ;
99- const mt = isNotEmptyObject ( metadata ) ? { metadata } : { } ;
96+ // Collect results with metadata
97+ let collectedResults : Array < {
98+ isFromCache : boolean ;
99+ isDontCache : boolean ;
100+ result : any ;
101+ cacheKey ?: string ;
102+ } > = [ ] ;
100103 try {
101- const results = await fn ( validatedData , {
102- ...mt ,
103- parallel : null ,
104- cache : false ,
105- beep : false ,
106- raiseException : true ,
107- closeOnCrash : true ,
108- output : null ,
109- createErrorLogs : false ,
110- returnDontCacheAsIs : true ,
111- } ) ;
112-
113- let isResultDontCached = false ;
114- let finalResults = results ;
115-
116- // Handle don't cache flag
117- if ( isDontCache ( results ) ) {
118- isResultDontCached = true ;
119- finalResults = results . data ;
104+ // Execute function for each data item
105+ for ( const dataItem of dataItems ) {
106+ let cacheKey : string | undefined ;
107+ let isFromCache = false ;
108+ let resultData : any = null ;
109+ let isDontCacheFlag = false ;
110+
111+ // Check cache for this specific data item
112+ if ( Server . cache ) {
113+ cacheKey = DirectCallCacheService . createCacheKey (
114+ scraperName ,
115+ dataItem
116+ ) ;
117+
118+ if ( DirectCallCacheService . has ( cacheKey ) ) {
119+ try {
120+ resultData = DirectCallCacheService . get (
121+ cacheKey
122+ ) ?? { result : null } ;
123+ isFromCache = true ;
124+ } catch ( error ) {
125+ console . error ( error ) ;
126+ // Continue with normal execution if cache fails
127+ }
128+ }
129+ }
130+
131+ // Execute if not from cache
132+ if ( ! isFromCache ) {
133+ // Wait for capacity if needed (only on first execution)
134+ if ( ! shouldDecrementCapacity ) {
135+ while ( ! executor . hasCapacity ( key ) ) {
136+ await sleep ( 0.1 ) ;
137+ }
138+ executor . incrementCapacity ( key ) ;
139+ shouldDecrementCapacity = true ;
140+ }
141+
142+ const result = await fn ( dataItem , {
143+ ...mt ,
144+ parallel : null ,
145+ cache : false ,
146+ beep : false ,
147+ raiseException : true ,
148+ closeOnCrash : true ,
149+ output : null ,
150+ createErrorLogs : false ,
151+ returnDontCacheAsIs : true ,
152+ } ) ;
153+
154+ // Handle don't cache flag
155+ if ( isDontCache ( result ) ) {
156+ isDontCacheFlag = true ;
157+ resultData = result . data ;
158+ } else {
159+ resultData = result ;
160+ }
161+ }
162+
163+ // Collect result
164+ collectedResults . push ( {
165+ isFromCache,
166+ isDontCache : isDontCacheFlag ,
167+ result : resultData ,
168+ cacheKey,
169+ } ) ;
120170 }
171+ } finally {
172+ restoreCapacity ( ) ;
173+ }
174+
175+ // Determine if we should return first object
176+ const firstResult =
177+ collectedResults . length > 0
178+ ? collectedResults [ 0 ] . result
179+ : null ;
180+ const returnFirstObject = ! splitTask && isObject ( firstResult ) ;
181+
182+ if ( Server . cache ) {
183+ // Handle caching for each item
184+ for ( const item of collectedResults ) {
185+ // Skip if from cache or don't cache
186+ if ( item . isFromCache || item . isDontCache ) {
187+ continue ;
188+ }
121189
122- // Cache results if appropriate
123- if ( Server . cache && ! isResultDontCached ) {
124- try {
125- // @ts -ignore
126- DirectCallCacheService . put ( cacheKey , finalResults ) ;
127- } catch ( cacheError ) {
128- console . error ( "Cache storage error:" , cacheError ) ;
129- // Continue even if caching fails
190+ // If not an object, clean and remove duplicates
191+ if ( ! isObject ( item . result ) ) {
192+ item . result = cleanDataInPlace ( item . result ) ;
193+ const removeDuplicatesBy =
194+ Server . getRemoveDuplicatesBy ( scraperName ) ;
195+ if ( removeDuplicatesBy ) {
196+ item . result = removeDuplicatesByKey (
197+ item . result ,
198+ removeDuplicatesBy
199+ ) ;
200+ }
201+ }
202+
203+ // Save to cache
204+ if ( item . cacheKey ) {
205+ try {
206+ DirectCallCacheService . put (
207+ item . cacheKey ,
208+ item . result
209+ ) ;
210+ } catch ( cacheError ) {
211+ console . error (
212+ "Cache storage error:" ,
213+ cacheError
214+ ) ;
215+ }
130216 }
131217 }
218+ }
132219
133- return finalResults ?? { result : null } ;
134- } finally {
135- executor . decrementCapacity ( key ) ;
220+ if ( returnFirstObject ) {
221+ return collectedResults [ 0 ] . result ;
222+ }
223+
224+ // Aggregate all results
225+ let aggregatedResults : any [ ] = [ ] ;
226+ for ( const item of collectedResults ) {
227+ if ( Array . isArray ( item . result ) ) {
228+ aggregatedResults . push ( ...item . result ) ;
229+ } else {
230+ aggregatedResults . push ( item . result ) ;
231+ }
232+ }
233+ // Release references, else cleanDataInPlace will be useless due to double references
234+ collectedResults = [ ]
235+ // Final pass: clean and remove duplicates
236+ aggregatedResults = cleanDataInPlace ( aggregatedResults ) ;
237+ const removeDuplicatesBy =
238+ Server . getRemoveDuplicatesBy ( scraperName ) ;
239+ if ( removeDuplicatesBy ) {
240+ aggregatedResults = removeDuplicatesByKey (
241+ aggregatedResults ,
242+ removeDuplicatesBy
243+ ) ;
136244 }
245+
246+ return aggregatedResults ;
137247 } catch ( error : any ) {
138248 if ( error instanceof JsonHTTPResponseWithMessage ) {
139249 throw error ; // Re-throw the error to be handled elsewhere
@@ -165,7 +275,7 @@ export function buildApp(
165275 scrapers : any [ ] ,
166276 apiBasePath : string ,
167277 routeAliases : any ,
168- enable_cache : boolean ,
278+ enable_cache : boolean
169279) : FastifyInstance {
170280 const app = fastify ( { logger : true } ) ;
171281
@@ -350,8 +460,8 @@ async function startServer(
350460 port : number ,
351461 scrapers : any [ ] ,
352462 apiBasePath : string ,
353- routeAliases : any ,
354- enable_cache : boolean ,
463+ routeAliases : any ,
464+ enable_cache : boolean
355465) : Promise < void > {
356466 try {
357467 if ( server ) {
@@ -424,7 +534,6 @@ class ApiConfig {
424534 this . apiPort = port ;
425535 }
426536
427-
428537 /**
429538 * Disables automatic API server startup on application launch.
430539 * When enabled, API must be manually started from the API Tab in desktop GUI.
@@ -469,14 +578,14 @@ class ApiConfig {
469578
470579 /**
471580 * Adds custom routes to the API server using Fastify's routing system.
472- *
581+ *
473582 * The function receives a FastifyInstance object, which you can use to define your routes.
474- *
583+ *
475584 * When to use:
476585 * - Authentication middleware
477586 * - Custom data processing endpoints
478587 * - Adding Webhook receivers
479- *
588+ *
480589 * @param {(server: FastifyInstance) => void } routeSetupFn - Function that receives FastifyInstance to add custom routes
481590 * @example
482591 * // Adding a custom health check endpoint
@@ -485,17 +594,17 @@ class ApiConfig {
485594 * return reply.send({ status: 'OK'});
486595 * });
487596 * });
488- *
597+ *
489598 * @example
490599 * // Adding a validation middleware
491600 * ApiConfig.addCustomRoutes((server) => {
492601 * server.addHook('onRequest', (request, reply, done) => {
493602 * // Check for secret
494603 * const secret = request.headers['x-secret'] as string;
495- *
604+ *
496605 * if (secret === '49cb1de3-419b-4647-bf06-22c9e1110313') {
497606 * // Valid secret, proceed
498- * return done();
607+ * return done();
499608 * } else {
500609 * return reply.status(401).send({
501610 * message: 'Unauthorized: Invalid secret.',
0 commit comments