Skip to content

Commit 8eb6cb5

Browse files
committed
fixes
1 parent 435f7fb commit 8eb6cb5

File tree

4 files changed

+146
-108
lines changed

4 files changed

+146
-108
lines changed

js/botasaurus-js/src/playwright.ts

Lines changed: 35 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ import {
2828
_get,
2929
_remove,
3030
} from "./cache";
31-
import { flatten } from "./list-utils";
3231
import { FormatType } from "./formats";
3332
import { createPlaywrightChrome, PlaywrightChrome } from "./page";
3433
import { getBotasaurusStorage } from "./botasaurus-storage"
34+
import { determineMaxLimit, drainQueue } from "./task"
3535

3636
type PlaywrightRunOptions<I = any> = {
3737
page: Page;
@@ -392,12 +392,24 @@ export function playwright<I = any>(options: PlaywrightOptions<I>) {
392392
export function playwrightQueue<I = any>(
393393
options: PlaywrightOptions<I> & { sequential?: boolean }
394394
) {
395-
const run = createPlaywright<I>(options, true);
395+
// Extract parallel from options - it controls queue-level concurrency, not passed to createPlaywright
396+
const { parallel: parallelOption, ...playwrightOptions } = options
397+
const run = createPlaywright<I>(playwrightOptions as PlaywrightOptions<I>, true);
396398
const performPlaywright = () => {
397399
let seenItems = new Set();
398400
let lastPromise: Promise<any> = Promise.resolve();
399-
let promises: any[] = [];
400-
const sequential = "sequential" in options ? options.sequential : true;
401+
const state = { promises: [] as any[] };
402+
let sequential = "sequential" in options ? options.sequential : false;
403+
404+
// Create concurrency limiter for parallel mode
405+
const maxLimit = determineMaxLimit(parallelOption)
406+
407+
let limit: pLimit.Limit
408+
if (maxLimit <= 1) {
409+
sequential = true
410+
} else {
411+
limit = pLimit(maxLimit)
412+
}
401413

402414
function getUnique(items: any[]) {
403415
let singleItem = false;
@@ -438,6 +450,11 @@ export function playwrightQueue<I = any>(
438450
return singleItem && newItems.length ? newItems[0] : newItems;
439451
}
440452

453+
const cleanup = () => {
454+
seenItems.clear();
455+
lastPromise = Promise.resolve();
456+
};
457+
441458
return {
442459
put: function (
443460
data: any,
@@ -451,56 +468,24 @@ export function playwrightQueue<I = any>(
451468
run(uniqueData, overrideOptions)
452469
);
453470
lastPromise = promise;
471+
472+
state.promises.push(promise);
473+
return promise.then((x) => x.result);
454474
} else {
455-
// runs in parallel
456-
promise = run(uniqueData, overrideOptions);
475+
if (Array.isArray(uniqueData)) {
476+
promise = Promise.all(uniqueData.map(x => limit(() => run(x, overrideOptions))))
477+
state.promises.push(promise.then(results => ({ originalData: uniqueData, result: results.map((x: any) => x.result) })))
478+
return promise.then(results => results.map((x: any) => x.result))
479+
} else {
480+
promise = limit(() => run(uniqueData, overrideOptions))
481+
482+
state.promises.push(promise)
483+
return promise.then((x) => x.result);
484+
}
457485
}
458-
promises.push(promise);
459-
return promise.then((x) => x.result);
460486
},
461487
get: async function () {
462-
// return flatten(self.result_list)
463-
const result_list = [];
464-
const orignal_data = [];
465-
466-
try {
467-
const results = await Promise.all(promises);
468-
for (let index = 0; index < results.length; index++) {
469-
const { originalData, result } = results[index];
470-
if (Array.isArray(originalData)) {
471-
orignal_data.push(...originalData);
472-
} else {
473-
orignal_data.push(originalData);
474-
}
475-
476-
if (Array.isArray(originalData)) {
477-
result_list.push(...result);
478-
} else {
479-
result_list.push(result);
480-
}
481-
}
482-
483-
promises = [];
484-
seenItems.clear();
485-
lastPromise = Promise.resolve();
486-
const { output = "default", outputFormats = null } =
487-
options;
488-
// fix if output is []
489-
const final = flatten(result_list);
490-
writeOutput(
491-
output,
492-
outputFormats,
493-
orignal_data,
494-
final,
495-
run.__name__
496-
);
497-
return final;
498-
} catch (error) {
499-
promises = [];
500-
seenItems.clear();
501-
lastPromise = Promise.resolve();
502-
throw error;
503-
}
488+
return drainQueue(state, cleanup, options, run.__name__);
504489
},
505490
close: async () => {
506491
await run.close();

js/botasaurus-js/src/task.ts

Lines changed: 85 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,24 @@ export function task<I=any>(options: TaskOptions<I>) {
228228
}
229229

230230
export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean }) {
231-
const run = createTask<I>(options, true)
231+
// Extract parallel from options - it controls queue-level concurrency, not passed to createTask
232+
const { parallel: parallelOption, ...taskOptions } = options
233+
const run = createTask<I>(taskOptions as TaskOptions<I>, true)
232234
const performTask = () => {
233235
let seenItems = new Set()
234236
let lastPromise: Promise<any> = Promise.resolve()
235-
let promises: any[] = []
236-
const sequential = 'sequential' in options ? options.sequential : true
237-
237+
const state = { promises: [] as any[] }
238+
let sequential = 'sequential' in options ? options.sequential : false
239+
240+
// Create concurrency limiter for parallel mode
241+
const maxLimit = determineMaxLimit(parallelOption)
242+
243+
let limit: pLimit.Limit
244+
if (maxLimit <=1) {
245+
sequential = true
246+
} else {
247+
limit = pLimit(maxLimit)
248+
}
238249

239250
function getUnique(items: any[]) {
240251
let singleItem = false
@@ -269,6 +280,11 @@ export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean
269280
return singleItem && newItems.length ? newItems[0] : newItems
270281
}
271282

283+
const cleanup = () => {
284+
seenItems.clear()
285+
lastPromise = Promise.resolve()
286+
}
287+
272288
return {
273289
put: function (data: any, overrideOptions: Omit<TaskOptions<any>, 'run'> = {}) {
274290
const uniqueData = getUnique(data)
@@ -277,53 +293,78 @@ export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean
277293
// runs sequentially
278294
promise = lastPromise.then(() => run(uniqueData, overrideOptions))
279295
lastPromise = promise
296+
297+
state.promises.push(promise)
298+
return promise.then(x=>x.result)
280299
} else {
281-
// runs in parallel
282-
promise = run(uniqueData, overrideOptions)
300+
if (Array.isArray(uniqueData)) {
301+
promise = Promise.all(uniqueData.map(x=> limit(() => run(x, overrideOptions))))
302+
state.promises.push(promise.then(results=>({originalData: uniqueData, result: results.map((x: any)=>x.result)})))
303+
return promise.then(results=>results.map((x: any)=>x.result))
304+
} else {
305+
promise = limit(() => run(uniqueData, overrideOptions))
306+
307+
state.promises.push(promise)
308+
return promise.then(x=>x.result)
309+
}
283310
}
284-
promises.push(promise)
285-
return promise.then(x=>x.result)
286311
},
287312
get: async function () {
288-
// return flatten(self.result_list)
289-
const result_list = []
290-
const orignal_data = []
291-
292-
try {
293-
const results = await Promise.all(promises)
294-
for (let index = 0; index < results.length; index++) {
295-
const { originalData, result } = results[index]
296-
if (Array.isArray(originalData)) {
297-
orignal_data.push(...originalData)
298-
} else {
299-
orignal_data.push(originalData)
300-
}
301-
302-
if (Array.isArray(originalData)) {
303-
result_list.push(...result)
304-
} else {
305-
result_list.push(result)
306-
}
307-
}
313+
return drainQueue(state, cleanup, options, run.__name__)
314+
}
315+
}
316+
}
317+
performTask._isQueue = true
318+
return performTask
319+
}
308320

309-
promises = []
310-
seenItems.clear()
311-
lastPromise = Promise.resolve()
312-
const { output = 'default', outputFormats = null } = options
313-
// fix if output is []
314-
const final = flatten(result_list)
315-
writeOutput(output, outputFormats, orignal_data, final, run.__name__)
316-
return final
317-
} catch (error) {
318-
promises = []
319-
seenItems.clear()
320-
lastPromise = Promise.resolve()
321-
throw error
321+
/**
 * Resolves the queue-level `parallel` option into a concurrency limit.
 *
 * @param parallelOption - A fixed count, a function producing a count (it is
 *   invoked with `null`), or `undefined` when the option was not supplied.
 * @returns The concurrency limit. It is never below 1; a missing, non-numeric
 *   or NaN value yields 1, and a fractional value is rounded down.
 */
export function determineMaxLimit(parallelOption: number | ((data: any) => number) | undefined): number {
  const parallelCount = typeof parallelOption === 'function' ? parallelOption(null) : (parallelOption ?? 1)

  // Math.max(1, NaN) evaluates to NaN, and NaN fails a `maxLimit <= 1` comparison,
  // so an invalid count must be normalised here rather than handed on to the limiter.
  if (typeof parallelCount !== 'number' || Number.isNaN(parallelCount)) {
    return 1
  }

  // Round fractional counts down to a whole number of slots (Infinity is left untouched).
  // NOTE(review): p-limit is expected to reject non-integer concurrency values - confirm.
  return Math.max(1, Math.floor(parallelCount))
}
327+
328+
/**
 * Awaits every promise queued in `state`, gathers the inputs and results,
 * writes the output and returns the flattened results.
 *
 * @param state - Holder of the pending promises; its `promises` array is
 *   swapped for an empty one each time a batch is taken.
 * @param cleanup - Callback invoked once the queue is drained, and again on failure.
 * @param options - Supplies `output` (defaults to 'default') and `outputFormats`
 *   (defaults to null), both forwarded to `writeOutput`.
 * @param fnName - Name forwarded to `writeOutput` as its last argument.
 * @returns The flattened list of results.
 * @throws Rethrows any error raised while draining, after clearing
 *   `state.promises` and calling `cleanup`.
 */
export async function drainQueue(
  state: { promises: Promise<any>[] },
  cleanup: () => void,
  options: { output?: string | ((data: any, result: any) => void) | null, outputFormats?: FormatType[] | null },
  fnName: string
) {
  const collectedResults: any[] = []
  const collectedInputs: any[] = []

  try {
    // Take batches until none remain; promises pushed while a batch is awaited
    // are picked up by the next pass of the loop.
    while (state.promises.length > 0) {
      const batch = state.promises
      state.promises = []

      const settled = await Promise.all(batch)
      for (const { originalData, result } of settled) {
        if (Array.isArray(originalData)) {
          // A list input contributes each of its items and each of its results.
          collectedInputs.push(...originalData)
          collectedResults.push(...result)
        } else {
          collectedInputs.push(originalData)
          collectedResults.push(result)
        }
      }
    }

    cleanup()
    const { output = 'default', outputFormats = null } = options
    const final = flatten(collectedResults)
    writeOutput(output, outputFormats, collectedInputs, final, fnName)
    return final
  } catch (error) {
    state.promises = []
    cleanup()
    throw error
  }
}

js/botasaurus-server-js/src/server.ts

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -485,26 +485,35 @@ class _Server {
485485
}
486486

487487
/**
 * Validates the configured rate limit by delegating to `validateAgainstLimit`
 * with the 'rate limit' label.
 */
validateRateLimit(): void {
  this.validateAgainstLimit(this.rateLimit, 'rate limit')
}
493+
494+
/**
 * Checks that every key of a per-scraper limit object names an existing scraper.
 *
 * The check only runs when `isScraperBasedRateLimit` is set; otherwise the
 * method returns without doing anything.
 *
 * @param toBeValidatedObject - The limit configuration whose keys are checked.
 * @param kind - Label used in the error message for the setting being validated.
 * @throws Error when one or more keys do not match a name from `getScrapersNames()`.
 */
validateAgainstLimit(toBeValidatedObject: Record<string, number> | { browser?: number; request?: number; task?: number }, kind: 'rate limit' | 'task timeout') {
  if (this.isScraperBasedRateLimit) {
    const scraperNames = new Set(this.getScrapersNames())
    const invalidKeys = Object.keys(toBeValidatedObject).filter(
      (key) => !scraperNames.has(key)
    )

    if (invalidKeys.length > 0) {
      const invalidKeysMessage = invalidKeys.length === 1
        ? `Scraper with name '${invalidKeys[0]}' does not exist.`
        : `Scrapers with names ${invalidKeys.join(', ')} do not exist.`

      // Format the object actually being validated so the message matches `kind`;
      // it is not necessarily the rate limit configuration.
      const formattedLimit = JSON.stringify(toBeValidatedObject).replaceAll(",", ", ").replaceAll(":", ": ")

      throw new Error(
        `Your ${kind} is set to ${formattedLimit}, but ${invalidKeysMessage}`
      )
    }
  }
}
516+
508517
getRateLimit(): { browser?: number; request?: number; task?: number } | Record<string, number> {
509518
return this.rateLimit;
510519
}

js/botasaurus-server-js/src/task-executor.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ class TaskExecutor {
3131
public async start(): Promise<void> {
3232
await this.fixInProgressTasks()
3333
await this.completePendingButCompletedAllTask()
34-
34+
this.startTaskWorker()
35+
}
36+
37+
protected startTaskWorker(): void {
3538
if (Server.isScraperBasedRateLimit) {
3639
setImmediate(this.taskWorkerScraperBased.bind(this))
3740
} else {
@@ -145,12 +148,12 @@ class TaskExecutor {
145148
console.error(error)
146149
}
147150
}
148-
private async taskWorkerScraperBased(): Promise<void> {
151+
/**
 * Runs one pass of scraper-based task processing, then schedules the next
 * pass of this same worker 1000 ms later.
 */
protected async taskWorkerScraperBased(): Promise<void> {
  await this.processScraperBasedTasks();

  const nextPass = this.taskWorkerScraperBased.bind(this);
  setTimeout(nextPass, 1000);
}
152155

153-
private async taskWorkerScraperTypeBased(): Promise<void> {
156+
/**
 * Runs one pass of scraper-type-based task processing, then schedules the
 * next pass of this same worker 1000 ms later.
 */
protected async taskWorkerScraperTypeBased(): Promise<void> {
  await this.processScraperTypeBasedTasks();

  const nextPass = this.taskWorkerScraperTypeBased.bind(this);
  setTimeout(nextPass, 1000);
}
@@ -318,7 +321,7 @@ class TaskExecutor {
318321
this.currentCapacity[key] = (this.currentCapacity[key] ?? 0) - 1;
319322
}
320323

321-
private async runTask(task: any): Promise<void> {
324+
protected async runTask(task: any): Promise<void> {
322325
const key = Server.isScraperBasedRateLimit ? task.scraper_name : task.scraper_type
323326
const taskId = task.id
324327
const scraperName = task.scraper_name

0 commit comments

Comments
 (0)