diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index 6087337411..51ad9c33d9 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -4,7 +4,15 @@
 
 # pylint: disable=too-many-lines
 
-from typing import List, Optional, TYPE_CHECKING, cast, Dict, Tuple, Annotated, Union
+from typing import (
+    List,
+    Optional,
+    TYPE_CHECKING,
+    cast,
+    Dict,
+    Tuple,
+    Annotated,
+)
 
 import asyncio
 import json
@@ -17,10 +25,14 @@
 import aiohttp
 from fastapi import APIRouter, Depends, HTTPException, Query
+from fastapi.responses import StreamingResponse
 import pymongo
 
 from .pagination import DEFAULT_PAGE_SIZE, paginated_format
 from .models import (
+    BatchCrawlRunOut,
+    BatchFilter,
+    BatchTotalOut,
     CrawlConfigIn,
     ConfigRevision,
     CrawlConfig,
@@ -687,8 +699,8 @@ async def update_usernames(self, userid: UUID, updated_name: str) -> None:
     async def get_crawl_configs(
         self,
         org: Organization,
-        page_size: int = DEFAULT_PAGE_SIZE,
-        page: int = 1,
+        page_size: int | None = DEFAULT_PAGE_SIZE,
+        page: int | None = 1,
         created_by: Optional[UUID] = None,
         modified_by: Optional[UUID] = None,
         profile_ids: Optional[List[UUID]] = None,
@@ -705,10 +717,12 @@ async def get_crawl_configs(
         """Get all crawl configs for an organization is a member of"""
         # pylint: disable=too-many-locals,too-many-branches,too-many-statements
         # Zero-index page for query
-        page = page - 1
-        skip = page * page_size
+        page = page - 1 if page is not None else 0
+        skip = page * page_size if page_size is not None else 0
 
-        match_query = {"oid": org.id, "inactive": {"$ne": True}}
+        match_query: dict[
+            str, object | str | int | list[dict[str, object | str | int]]
+        ] = {"oid": org.id, "inactive": {"$ne": True}}
 
         if tags:
             query_type = "$all" if tag_match == ListFilterType.AND else "$in"
@@ -739,7 +753,7 @@ async def get_crawl_configs(
             match_query["isCrawlRunning"] = is_crawl_running
 
         # pylint: disable=duplicate-code
-        aggregate: List[Dict[str, Union[object, str, int]]] = [
+        aggregate: list[dict[str, object | str | int]] = [
            {"$match": match_query},
            {"$unset": ["config"]},
        ]
@@ -770,14 +784,15 @@ async def get_crawl_configs(
 
         aggregate.extend([{"$sort": sort_query}])
 
+        items_stages = [{"$skip": skip}]
+        if page_size is not None:
+            items_stages.append({"$limit": page_size})
+
         aggregate.extend(
             [
                 {
                     "$facet": {
-                        "items": [
-                            {"$skip": skip},
-                            {"$limit": page_size},
-                        ],
+                        "items": items_stages,
                         "total": [{"$count": "count"}],
                     }
                 },
@@ -804,6 +819,64 @@ async def get_crawl_configs(
 
         return configs, total
 
+    async def run_crawls_by_filters(
+        self,
+        org: Organization,
+        user: User,
+        created_by: UUID | None = None,
+        modified_by: UUID | None = None,
+        profile_ids: list[UUID] | None = None,
+        first_seed: str | None = None,
+        name: str | None = None,
+        description: str | None = None,
+        tags: list[str] | None = None,
+        tag_match: ListFilterType | None = ListFilterType.AND,
+        schedule: bool | None = None,
+        is_crawl_running: bool | None = None,
+    ):
+        """Run all workflows matching the given filters, streaming progress as JSONL"""
+        crawl_configs, total = await self.get_crawl_configs(
+            org,
+            created_by=created_by,
+            modified_by=modified_by,
+            profile_ids=profile_ids,
+            first_seed=first_seed,
+            name=name,
+            description=description,
+            tags=tags,
+            tag_match=tag_match,
+            schedule=schedule,
+            is_crawl_running=is_crawl_running,
+            page_size=None,
+        )
+        yield BatchTotalOut(total=total).model_dump_json(indent=None) + "\n"
+        print(
+            f"Got {total} crawl configs with filter params: {created_by}, "
+            f"{modified_by}, {profile_ids}, {first_seed}, {name}, {description}, "
+            f"{tags}, {tag_match}, {schedule}, {is_crawl_running}"
+        )
+
+        async def run_crawl_with_metadata(config: CrawlConfigOut):
+            """Helper that runs crawl and returns metadata with result"""
+            try:
+                crawl_id = await self.run_now(config.id, org, user)
+                return BatchCrawlRunOut(crawl_id=crawl_id, success=True)
+            # pylint: disable=broad-exception-caught
+            except Exception as e:
+                print(f"Error running crawl for config {config.id}: {e}")
+                return BatchCrawlRunOut(
+                    crawl_id=str(config.id), success=False, error=str(e)
+                )
+
+        async with asyncio.TaskGroup() as tg:
+            tasks = [
+                tg.create_task(run_crawl_with_metadata(config))
+                for config in crawl_configs
+            ]
+
+            completed = 0
+            for task in asyncio.as_completed(tasks):
+                result = await task
+                completed += 1
+                result.position = completed
+                yield result.model_dump_json(indent=None) + "\n"
+
     async def is_profile_in_use(self, profileid: UUID, org: Organization) -> bool:
         """return true/false if any active workflows exist with given profile"""
         res = await self.crawl_configs.find_one(
@@ -1674,6 +1747,41 @@ async def validate_custom_behavior(
     ):
         return await ops.validate_custom_behavior(behavior.customBehavior)
 
+    # GROUP ACTIONS
+
+    @router.post("/batch/run")
+    async def run_multiple(
+        filter: BatchFilter,
+        org: Organization = Depends(org_crawl_dep),
+        user: User = Depends(user_dep),
+    ):
+        if filter.first_seed:
+            filter.first_seed = urllib.parse.unquote(filter.first_seed)
+
+        if filter.name:
+            filter.name = urllib.parse.unquote(filter.name)
+
+        if filter.description:
+            filter.description = urllib.parse.unquote(filter.description)
+
+        return StreamingResponse(
+            ops.run_crawls_by_filters(
+                org=org,
+                user=user,
+                created_by=filter.created_by,
+                modified_by=filter.modified_by,
+                profile_ids=filter.profile_ids,
+                first_seed=filter.first_seed,
+                name=filter.name,
+                description=filter.description,
+                tags=filter.tags,
+                tag_match=filter.tag_match,
+                schedule=filter.schedule,
+                is_crawl_running=filter.is_crawl_running,
+            ),
+            media_type="application/jsonl",
+        )
+
     org_ops.router.include_router(router)
 
     return ops
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index 3c511f601b..09ed1443cd 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -3096,3 +3096,54 @@ class ListFilterType(str, Enum):
 
     OR = "or"
     AND = "and"
+
+
+# BATCH ACTIONS
+# ============================================================================
+
+
+class BatchFilter(BaseModel):
+    """Base model for batch filters"""
+
+    created_by: Annotated[
+        UUID | None, Field(alias="createdBy", title="Created By User ID")
+    ] = None
+    modified_by: Annotated[
+        UUID | None, Field(alias="modifiedBy", title="Modified By User ID")
+    ] = None
+    profile_ids: Annotated[
+        list[UUID] | None, Field(alias="profileIds", title="Profile IDs")
+    ] = None
+    first_seed: Annotated[str | None, Field(alias="firstSeed", title="First Seed")] = (
+        None
+    )
+    name: str | None = None
+    description: str | None = None
+    tags: list[str] | None = None
+    tag_match: Annotated[
+        ListFilterType | None,
+        Field(
+            alias="tagMatch",
+            title="Tag Match Type",
+            description='Defaults to `"and"` if omitted',
+        ),
+    ] = ListFilterType.AND
+    schedule: bool | None = None
+    is_crawl_running: Annotated[
+        bool | None, Field(alias="isCrawlRunning", title="Is Crawl Running")
+    ] = None
+
+
+class BatchTotalOut(BaseModel):
+    """Response model for batch total"""
+
+    total: int
+
+
+class BatchCrawlRunOut(BaseModel):
+    """Response model for crawl runs"""
+
+    error: str | None = None
+    crawl_id: str
+    success: bool
+    position: int | None = None
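Note on the wire format: `run_crawls_by_filters` yields one `BatchTotalOut` line up front, then one `BatchCrawlRunOut` line per workflow as each `run_now` call settles, served as `application/jsonl`. Neither output model declares field aliases, so keys serialize in snake_case. On failure, `crawl_id` falls back to the workflow's config UUID (`str(config.id)`), and `position` reflects completion order, not submission order. A batch of three workflows might stream like this (all IDs illustrative, not taken from this diff):

{"total": 3}
{"error": null, "crawl_id": "manual-crawl-a", "success": true, "position": 1}
{"error": "crawl already running", "crawl_id": "4c0195d1-0000-0000-0000-000000000000", "success": false, "position": 2}
{"error": null, "crawl_id": "manual-crawl-b", "success": true, "position": 3}
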
diff --git a/frontend/src/controllers/api.ts b/frontend/src/controllers/api.ts
index ab50bbc6a8..a8b10d4b4b 100644
--- a/frontend/src/controllers/api.ts
+++ b/frontend/src/controllers/api.ts
@@ -20,6 +20,30 @@ export enum AbortReason {
   RequestTimeout = "request-timeout",
 }
 
+function splitStream(splitOn: string) {
+  let buffer = "";
+
+  return new TransformStream<string, string>({
+    transform(chunk, controller) {
+      buffer += chunk;
+      const parts = buffer.split(splitOn);
+      parts.slice(0, -1).forEach((part) => controller.enqueue(part));
+      buffer = parts[parts.length - 1];
+    },
+    flush(controller) {
+      if (buffer) controller.enqueue(buffer);
+    },
+  });
+}
+
+function parseJSON<T>() {
+  return new TransformStream<string, T>({
+    transform(chunk, controller) {
+      controller.enqueue(JSON.parse(chunk) as T);
+    },
+  });
+}
+
 /**
  * Utilities for interacting with the Browsertrix backend API
  *
@@ -261,4 +285,28 @@ export class APIController implements ReactiveController {
 
     this.onUploadProgress.cancel();
   }
+
+  async fetchStream<
+    T = unknown,
+    Body extends RequestInit["body"] | object = undefined,
+  >(path: string, options?: Omit<RequestInit, "body"> & { body?: Body }) {
+    const mergedOptions: RequestInit | undefined = options as
+      | RequestInit
+      | undefined;
+    if (options?.body) {
+      mergedOptions!.body = JSON.stringify(options.body);
+    }
+    const response = await fetch(path, mergedOptions);
+    if (!response.ok) {
+      throw new APIError({
+        message: response.statusText,
+        status: response.status,
+      });
+    }
+    const body = response.body;
+    if (!body) {
+      throw new Error("Response body is not readable");
+    }
+    // Decode bytes to text before splitting on newlines and parsing JSONL
+    return body
+      .pipeThrough(new TextDecoderStream())
+      .pipeThrough(splitStream("\n"))
+      .pipeThrough(parseJSON<T>());
+  }
 }
diff --git a/frontend/src/pages/org/workflows-list.ts b/frontend/src/pages/org/workflows-list.ts
index 24eb56fdee..8995715df7 100644
--- a/frontend/src/pages/org/workflows-list.ts
+++ b/frontend/src/pages/org/workflows-list.ts
@@ -53,6 +53,13 @@ type Sort = {
   direction: SortDirection;
 };
 
+type FilterBy = {
+  [K in keyof ListWorkflow]: ListWorkflow[K] extends string ? string : boolean;
+};
+type BoolFilterKeys = {
+  [K in keyof FilterBy]: FilterBy[K] extends boolean ? K : never;
+}[keyof FilterBy];
+
 const FILTER_BY_CURRENT_USER_STORAGE_KEY =
   "btrix.filterByCurrentUser.crawlConfigs";
 const INITIAL_PAGE_SIZE = 10;
@@ -128,7 +135,7 @@ export class WorkflowsList extends BtrixElement {
   private orderBy: Sort = DEFAULT_SORT;
 
   @state()
-  private filterBy: Partial<{ [k in keyof ListWorkflow]: boolean }> = {};
+  private filterBy: Partial<FilterBy> = {};
 
   @state()
   private filterByCurrentUser = false;
@@ -236,8 +243,8 @@ export class WorkflowsList extends BtrixElement {
 
         // Convert string bools to filter values
         if (value === "true") {
-          filterBy[key as keyof typeof filterBy] = true;
+          filterBy[key as BoolFilterKeys] = true;
         } else if (value === "false") {
-          filterBy[key as keyof typeof filterBy] = false;
+          filterBy[key as BoolFilterKeys] = false;
         } else {
@@ -1158,4 +1165,44 @@ export class WorkflowsList extends BtrixElement {
     );
     return data;
   }
+
+  private async runBatchWorkflows() {
+    const data = await this.api.fetchStream<
+      BatchWorkflowUpdate,
+      BatchWorkflowFilter
+    >(`/orgs/${this.orgId}/crawlconfigs/batch/run`, {
+      method: "POST",
+      body: {
+        ...this.filterBy,
+        createdBy: this.filterByCurrentUser ? this.userInfo?.id : undefined,
+        tags: this.filterByTags,
+        tagMatch: this.filterByTagsType,
+        profileIds: this.filterByProfiles || undefined,
+      },
+    });
+    return data;
+  }
 }
+
+type BatchWorkflowFilter = {
+  createdBy?: string;
+  modifiedBy?: string;
+  profileIds?: string[];
+  firstSeed?: string;
+  name?: string;
+  description?: string;
+  tags?: string[];
+  tagMatch?: "and" | "or";
+  schedule?: boolean;
+  isCrawlRunning?: boolean;
+};
+type BatchWorkflowStart = {
+  total: number;
+};
+type BatchWorkflowProgress = {
+  error?: string;
+  crawl_id: string;
+  success: boolean;
+  position: number;
+};
+type BatchWorkflowUpdate = BatchWorkflowStart | BatchWorkflowProgress;
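On the consumer side, `runBatchWorkflows()` resolves to a `ReadableStream<BatchWorkflowUpdate>` whose first chunk is the `BatchWorkflowStart` total and whose remaining chunks are `BatchWorkflowProgress` entries in completion order. A minimal sketch of draining that stream, assuming a hypothetical `consumeBatchRun` helper and `onProgress` callback (neither is part of this diff), narrowing the union by the presence of the `total` key:

// Hypothetical helper, not part of this diff: drains the stream returned by
// runBatchWorkflows() and reports progress as each crawl settles.
async function consumeBatchRun(
  stream: ReadableStream<BatchWorkflowUpdate>,
  onProgress: (completed: number, total: number) => void,
): Promise<BatchWorkflowProgress[]> {
  const results: BatchWorkflowProgress[] = [];
  let total = 0;
  const reader = stream.getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    if ("total" in value) {
      // First JSONL line: BatchWorkflowStart
      total = value.total;
    } else {
      // Subsequent lines: BatchWorkflowProgress, in completion order
      results.push(value);
      onProgress(value.position, total);
    }
  }
  return results;
}

A reader loop is used rather than `for await...of` because async iteration over `ReadableStream` is not yet available in every browser Browsertrix targets.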