Skip to content

Commit ebbf936

Browse files
committed
fixes
1 parent 6df72ca commit ebbf936

File tree

7 files changed

+228
-28
lines changed

7 files changed

+228
-28
lines changed

js/botasaurus-js/src/output.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ function normalizeData(data: any): any[] {
310310
for (const item of data) {
311311
if (item === null || item === undefined) {
312312
continue
313-
} else if (typeof item !== 'object') {
313+
} else if (typeof item !== 'object' || Array.isArray(item)) {
314314
normalizedList.push({ data: item })
315315
} else {
316316
normalizedList.push(item)

js/botasaurus-server-js/src/api-config.ts

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { cleanDataInPlace } from "botasaurus/output";
3737
import { db, removeDuplicatesByKey } from "./models";
3838
import { DEFAULT_TASK_TIMEOUT, MasterExecutor, TaskCompletionPayload, TaskFailurePayload, PushDataChunkPayload, PushDataCompletePayload } from "./master-executor";
3939
import { WorkerExecutor } from "./worker-executor";
40+
import { isMaster, isWorker } from "./env"
4041

4142
/**
4243
* Node role in K8s deployment
@@ -58,15 +59,6 @@ function isValidUrl(urlString: string): boolean {
5859
/**
5960
* Parse command line arguments for --master or --worker flags
6061
*/
61-
function isMaster() {
62-
const args = process.argv;
63-
return args.includes('--master')
64-
}
65-
66-
function isWorker() {
67-
const args = process.argv;
68-
return args.includes('--worker')
69-
}
7062

7163
/**
7264
* Check master endpoint health, retrying up to 3 times.
@@ -808,7 +800,7 @@ class ApiConfig {
808800

809801
// Parse command line flags
810802

811-
if (isMaster()) {
803+
if (isMaster) {
812804
// Validate visibility timeout against scrapers if it's an object
813805
if (typeof taskTimeout === 'object') {
814806
Server.validateAgainstLimit(taskTimeout, 'task timeout');
@@ -820,6 +812,8 @@ class ApiConfig {
820812
this.nodeRole = 'master';
821813
console.log("[K8s] Starting as Kubernetes MASTER node 👑");
822814

815+
// @ts-ignore
816+
global.master = true
823817
// Test self-connectivity (master calls itself)
824818
// @ts-ignore
825819
global.checkMasterHealth = () =>{
@@ -832,7 +826,7 @@ class ApiConfig {
832826
}
833827
}, 2000); // Delay to allow server to start
834828
}
835-
} else if (isWorker()) {
829+
} else if (isWorker) {
836830
// Set up worker executor
837831
// @ts-ignore
838832
global.executor = new WorkerExecutor(masterEndpoint);

js/botasaurus-server-js/src/env.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
const isInKubernetes: boolean = 'KUBERNETES_SERVICE_HOST' in process.env;
22

3-
const isWorker: boolean = isInKubernetes && process.env.NODE_TYPE === 'WORKER';
4-
const isMaster: boolean = isInKubernetes && process.env.NODE_TYPE === 'MASTER';
3+
function _isMaster() {
4+
const args = process.argv;
5+
return args.includes('--master')
6+
}
7+
8+
function _isWorker() {
9+
const args = process.argv;
10+
return args.includes('--worker')
11+
}
12+
13+
const isWorker: boolean = _isWorker();
14+
const isMaster: boolean = _isMaster();
515
const isDev: boolean =
616
process.env.NODE_ENV === 'development' || process.env.DEBUG_PROD === 'true';
717

js/botasaurus-server-js/src/models.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ function isoformat(obj: Date | null | undefined): string | null {
136136
export function createTaskName(taskName:any, taskId:any) {
137137
return taskName !== null ? taskName : `Task ${taskId}`;
138138
}
139-
function serializeUiOutputTask(obj: Task, _: any): any {
139+
function serializeUiOutputTask(obj: Task, _: any) {
140140
const taskId = obj.id;
141141

142142
return {
@@ -147,6 +147,7 @@ function serializeUiOutputTask(obj: Task, _: any): any {
147147
is_all_task: obj.is_all_task,
148148
started_at: isoformat(obj.started_at),
149149
finished_at: isoformat(obj.finished_at),
150+
parent_task_id: obj.parent_task_id,
150151
};
151152
}
152153

js/botasaurus-server-js/src/ndjson.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ export async function readNdJsonCallback(taskPath: string, onData: (item: any, i
204204
if (trimmedLine !== '') {
205205
try {
206206
const item = JSON.parse(trimmedLine);
207+
207208
let result;
208209
try {
209210
result = await onData(item, processedItems);

js/botasaurus-server-js/src/routes-db-logic.ts

Lines changed: 181 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { downloadResults, downloadResultsHttp } from './download'
2424
import { getBotasaurusStorage } from 'botasaurus/botasaurus-storage'
2525
import { Launcher } from 'chrome-launcher/dist/chrome-launcher';
2626
import { electronConfig } from './paths'
27+
import { isMaster } from './env'
2728

2829

2930

@@ -33,14 +34,17 @@ async function performCreateAllTask(
3334
priority: number,
3435
scraper_name: string,
3536
scraper_type: string,
36-
all_task_sort_id: number,
37-
withResult: boolean = true
37+
all_task_sort_id: number,
38+
withResult: boolean = true,
39+
splitted_task_length: number = 0
3840
): Promise<[any, string]> {
41+
const task_name = splitted_task_length ? `All Task (${splitted_task_length} Tasks)` : 'All Task'
42+
3943
const allTask = new Task({
4044
id: await getAutoincrementId(),
4145
status: TaskStatus.PENDING,
4246
sort_id: all_task_sort_id,
43-
task_name: 'All Task',
47+
task_name: task_name,
4448
scraper_name,
4549
scraper_type,
4650
is_all_task: true,
@@ -275,7 +279,7 @@ async function performPatchTask(action: string, taskId: number): Promise<void> {
275279
if (action === 'delete') {
276280
await deleteTask(taskId, is_all_task, parent_task_id, removeDuplicatesBy);
277281
} else if (action === 'abort') {
278-
await abortTask(taskId, is_all_task, parent_task_id, removeDuplicatesBy);
282+
await abortTask(taskId, is_all_task, parent_task_id, removeDuplicatesBy, status);
279283
} else if (action === 'retry') {
280284
await retryTask(taskId, is_all_task, parent_task_id, status);
281285
}
@@ -362,11 +366,13 @@ async function createTasks(
362366
let all_task_id:any = null;
363367

364368
let tasksData: any[];
369+
let splitted_task_length = 0
365370
if (split_task) {
366371
tasksData = await split_task(deepCloneDict(data));
367372
if (tasksData.length === 0) {
368373
return [[], [], split_task];
369374
}
375+
splitted_task_length = tasksData.length;
370376
} else {
371377
tasksData = [data];
372378
}
@@ -380,6 +386,7 @@ async function createTasks(
380386
scraper_type,
381387
all_task_sort_id,
382388
withResult,
389+
splitted_task_length
383390
);
384391
const new_id = all_task_sort_id -1
385392
// make all task comes at the top
@@ -1028,8 +1035,15 @@ function convertUnicodeDictToAsciiDictInPlace(inputList: any[]): any[] {
10281035
taskId: number,
10291036
is_all_task: boolean,
10301037
parentId: number | null,
1031-
removeDuplicatesBy: any
1038+
removeDuplicatesBy: any,
1039+
status: string
10321040
): Promise<void> {
1041+
// Only abort tasks that are in PENDING or IN_PROGRESS status
1042+
const abortableStatuses: string[] = [TaskStatus.PENDING, TaskStatus.IN_PROGRESS];
1043+
if (!abortableStatuses.includes(status)) {
1044+
return;
1045+
}
1046+
10331047
let fn: (() => Promise<void>) | null = null;
10341048

10351049
if (is_all_task) {
@@ -1178,6 +1192,7 @@ function convertUnicodeDictToAsciiDictInPlace(inputList: any[]): any[] {
11781192
'is_all_task',
11791193
'finished_at',
11801194
'started_at',
1195+
'parent_task_id',
11811196
];
11821197

11831198
async function executeGetUiTasks(page: number): Promise<any> {
@@ -1193,7 +1208,165 @@ async function executeGetUiTasks(page: number): Promise<any> {
11931208
page = 1
11941209
}
11951210

1196-
return queryTasks(outputUiTasksEts, false, page, 100, serializeUiOutputTask);
1211+
const results = await queryTasks(outputUiTasksEts, false, page, 100, serializeUiOutputTask as any)
1212+
return await enrichWithEta(results);
1213+
}
1214+
1215+
// Helper: Get average completion time (in seconds) for completed children of a parent
1216+
async function getAverageCompletionTimeForParent(parentTaskId: number): Promise<number | null> {
1217+
const completedChildren = await db.findAsync(
1218+
{
1219+
parent_task_id: parentTaskId,
1220+
status: TaskStatus.COMPLETED,
1221+
started_at: { $exists: true, $ne: null },
1222+
finished_at: { $exists: true, $ne: null },
1223+
},
1224+
{ started_at: 1, finished_at: 1 }
1225+
) as any[];
1226+
1227+
if (!completedChildren || completedChildren.length === 0) {
1228+
return null;
1229+
}
1230+
1231+
let totalTime = 0;
1232+
for (const child of completedChildren) {
1233+
const startedAt = new Date(child.started_at).getTime();
1234+
const finishedAt = new Date(child.finished_at).getTime();
1235+
totalTime += (finishedAt - startedAt) / 1000; // Convert to seconds
1236+
}
1237+
1238+
return totalTime / completedChildren.length;
1239+
}
1240+
1241+
// Helper: Get count of remaining (pending + in_progress) children for a parent
1242+
async function getRemainingChildrenCount(parentTaskId: number): Promise<number> {
1243+
return db.countAsync({
1244+
parent_task_id: parentTaskId,
1245+
status: { $in: [TaskStatus.PENDING, TaskStatus.IN_PROGRESS] },
1246+
}) as unknown as Promise<number>;
1247+
}
1248+
1249+
interface EnrichedTask {
1250+
id: number;
1251+
status: string;
1252+
task_name: string;
1253+
result_count: number;
1254+
is_all_task: boolean;
1255+
started_at: string | null;
1256+
finished_at: string | null;
1257+
parent_task_id: number | null;
1258+
eta: number | null;
1259+
eta_text: string | null;
1260+
}
1261+
1262+
async function enrichWithEta(paginatedResult: any): Promise<any> {
1263+
const tasks: ReturnType<typeof serializeUiOutputTask>[] = paginatedResult.results;
1264+
1265+
if (!tasks || tasks.length === 0) {
1266+
return paginatedResult;
1267+
}
1268+
1269+
// Identify tasks needing ETA (pending or in_progress)
1270+
const tasksNeedingEta = tasks.filter(
1271+
(t: any) => t.status === TaskStatus.PENDING || t.status === TaskStatus.IN_PROGRESS
1272+
);
1273+
1274+
// If no tasks need ETA, return all tasks with eta: null, eta_text: null
1275+
if (tasksNeedingEta.length === 0) {
1276+
const enrichedResults = tasks.map((t: any) => ({
1277+
...t,
1278+
eta: null,
1279+
eta_text: null,
1280+
}));
1281+
return { ...paginatedResult, results: enrichedResults };
1282+
}
1283+
1284+
// Separate by type
1285+
const allTasks = tasksNeedingEta.filter((t: any) => t.is_all_task === true);
1286+
const individualTasks = tasksNeedingEta.filter((t: any) => t.is_all_task === false);
1287+
const tasksWithParent = individualTasks.filter((t: any) => t.parent_task_id != null);
1288+
1289+
// Collect ALL unique parent IDs upfront (from all tasks + individual tasks with parents)
1290+
const allParentIds = new Set<number>();
1291+
for (const t of allTasks) {
1292+
allParentIds.add(t.id);
1293+
}
1294+
for (const t of tasksWithParent) {
1295+
allParentIds.add(t.parent_task_id as number);
1296+
}
1297+
1298+
// Fetch average times + remaining counts for ALL parents in parallel (single batch)
1299+
const parentDataMap: Record<number, { avgTime: number | null; remainingCount: number }> = {};
1300+
1301+
await Promise.all(
1302+
Array.from(allParentIds).map(async (parentId) => {
1303+
const [remainingCount, avgTime] = await Promise.all([
1304+
getRemainingChildrenCount(parentId),
1305+
getAverageCompletionTimeForParent(parentId),
1306+
]);
1307+
parentDataMap[parentId] = { avgTime, remainingCount };
1308+
})
1309+
);
1310+
1311+
// Map to store computed ETA data by task id
1312+
const etaMap: Record<number, { eta: number | null; eta_text: string | null }> = {};
1313+
1314+
// ============ PROCESS ALL TASKS (sync - data already fetched) ============
1315+
for (const allTask of allTasks) {
1316+
const { avgTime, remainingCount } = parentDataMap[allTask.id];
1317+
1318+
let eta: number | null = null;
1319+
let eta_text: string | null = null;
1320+
1321+
if (remainingCount === 0) {
1322+
eta = null;
1323+
eta_text = null;
1324+
} else if (isMaster) {
1325+
// Master node doesn't execute tasks, so no ETA calculation
1326+
eta = null;
1327+
eta_text = `(${remainingCount} tasks remaining)`;
1328+
} else if (avgTime !== null) {
1329+
eta = Math.round(remainingCount * avgTime);
1330+
eta_text = `(${remainingCount} tasks remaining)`;
1331+
} else {
1332+
eta = null;
1333+
eta_text = `(${remainingCount} tasks remaining)`;
1334+
}
1335+
1336+
etaMap[allTask.id] = { eta, eta_text };
1337+
}
1338+
1339+
// ============ PROCESS INDIVIDUAL TASKS (sync - data already fetched) ============
1340+
for (const task of tasksWithParent) {
1341+
const taskId = task.id;
1342+
const parentTaskId = task.parent_task_id as number;
1343+
const { avgTime } = parentDataMap[parentTaskId];
1344+
1345+
if (avgTime === null) {
1346+
etaMap[taskId] = { eta: null, eta_text: null };
1347+
} else if (task.status === TaskStatus.PENDING) {
1348+
etaMap[taskId] = { eta: Math.round(avgTime), eta_text: null };
1349+
} else {
1350+
// IN_PROGRESS
1351+
const startedAt = task.started_at ? new Date(task.started_at).getTime() : null;
1352+
if (startedAt === null) {
1353+
etaMap[taskId] = { eta: Math.round(avgTime), eta_text: null };
1354+
} else {
1355+
const elapsed = (Date.now() - startedAt) / 1000;
1356+
etaMap[taskId] = elapsed > avgTime
1357+
? { eta: null, eta_text: null }
1358+
: { eta: Math.round(avgTime), eta_text: null };
1359+
}
1360+
}
1361+
}
1362+
1363+
// ============ BUILD FINAL RESULT ============
1364+
const enrichedResults: EnrichedTask[] = tasks.map((task: any) => ({
1365+
...task,
1366+
...(etaMap[task.id] || { eta: null, eta_text: null }),
1367+
}));
1368+
1369+
return { ...paginatedResult, results: enrichedResults };
11971370
}
11981371

11991372
async function executePatchTask(page: number, jsonData: any): Promise<any> {
@@ -1211,7 +1384,8 @@ async function executePatchTask(page: number, jsonData: any): Promise<any> {
12111384
await performPatchTask(action, taskId);
12121385
}
12131386

1214-
return queryTasks(outputUiTasksEts, false, page, 100, serializeUiOutputTask);
1387+
const results = await queryTasks(outputUiTasksEts, false, page, 100, serializeUiOutputTask as any);
1388+
return await enrichWithEta(results);
12151389
}
12161390

12171391

0 commit comments

Comments
 (0)