Skip to content

Commit 107f601

Browse files
committed
fixes
1 parent e3b19e4 commit 107f601

File tree

4 files changed

+108
-65
lines changed

4 files changed

+108
-65
lines changed

js/botasaurus-desktop-api/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
],
4545
"scripts": {
4646
"build": "npm run clean && npm run build:node && npm run build:browser",
47-
"build-mv": "npm run build && mv -f ~/Documents/grow/botasaurus/js/botasaurus-desktop-api/dist/* ~/Documents/playground/my-project-name/node_modules/botasaurus-desktop-api/dist/",
47+
"build-mv-maps": "npm run build && mv -f ~/Documents/grow/botasaurus/js/botasaurus-desktop-api/dist/* ~/Documents/playground/my-project-name/node_modules/botasaurus-desktop-api/dist/",
4848
"build-dev": "nodemon --watch \"src/**/*.ts\" --exec \"npm run build-mv\"",
4949
"clean-install": "rm -rf dist/ node_modules/ package-lock.json yarn.lock .next/* && npm install",
5050
"prepublishOnly": "npm run build",

js/botasaurus-js/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@
131131
"scripts": {
132132
"dev": "tsc -w",
133133
"build": "rimraf dist tsconfig.tsbuildinfo && tsc && rimraf tsconfig.tsbuildinfo",
134-
"build-mv": "npm run build && mv -f ~/Documents/grow/botasaurus/js/botasaurus-js/dist/* ~/Documents/google-maps-extractor-desktop/node_modules/botasaurus/dist/",
134+
"build-mv-maps": "npm run build && mv -f ~/Documents/grow/botasaurus/js/botasaurus-js/dist/* ~/Documents/google-maps-extractor-desktop/node_modules/botasaurus/dist/",
135135
"build-mv-starter": "npm run build && mv -f ~/Documents/grow/botasaurus/js/botasaurus-js/dist/* ~/Documents/botasaurus-desktop-starter/node_modules/botasaurus-server/dist/",
136136
"clean": "rimraf dist tsconfig.tsbuildinfo",
137137
"clean-install": "rm -rf dist/ node_modules/ package-lock.json yarn.lock .next/* && npm install",

js/botasaurus-js/src/playwright.ts

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import {
3131
import { FormatType } from "./formats";
3232
import { createPlaywrightChrome, PlaywrightChrome } from "./page";
3333
import { getBotasaurusStorage } from "./botasaurus-storage"
34-
import { determineMaxLimit, drainQueue } from "./task"
34+
import { determineMaxLimit, drainQueue, getItemRepr, removeItemFromSeenItemsSet } from "./task"
3535

3636
type PlaywrightRunOptions<I = any> = {
3737
page: Page;
@@ -398,7 +398,7 @@ export function playwrightQueue<I = any>(
398398
const performPlaywright = () => {
399399
let seenItems = new Set();
400400
let lastPromise: Promise<any> = Promise.resolve();
401-
const state = { promises: [] as any[] };
401+
const state = { promises: [] as any[], draining: false };
402402
let sequential = "sequential" in options ? options.sequential : false;
403403

404404
// Create concurrency limiter for parallel mode
@@ -421,25 +421,7 @@ export function playwrightQueue<I = any>(
421421
let newItems = [];
422422

423423
for (let item of items) {
424-
let itemRepr;
425-
if (Array.isArray(item)) {
426-
itemRepr = JSON.stringify(item);
427-
} else if (item instanceof Set) {
428-
itemRepr = JSON.stringify(Array.from(item));
429-
} else if (
430-
typeof item === "number" ||
431-
typeof item === "string"
432-
) {
433-
itemRepr = item;
434-
} else if (
435-
item &&
436-
typeof item === "object" &&
437-
!Array.isArray(item)
438-
) {
439-
itemRepr = JSON.stringify(item);
440-
} else {
441-
itemRepr = JSON.stringify(item);
442-
}
424+
const itemRepr = getItemRepr(item);
443425

444426
if (!seenItems.has(itemRepr)) {
445427
newItems.push(item);
@@ -451,6 +433,8 @@ export function playwrightQueue<I = any>(
451433
}
452434

453435
const cleanup = () => {
436+
state.promises = [];
437+
state.draining = false;
454438
seenItems.clear();
455439
lastPromise = Promise.resolve();
456440
};
@@ -484,8 +468,11 @@ export function playwrightQueue<I = any>(
484468
}
485469
}
486470
},
487-
get: async function () {
488-
return drainQueue(state, cleanup, options, run.__name__);
471+
get: async function (n: number | null = null) {
472+
return drainQueue(state, cleanup, (items) => removeItemFromSeenItemsSet(items, seenItems), options, run.__name__, n);
473+
},
474+
isCompleted: function () {
475+
return state.promises.length === 0 && !state.draining;
489476
},
490477
close: async () => {
491478
await run.close();

js/botasaurus-js/src/task.ts

Lines changed: 96 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ function createTask<I>(options: TaskOptions<I>, is_async_queue: boolean) {
144144

145145
if (!closeOnCrash) {
146146
await prompt(
147-
"We've paused the browser to help you debug. Press 'Enter' to close."
147+
"We've paused the task to help you debug. Press 'Enter' to close."
148148
)
149149
}
150150
}
@@ -227,14 +227,37 @@ export function task<I=any>(options: TaskOptions<I>) {
227227
return createTask<I>(options, false)
228228
}
229229

230+
export function getItemRepr(item: any): string | number {
231+
if (Array.isArray(item)) {
232+
return JSON.stringify(item)
233+
} else if (item instanceof Set) {
234+
return JSON.stringify(Array.from(item))
235+
} else if (typeof item === 'number' || typeof item === 'string') {
236+
return item
237+
} else if (item && typeof item === 'object' && !Array.isArray(item)) {
238+
return JSON.stringify(item)
239+
} else {
240+
return JSON.stringify(item)
241+
}
242+
}
243+
244+
export function removeItemFromSeenItemsSet(items: any, seenItems: Set<any>) {
245+
if (!Array.isArray(items)) {
246+
items = [items]
247+
}
248+
for (const item of items) {
249+
const itemRepr = getItemRepr(item)
250+
seenItems.delete(itemRepr)
251+
}
252+
}
230253
export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean }) {
231254
// Extract parallel from options - it controls queue-level concurrency, not passed to createTask
232255
const { parallel: parallelOption, ...taskOptions } = options
233256
const run = createTask<I>(taskOptions as TaskOptions<I>, true)
234257
const performTask = () => {
235258
let seenItems = new Set()
236259
let lastPromise: Promise<any> = Promise.resolve()
237-
const state = { promises: [] as any[] }
260+
const state = { promises: [] as any[], draining: false }
238261
let sequential = 'sequential' in options ? options.sequential : false
239262

240263
// Create concurrency limiter for parallel mode
@@ -247,6 +270,8 @@ export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean
247270
limit = pLimit(maxLimit)
248271
}
249272

273+
274+
250275
function getUnique(items: any[]) {
251276
let singleItem = false
252277
if (!Array.isArray(items)) {
@@ -257,19 +282,7 @@ export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean
257282
let newItems = []
258283

259284
for (let item of items) {
260-
let itemRepr
261-
if (Array.isArray(item)) {
262-
itemRepr = JSON.stringify(item)
263-
} else if (item instanceof Set) {
264-
itemRepr = JSON.stringify(Array.from(item))
265-
} else if (typeof item === 'number' || typeof item === 'string') {
266-
itemRepr = item
267-
}
268-
else if (item && typeof item === 'object' && !Array.isArray(item)) {
269-
itemRepr = JSON.stringify(item)
270-
} else {
271-
itemRepr = JSON.stringify(item)
272-
}
285+
const itemRepr = getItemRepr(item)
273286

274287
if (!seenItems.has(itemRepr)) {
275288
newItems.push(item)
@@ -281,6 +294,8 @@ export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean
281294
}
282295

283296
const cleanup = () => {
297+
state.promises = []
298+
state.draining = false
284299
seenItems.clear()
285300
lastPromise = Promise.resolve()
286301
}
@@ -309,8 +324,11 @@ export function taskQueue<I=any>(options: TaskOptions<I>& { sequential?: boolean
309324
}
310325
}
311326
},
312-
get: async function () {
313-
return drainQueue(state, cleanup, options, run.__name__)
327+
get: async function (n: number | null = null) {
328+
return drainQueue(state, cleanup, (items) => removeItemFromSeenItemsSet(items,seenItems), options, run.__name__, n)
329+
},
330+
isCompleted: function () {
331+
return state.promises.length === 0 && !state.draining
314332
}
315333
}
316334
}
@@ -326,45 +344,83 @@ export function determineMaxLimit(parallelOption: number | ((data: any) => numbe
326344
}
327345

328346
export async function drainQueue(
329-
state: { promises: Promise<any>[] },
347+
state: { promises: Promise<any>[], draining: boolean },
330348
cleanup: () => void,
349+
removeItemFromSeenItemsSet: (item: any) => void,
331350
options: { output?: string | ((data: any, result: any) => void) | null, outputFormats?: FormatType[] | null },
332-
fnName: string
351+
fnName: string,
352+
n: number | null = null
333353
) {
354+
if (n !== null && n < 1) {
355+
throw new Error('n must be >= 1')
356+
}
357+
358+
if (!state.promises.length){
359+
return []
360+
}
361+
362+
334363
const result_list: any[] = []
335364
const orignal_data: any[] = []
336-
365+
// Only partial drain if n is specified AND less than current promises count
366+
const isPartialDrain = n !== null && n < state.promises.length
367+
368+
state.draining = true
337369
try {
338-
// Drain the queue - keep processing until no new promises are added
339-
while (state.promises.length > 0) {
340-
const currentPromises = state.promises
341-
state.promises = []
342-
343-
const results = await Promise.all(currentPromises)
344-
for (let index = 0; index < results.length; index++) {
345-
const { originalData, result } = results[index]
346-
if (Array.isArray(originalData)) {
347-
orignal_data.push(...originalData)
348-
} else {
349-
orignal_data.push(originalData)
370+
if (isPartialDrain) {
371+
// Partial drain - take first n promises
372+
let remaining = n!
373+
while (state.promises.length > 0 && remaining > 0) {
374+
const takeCount = Math.min(remaining, state.promises.length)
375+
const currentPromises = state.promises.slice(0, takeCount)
376+
state.promises = state.promises.slice(takeCount)
377+
remaining -= takeCount
378+
379+
const results = await Promise.all(currentPromises)
380+
for (let index = 0; index < results.length; index++) {
381+
const { originalData, result } = results[index]
382+
if (Array.isArray(originalData)) {
383+
orignal_data.push(...originalData)
384+
result_list.push(...result)
385+
} else {
386+
orignal_data.push(originalData)
387+
result_list.push(result)
388+
}
389+
removeItemFromSeenItemsSet(originalData)
350390
}
351-
352-
if (Array.isArray(originalData)) {
353-
result_list.push(...result)
354-
} else {
355-
result_list.push(result)
391+
}
392+
} else {
393+
// Full drain - keep processing until no new promises are added
394+
while (state.promises.length > 0) {
395+
const currentPromises = state.promises
396+
state.promises = [] // Reset before awaiting so new promises can be added
397+
const results = await Promise.all(currentPromises)
398+
for (let index = 0; index < results.length; index++) {
399+
const { originalData, result } = results[index]
400+
if (Array.isArray(originalData)) {
401+
orignal_data.push(...originalData)
402+
result_list.push(...result)
403+
} else {
404+
orignal_data.push(originalData)
405+
result_list.push(result)
406+
}
356407
}
357408
}
409+
cleanup()
358410
}
359411

360-
cleanup()
361412
const { output = 'default', outputFormats = null } = options
362413
const final = flatten(result_list)
363414
writeOutput(output, outputFormats, orignal_data, final, fnName)
415+
364416
return final
365417
} catch (error) {
366-
state.promises = []
367-
cleanup()
418+
419+
if (!isPartialDrain) {
420+
cleanup()
421+
}
368422
throw error
423+
} finally {
424+
state.draining = false
369425
}
370426
}

0 commit comments

Comments
 (0)