Skip to content

Commit e20dc7b

Browse files
committed
fixes
1 parent 69ef1a9 commit e20dc7b

File tree

14 files changed

+857
-109
lines changed

14 files changed

+857
-109
lines changed

botasaurus-controls/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"build": "rimraf dist && tsc && rimraf dist/tsconfig.tsbuildinfo && python3 fix.py",
2626
"prepublishOnly": "npm run build",
2727
"build-mv-starter": "npm run build && mv -f ~/Documents/grow/botasaurus/botasaurus-controls/dist/* ~/Documents/botasaurus-desktop-starter/node_modules/botasaurus-controls/dist/",
28+
"build-mv-maps": "npm run build && mv -f ~/Documents/grow/botasaurus/botasaurus-controls/dist/* ~/Documents/google-maps-extractor-desktop/node_modules/botasaurus-controls/dist/",
2829
"lint": "eslint src test",
2930
"clean-install": "rm -rf dist/ node_modules/ package-lock.json yarn.lock .next/* && npm install",
3031
"upload": "python3 increment_version.py && rm -rf dist/ && npm publish",

js/botasaurus-js/src/playwright.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,14 @@ import {
3131
import { FormatType } from "./formats";
3232
import { createPlaywrightChrome, PlaywrightChrome } from "./page";
3333
import { getBotasaurusStorage } from "./botasaurus-storage"
34-
import { determineMaxLimit, drainQueue, getItemRepr, removeItemFromSeenItemsSet } from "./task"
34+
import { determineMaxLimit, drainQueue, getItemRepr, removeItemFromSeenItemsSet, TaskRunOptions } from "./task"
3535

36-
type PlaywrightRunOptions<I = any> = {
36+
37+
38+
type PlaywrightRunOptions<I = any> = TaskRunOptions<I> & {
3739
page: Page;
3840
context: BrowserContext;
39-
data: I;
40-
metadata: any;
41+
4142
};
4243

4344
type PlaywrightOptions<I> = {
@@ -146,6 +147,10 @@ function createPlaywright<I>(
146147
: performPlaywright.__name__;
147148
// @ts-ignore
148149
const returnDontCacheAsIs = combined.returnDontCacheAsIs;
150+
// @ts-ignore
151+
const isAborted = combined.isAborted ?? (() => false);
152+
// @ts-ignore
153+
const pushData = combined.pushData ?? (async () => {}); // fallback is an async no-op so the result is always a Promise, matching TaskRunOptions.pushData (safe to await or chain .then/.catch)
149154
const fn_name = performPlaywright.__name__;
150155

151156
if (cache) {
@@ -192,7 +197,7 @@ function createPlaywright<I>(
192197
// ...
193198
}
194199

195-
result = await run({ data, metadata, ...driver });
200+
result = await run({ data, metadata, isAborted, pushData, ...driver });
196201
if (result === undefined) {
197202
result = null;
198203
}

js/botasaurus-js/src/task.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ import {
1717
import { flatten } from './list-utils'
1818
import { FormatType } from './formats'
1919

20-
type TaskRunOptions<I=any> = {
20+
export type TaskRunOptions<I=any> = {
2121
data: I
2222
metadata: any
23+
isAborted: () => boolean
24+
pushData: (data: Record<string, any> | Record<string, any>[]) => Promise<void>
2325
}
2426

2527
type TaskOptions<I> = {
@@ -55,6 +57,10 @@ function createTask<I>(options: TaskOptions<I>, is_async_queue: boolean) {
5557
performTask.__name__ = isNotEmptyString(name) ? name!.trim() : performTask.__name__
5658
// @ts-ignore
5759
const returnDontCacheAsIs = combined.returnDontCacheAsIs
60+
// @ts-ignore
61+
const isAborted = combined.isAborted ?? (() => false)
62+
// @ts-ignore
63+
const pushData = combined.pushData ?? (async () => {}) // fallback is an async no-op so the result is always a Promise, matching TaskRunOptions.pushData (safe to await or chain .then/.catch)
5864
const fn_name = performTask.__name__
5965

6066
if (cache) {
@@ -80,10 +86,9 @@ function createTask<I>(options: TaskOptions<I>, is_async_queue: boolean) {
8086
path = _getCachePath(fn_name, data)
8187
}
8288

83-
8489
let result: any
8590
try {
86-
result = await run({ data, metadata })
91+
result = await run({ data, metadata, isAborted, pushData })
8792
if (result === undefined) {
8893
result = null
8994
}

js/botasaurus-server-js/package-lock.json

Lines changed: 51 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

js/botasaurus-server-js/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@
220220
},
221221
"dependencies": {
222222
"@aws-sdk/client-s3": "^3.943.0",
223+
"@aws-sdk/lib-storage": "^3.943.0",
223224
"@seald-io/nedb": "latest",
224225
"async-mutex": "^0.5.0",
225226
"botasaurus": "latest",

js/botasaurus-server-js/src/api-config.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import { isDontCache } from "botasaurus/dontcache";
3535
import { JsonHTTPResponseWithMessage } from "./errors";
3636
import { cleanDataInPlace } from "botasaurus/output";
3737
import { db, removeDuplicatesByKey } from "./models";
38-
import { DEFAULT_TASK_TIMEOUT, MasterExecutor, TaskCompletionPayload, TaskFailurePayload } from "./master-executor";
38+
import { DEFAULT_TASK_TIMEOUT, MasterExecutor, TaskCompletionPayload, TaskFailurePayload, PushDataChunkPayload, PushDataCompletePayload } from "./master-executor";
3939
import { WorkerExecutor } from "./worker-executor";
4040

4141
/**
@@ -403,6 +403,33 @@ function registerMasterRoutes(app: FastifyInstance) {
403403
const executor = getExecutor() as MasterExecutor;
404404
return executor.handleWorkerShutdown(taskIds);
405405
});
406+
407+
// Check abortion status for multiple tasks (used by workers)
408+
app.post('/k8s/check-abortion-status', async (request: FastifyRequest<{ Body: { taskIds: number[] } }>, _) => {
409+
const taskIds = request.body?.taskIds || [];
410+
411+
const executor = getExecutor() as MasterExecutor;
412+
const results = await executor.getTasksAbortionResults(taskIds);
413+
return results;
414+
});
415+
416+
// PushData chunk endpoint - receives data chunks from workers
417+
app.post('/k8s/push-data-chunk', async (request: FastifyRequest<{ Body: PushDataChunkPayload }>, _) => {
418+
const payload = request.body;
419+
validateTaskIdInPayload(payload.taskId);
420+
421+
const executor = getExecutor() as MasterExecutor;
422+
return executor.handlePushDataChunk(payload);
423+
});
424+
425+
// PushData complete endpoint - finalizes task after all chunks sent
426+
app.post('/k8s/push-data-complete', async (request: FastifyRequest<{ Body: PushDataCompletePayload }>, _) => {
427+
const payload = request.body;
428+
validateTaskIdInPayload(payload.taskId);
429+
430+
const executor = getExecutor() as MasterExecutor;
431+
return executor.handlePushDataComplete(payload);
432+
});
406433
}
407434

408435
/**

js/botasaurus-server-js/src/master-executor.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Mutex } from 'async-mutex';
22
import { getPendingTasks, TaskExecutor, TaskPriority } from './task-executor';
33
import { db, Task, TaskStatus } from './models';
44
import { Server } from './server';
5+
import { TaskResults } from './task-results';
56
export const DEFAULT_TASK_TIMEOUT = 8 * 60 * 60
67
/**
78
* Payload for task completion from worker
@@ -34,6 +35,31 @@ export interface TaskFailurePayload {
3435
} | null;
3536
}
3637

38+
/**
39+
* Payload for pushData chunk from worker
40+
*/
41+
export interface PushDataChunkPayload {
42+
taskId: number;
43+
chunk: any[];
44+
}
45+
46+
/**
47+
* Payload for pushData completion from worker
48+
*/
49+
export interface PushDataCompletePayload {
50+
taskId: number;
51+
itemCount: number;
52+
isDontCache: boolean;
53+
scraperName: string;
54+
taskData: any;
55+
parentTaskId?: number | null;
56+
capacity?: {
57+
scraperType?: string;
58+
scraperName?: string;
59+
maxTasks: number;
60+
} | null;
61+
}
62+
3763

3864
/**
3965
* Stale task recovery interval: 60 seconds
@@ -233,4 +259,41 @@ export class MasterExecutor extends TaskExecutor {
233259
console.log(`[Master] Released ${releasedCount}/${inProgressTaskIds.length} tasks from shutting down worker`);
234260
return { releasedCount };
235261
}
262+
263+
/**
264+
* Handle pushData chunk from worker.
265+
* Appends chunk to the task's result file.
266+
*/
267+
async handlePushDataChunk(payload: PushDataChunkPayload) {
268+
const { taskId, chunk } = payload;
269+
270+
try {
271+
if (chunk && chunk.length > 0) {
272+
await TaskResults.appendAllTask(taskId, chunk);
273+
}
274+
return {};
275+
} catch (error) {
276+
console.error('[Master] Error handling pushData chunk:', error);
277+
throw error;
278+
}
279+
}
280+
281+
/**
282+
* Handle pushData completion from worker.
283+
* Finalizes the task (caching, status update, parent update).
284+
* Piggyback pattern: also return new tasks for worker's available capacity.
285+
*/
286+
async handlePushDataComplete(payload: PushDataCompletePayload): Promise<{ nextTasks: any[] }> {
287+
const { taskId, itemCount, isDontCache, scraperName, taskData, parentTaskId, capacity } = payload;
288+
const taskFilePath = TaskResults.generateTaskFilePath(taskId);
289+
290+
try {
291+
await this.reportTaskSuccessWithPushData(taskId, taskFilePath, itemCount, isDontCache, scraperName, taskData, parentTaskId as any, null as any)
292+
} catch (error) {
293+
console.error('[Master] Error handling pushData complete:', error);
294+
}
295+
296+
// Piggyback: Acquire next tasks if capacity is provided
297+
return this.acquireNextTasks(capacity);
298+
}
236299
}

js/botasaurus-server-js/src/ndjson.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ export async function appendNdJson(data: any[], taskPath: string) {
186186
return taskPath
187187
}
188188

189-
export async function readNdJsonCallback(taskPath: string, onData: (item: any, index: number) => undefined | false | Promise<undefined | false>, limit: number | null | undefined = null): Promise<{processedItems: number, hasExited: boolean}> {
189+
export async function readNdJsonCallback(taskPath: string, onData: (item: any, index: number) => void | false | Promise<void | false>, limit: number | null | undefined = null): Promise<{processedItems: number, hasExited: boolean}> {
190190
const fileStream = fs.createReadStream(taskPath, { encoding: 'utf-8' });
191191
let lineNumber = 0;
192192
let processedItems = 0;
@@ -204,13 +204,22 @@ export async function readNdJsonCallback(taskPath: string, onData: (item: any, i
204204
if (trimmedLine !== '') {
205205
try {
206206
const item = JSON.parse(trimmedLine);
207-
const result = await onData(item, processedItems);
207+
let result;
208+
try {
209+
result = await onData(item, processedItems);
210+
} catch (error: any) {
211+
error.isOnDataError = true;
212+
throw error;
213+
}
208214
processedItems++;
209215
if (result === false) {
210216
hasExited = true
211217
break;
212218
}
213-
} catch (error) {
219+
} catch (error: any) {
220+
if (error.isOnDataError) {
221+
throw error;
222+
}
214223
// Handle potential malformed JSON
215224
const splitLines = trimmedLine.split('}{');
216225

@@ -241,7 +250,7 @@ export async function readNdJsonCallback(taskPath: string, onData: (item: any, i
241250
}
242251

243252
// @ts-ignore
244-
isLimitReached = isNotNullish(limit) && processedItems >= limit
253+
isLimitReached = isNotNullish(limit) && processedItems >= limit
245254

246255
if (isLimitReached) {
247256
break;
@@ -279,7 +288,7 @@ function fixNdjsonFilename(filename: string): string {
279288
*/
280289
export async function readNdjson<T = any>(
281290
filename: string,
282-
onData: (item: T, index: number) => undefined | false,
291+
onData: (item: T, index: number) => void | false,
283292
limit?: number | null
284293
): Promise<number>{
285294
filename = fixNdjsonFilename(filename)

0 commit comments

Comments
 (0)