Skip to content

Commit 8204b9f

Browse files
committed
fixes
1 parent 0376df7 commit 8204b9f

File tree

4 files changed

+104
-75
lines changed

4 files changed

+104
-75
lines changed

js/botasaurus-server-js/src/api-config.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { DirectCallCacheService } from "./task-results";
3333
import { cleanBasePath, isNotEmptyObject, isObject } from "./utils";
3434
import { isDontCache } from "botasaurus/dontcache";
3535
import { JsonHTTPResponseWithMessage } from "./errors";
36-
import { cleanDataInPlace } from "botasaurus/output";
36+
import { cleanDataInPlace, normalizeData } from "botasaurus/output";
3737
import { db, removeDuplicatesByKey } from "./models";
3838
import { DEFAULT_TASK_TIMEOUT, MasterExecutor, TaskCompletionPayload, TaskFailurePayload, PushDataChunkPayload, PushDataCompletePayload } from "./master-executor";
3939
import { WorkerExecutor } from "./worker-executor";
@@ -119,6 +119,12 @@ function addScraperRoutes(app: FastifyInstance, apiBasePath: string) {
119119
: scraper.scraper_type;
120120

121121
const scrapingFunction = async (request: any, reply: any) => {
122+
// Track if request was aborted by client
123+
let aborted = false;
124+
const onAbort = () => { aborted = true; };
125+
request.raw.on('close', onAbort);
126+
const isAborted = () => aborted;
127+
122128
try {
123129
const params: Record<string, any> = {};
124130
for (const [key, value] of Object.entries(
@@ -171,6 +177,11 @@ function addScraperRoutes(app: FastifyInstance, apiBasePath: string) {
171177
try {
172178
// Execute function for each data item
173179
for (const dataItem of dataItems) {
180+
// Early exit if client disconnected
181+
if (aborted) {
182+
return [];
183+
}
184+
174185
let cacheKey: string | undefined;
175186
let isFromCache = false;
176187
let resultData: any = null;
@@ -200,15 +211,26 @@ function addScraperRoutes(app: FastifyInstance, apiBasePath: string) {
200211
if (!isFromCache) {
201212
// Wait for capacity if needed (only on first execution)
202213
if (!shouldDecrementCapacity) {
203-
while (!getExecutor().hasCapacity(key)) {
214+
while (!getExecutor().hasCapacity(key) && !aborted) {
204215
await sleep(0.1);
205216
}
217+
if (aborted) {
218+
return [];
219+
}
206220
getExecutor().incrementCapacity(key);
207221
shouldDecrementCapacity = true;
208222
}
223+
const xs:any = []
224+
function pushData(data: any) {
225+
const items = normalizeData(data)
226+
xs.push(...items)
227+
return Promise.resolve();
228+
}
209229

210230
const result = await fn(dataItem, {
211231
...mt,
232+
isAborted,
233+
pushData,
212234
parallel: null,
213235
cache: false,
214236
beep: false,
@@ -226,6 +248,8 @@ function addScraperRoutes(app: FastifyInstance, apiBasePath: string) {
226248
} else {
227249
resultData = result;
228250
}
251+
pushData(resultData)
252+
resultData = xs
229253
}
230254

231255
// Collect result
@@ -238,6 +262,7 @@ function addScraperRoutes(app: FastifyInstance, apiBasePath: string) {
238262
}
239263
} finally {
240264
restoreCapacity();
265+
request.raw.off('close', onAbort);
241266
}
242267

243268
// Determine if we should return first object

js/botasaurus-server-js/src/master-executor.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -220,15 +220,16 @@ export class MasterExecutor extends TaskExecutor {
220220
}
221221

222222
private async acquireNextTasks(capacity: { scraperType?: string; scraperName?: string; maxTasks: number|null } | null | undefined) {
223-
let nextTasks: any[] = []
224-
if (capacity) {
225-
if (capacity.scraperType) {
226-
nextTasks = await this.acquireTasksByScraperType(capacity.scraperType, capacity.maxTasks)
227-
} else if (capacity.scraperName) {
228-
nextTasks = await this.acquireTasksByScraperName(capacity.scraperName, capacity.maxTasks)
229-
}
223+
if (!capacity || capacity.maxTasks === 0) {
224+
return { nextTasks: [] };
225+
}
226+
if (capacity.scraperType) {
227+
return { nextTasks: await this.acquireTasksByScraperType(capacity.scraperType, capacity.maxTasks) };
228+
}
229+
if (capacity.scraperName) {
230+
return { nextTasks: await this.acquireTasksByScraperName(capacity.scraperName, capacity.maxTasks) };
230231
}
231-
return { nextTasks };
232+
return { nextTasks: [] };
232233
}
233234

234235
/**

js/botasaurus-server-js/src/task-executor.ts

Lines changed: 63 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -498,72 +498,77 @@ class TaskExecutor {
498498
const pushDataWriter = new PushDataWriter(taskId, removeDuplicatesBy, onResultCountUpdate)
499499
const pushData = pushDataWriter.push.bind(pushDataWriter)
500500

501-
let exceptionLog: any = null
501+
// Track whether task resources have been released to prevent double-release
502+
let released = false
503+
const releaseTask = () => {
504+
if (!released) {
505+
this.decrementCapacity(key)
506+
cleanup()
507+
released = true
508+
}
509+
}
510+
502511
try {
503-
let result: any = null
504-
try {
505-
result = await fn(taskData, {
506-
...metadata,
507-
isAborted,
508-
pushData,
509-
parallel: null,
510-
cache: false,
511-
beep: false,
512-
raiseException: true,
513-
closeOnCrash: true,
514-
output: null,
515-
createErrorLogs: false,
516-
returnDontCacheAsIs: true,
517-
})
518-
let isResultDontCached = false
519-
if (isDontCache(result)) {
520-
isResultDontCached = true
521-
result = result.data
522-
}
512+
const result = await fn(taskData, {
513+
...metadata,
514+
isAborted,
515+
pushData,
516+
parallel: null,
517+
cache: false,
518+
beep: false,
519+
raiseException: true,
520+
closeOnCrash: true,
521+
output: null,
522+
createErrorLogs: false,
523+
returnDontCacheAsIs: true,
524+
})
523525

526+
let isResultDontCached = false
527+
let processedResult = result
528+
if (isDontCache(result)) {
529+
isResultDontCached = true
530+
processedResult = result.data
531+
}
524532

525-
this.decrementCapacity(key)
526-
cleanup()
533+
releaseTask()
527534

528-
if (pushDataWriter.wasUsed()) {
529-
// Push any returned result as well
530-
await pushData(result)
531-
532-
// Close stream and perform normalization (returns final result count)
533-
await pushDataWriter.close()
534-
535-
// pushData was used - report success with the normalized result
536-
await this.reportTaskSuccessWithPushData(
537-
taskId,
538-
pushDataWriter.getFilePath(),
539-
pushDataWriter.getItemCount(),
540-
isResultDontCached,
541-
scraperName,
542-
taskData,
543-
parent_task_id,
544-
key
545-
)
546-
} else {
547-
// Normal flow - result was returned
548-
result = cleanDataInPlace(result)
549-
if (removeDuplicatesBy && Array.isArray(result)) {
550-
result = removeDuplicatesByKey(result, removeDuplicatesBy)
551-
}
552-
await this.reportTaskSuccess(taskId, result, isResultDontCached, scraperName, taskData, parent_task_id, key)
553-
}
535+
if (pushDataWriter.wasUsed()) {
536+
// Push any returned result as well
537+
await pushData(processedResult)
554538

555-
} catch (error) {
539+
// Close stream and perform normalization (returns final result count)
556540
await pushDataWriter.close()
557-
cleanup()
558-
this.decrementCapacity(key)
559-
exceptionLog = formatExc(error)
560-
console.error(error)
561-
await this.reportTaskFailure(taskId, exceptionLog, parent_task_id, key)
562-
}
541+
542+
// pushData was used - report success with the normalized result
543+
await this.reportTaskSuccessWithPushData(
544+
taskId,
545+
pushDataWriter.getFilePath(),
546+
pushDataWriter.getItemCount(),
547+
isResultDontCached,
548+
scraperName,
549+
taskData,
550+
parent_task_id,
551+
key
552+
)
553+
} else {
554+
// Normal flow - result was returned
555+
processedResult = cleanDataInPlace(processedResult)
556+
if (removeDuplicatesBy && Array.isArray(processedResult)) {
557+
processedResult = removeDuplicatesByKey(processedResult, removeDuplicatesBy)
558+
}
559+
await this.reportTaskSuccess(taskId, processedResult, isResultDontCached, scraperName, taskData, parent_task_id, key)
560+
}
563561
} catch (error) {
562+
// Release task resources FIRST to prevent capacity leak
563+
releaseTask()
564564
await pushDataWriter.close()
565-
cleanup()
566-
console.error("Error in run_task", error)
565+
const exceptionLog = formatExc(error)
566+
console.error(error)
567+
try {
568+
await this.reportTaskFailure(taskId, exceptionLog, parent_task_id, key)
569+
} catch (reportError) {
570+
console.error("Error reporting task failure:", reportError)
571+
}
567572
}
568573
}
569574

js/botasaurus-server-js/src/worker-executor.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ export class WorkerExecutor extends TaskExecutor {
132132
* Worker loop for scraper-type-based rate limiting.
133133
* Polls master for tasks by scraper type.
134134
*/
135-
private async startScraperTypeBasedWorker(): Promise<void> {
135+
private startScraperTypeBasedWorker(): void {
136136
const keys: string[] = [];
137137

138138
if (Server.getBrowserScrapers().length > 0) {
@@ -152,7 +152,7 @@ export class WorkerExecutor extends TaskExecutor {
152152
* Worker loop for scraper-name-based rate limiting.
153153
* Polls master for tasks by scraper name.
154154
*/
155-
private async startScraperNameBasedWorker(): Promise<void> {
155+
private startScraperNameBasedWorker(): void {
156156
this.runWorkerLoop(Server.getScrapersNames());
157157
}
158158

@@ -425,23 +425,21 @@ export class WorkerExecutor extends TaskExecutor {
425425

426426

427427

428-
private runNextTasks(nextTasks: any, key: string) {
429-
428+
private async runNextTasks(nextTasks: any, key: string): Promise<void> {
430429
if (!nextTasks || nextTasks.length === 0) {
431430
return;
432431
}
433432

434433
if (this.isShuttingDown) {
435434
console.log(`[Worker] Shutting down, skipping next tasks`);
436-
return this.releaseTasksToPending(nextTasks.map((task:any) => task.id));
437-
435+
await this.releaseTasksToPending(nextTasks.map((task:any) => task.id));
436+
return;
438437
}
439438
this.consecutiveEmptyPolls = 0
440439
for (const task of nextTasks) {
441440
this.inProgressTaskIds.add(task.id);
442441
this.runTaskAndUpdateCapacity(key, task)
443442
}
444-
return;
445443
}
446444

447445
/**

0 commit comments

Comments (0)