From ef244bbac6dfbf917621a8281f016173de84055d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 27 Jul 2025 10:44:26 +0000 Subject: [PATCH 1/2] Initial plan From 6c5e755d36a9745d13b78a0f3ceeb30274100a58 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 27 Jul 2025 11:08:28 +0000 Subject: [PATCH 2/2] Add search results download functionality Co-authored-by: Ljzd-PRO <63289359+Ljzd-PRO@users.noreply.github.com> --- ktoolbox/action/search.py | 18 ++++- ktoolbox/api/posts/__init__.py | 1 + ktoolbox/api/posts/search_posts.py | 35 ++++++++ ktoolbox/cli.py | 123 ++++++++++++++++++++++++++++- ktoolbox/utils.py | 25 ++++++ tests/ktoolbox/test_cli.py | 27 +++++++ tests/ktoolbox/test_utils.py | 22 ++++++ 7 files changed, 248 insertions(+), 3 deletions(-) create mode 100644 ktoolbox/api/posts/search_posts.py create mode 100644 tests/ktoolbox/test_utils.py diff --git a/ktoolbox/action/search.py b/ktoolbox/action/search.py index 7d7e724e..70f10bdc 100644 --- a/ktoolbox/action/search.py +++ b/ktoolbox/action/search.py @@ -3,10 +3,10 @@ from ktoolbox._enum import RetCodeEnum from ktoolbox.action import ActionRet from ktoolbox.api.model import Creator, Post -from ktoolbox.api.posts import get_creators, get_creator_post +from ktoolbox.api.posts import get_creators, get_creator_post, search_posts as search_posts_api from ktoolbox.utils import BaseRet, generate_msg -__all__ = ["search_creator", "search_creator_post"] +__all__ = ["search_creator", "search_creator_post", "search_posts"] # noinspection PyShadowingBuiltins @@ -92,3 +92,17 @@ async def inner(**kwargs): ) return await inner(id=id, name=name, service=service, q=q, o=o) + + +async def search_posts(q: str, o: int = None) -> BaseRet[List[Post]]: + """ + Search for posts by query across all creators and services. + + :param q: Search query + :param o: Result offset, stepping of 50 is enforced + """ + ret = await search_posts_api(q=q, o=o) + if ret: + return ActionRet(data=ret.data) + else: + return ret diff --git a/ktoolbox/api/posts/__init__.py b/ktoolbox/api/posts/__init__.py index a87b0ad8..ac80948e 100644 --- a/ktoolbox/api/posts/__init__.py +++ b/ktoolbox/api/posts/__init__.py @@ -2,3 +2,4 @@ from .get_creator_post import * from .get_creators import * from .get_post import * +from .search_posts import * diff --git a/ktoolbox/api/posts/search_posts.py b/ktoolbox/api/posts/search_posts.py new file mode 100644 index 00000000..4ef60819 --- /dev/null +++ b/ktoolbox/api/posts/search_posts.py @@ -0,0 +1,35 @@ +from typing import List + +from pydantic import RootModel + +from ktoolbox.api import BaseAPI, APIRet +from ktoolbox.api.model import Post + +__all__ = ["SearchPosts", "search_posts"] + + +class SearchPosts(BaseAPI): + path = "/posts" + method = "get" + + class Response(RootModel[List[Post]]): + root: List[Post] + + @classmethod + async def __call__(cls, q: str, o: int = None) -> APIRet[List[Post]]: + """ + Search for posts by query + + :param q: Search query + :param o: Result offset, stepping of 50 is enforced + """ + return await cls.request( + path=cls.path, + params={ + "q": q, + "o": o + } + ) + + +search_posts = SearchPosts.__call__ \ No newline at end of file diff --git a/ktoolbox/cli.py b/ktoolbox/cli.py index 9e39b4ac..c9863c1c 100644 --- a/ktoolbox/cli.py +++ b/ktoolbox/cli.py @@ -11,11 +11,12 @@ from ktoolbox._enum import TextEnum from ktoolbox.action import create_job_from_post, create_job_from_creator, generate_post_path_name from ktoolbox.action import search_creator as search_creator_action, search_creator_post as search_creator_post_action +from ktoolbox.action import search_posts as search_posts_action from ktoolbox.api.misc import get_app_version from ktoolbox.api.posts import get_post as get_post_api from ktoolbox.configuration import config from ktoolbox.job import JobRunner -from ktoolbox.utils import dump_search, parse_webpage_url, generate_msg +from ktoolbox.utils import dump_search, parse_webpage_url, parse_search_url, generate_msg __all__ = ["KToolBoxCli"] @@ -114,6 +115,29 @@ async def search_creator_post( else: return ret.message + @staticmethod + async def search_posts( + q: str, + o: int = None, + *, + dump: Path = None + ): + """ + Search for posts by query across all creators and services. + + :param q: Search query + :param o: Result offset, stepping of 50 is enforced + :param dump: Dump the result to a JSON file + """ + logger.info(repr(config)) + ret = await search_posts_action(q=q, o=o) + if ret: + if dump: + await dump_search(ret.data, dump) + return ret.data or TextEnum.SearchResultEmpty.value + else: + return ret.message + @staticmethod async def get_post(service: str, creator_id: str, post_id: str, *, dump: Path = None): """ @@ -334,3 +358,100 @@ async def sync_creator( await job_runner.start() else: return ret.message + + @staticmethod + async def download_search( + url: str = None, + query: str = None, + path: Union[Path, str] = Path("."), + *, + dump_post_data: bool = True, + offset: int = 0, + limit: int = None + ): + """ + Download all posts from a search query + + :param url: Search URL like https://kemono.su/posts?q=search+term + :param query: Search query string (alternative to URL) + :param path: Download path, default is current directory + :param dump_post_data: Whether to dump post data (post.json) in post directory + :param offset: Result offset for pagination + :param limit: Maximum number of posts to download (downloads all if None) + """ + logger.info(repr(config)) + + # Extract query from URL or use provided query + search_query = None + if url: + search_query = parse_search_url(url) + if not search_query: + return generate_msg( + "Invalid search URL format. Expected format: https://kemono.su/posts?q=search+term", + url=url + ) + elif query: + search_query = query + else: + return generate_msg( + "Missing search parameter", + use_at_lease_one=[["url"], ["query"]] + ) + + path = path if isinstance(path, Path) else Path(path) + + # Create folder name based on search query + folder_name = sanitize_filename(search_query.replace("+", " ")) + search_path = path / folder_name + search_path.mkdir(exist_ok=True) + + # Search for posts + logger.info(f"Searching for posts with query: {search_query}") + + all_posts = [] + current_offset = offset + + while True: + ret = await search_posts_action(q=search_query, o=current_offset) + if not ret: + return ret.message + + posts = ret.data + if not posts: + break + + all_posts.extend(posts) + + # Check if we've reached the limit + if limit and len(all_posts) >= limit: + all_posts = all_posts[:limit] + break + + # If we got less than 50 posts, we've reached the end + if len(posts) < 50: + break + + current_offset += 50 + + if not all_posts: + logger.info("No posts found for the search query") + return TextEnum.SearchResultEmpty.value + + logger.info(f"Found {len(all_posts)} posts, starting download...") + + # Create download jobs for all posts + job_list = [] + for post in all_posts: + post_path = search_path / generate_post_path_name(post) + jobs = await create_job_from_post( + post=post, + post_path=post_path, + dump_post_data=dump_post_data + ) + job_list.extend(jobs) + + # Start downloading + job_runner = JobRunner(job_list=job_list) + await job_runner.start() + + logger.info(f"Download completed! Files saved to: {search_path}") diff --git a/ktoolbox/utils.py b/ktoolbox/utils.py index 55250e73..9e943877 100644 --- a/ktoolbox/utils.py +++ b/ktoolbox/utils.py @@ -3,6 +3,7 @@ import sys from pathlib import Path from typing import Generic, TypeVar, Optional, List, Tuple +from urllib.parse import urlparse, parse_qs import aiofiles from loguru import logger @@ -19,6 +20,7 @@ "logger_init", "dump_search", "parse_webpage_url", + "parse_search_url", "uvloop_init" ] @@ -107,6 +109,29 @@ def parse_webpage_url(url: str) -> Tuple[Optional[str], Optional[str], Optional[ return service, user_id, post_id +def parse_search_url(url: str) -> Optional[str]: + """ + Extract search query from a Kemono search URL + + :param url: Kemono search URL like https://kemono.su/posts?q=search+term + :return: Search query string, or None if not a valid search URL + """ + parsed = urlparse(url) + + # Check if it's a search URL (path should be /posts) + if parsed.path != "/posts": + return None + + # Parse query parameters + query_params = parse_qs(parsed.query) + + # Extract the 'q' parameter (search query) + if 'q' in query_params and query_params['q']: + return query_params['q'][0] # take the first value + + return None + + def uvloop_init() -> bool: """ Set event loop policy to uvloop if available. diff --git a/tests/ktoolbox/test_cli.py b/tests/ktoolbox/test_cli.py index aff6b10d..f11cc497 100644 --- a/tests/ktoolbox/test_cli.py +++ b/tests/ktoolbox/test_cli.py @@ -211,3 +211,30 @@ async def test_sync_creator(): assert (dir_new := next(dir_path.iterdir(), None)) is not None posts = list(filter(lambda x: x.is_dir(), dir_new.iterdir())) assert len(posts) == 3 + + +@pytest.mark.asyncio +async def test_search_posts(): + """Test the new search_posts functionality""" + # Test with a simple query + ret = await KToolBoxCli.search_posts(q="test") + if isinstance(ret, list): + assert all(isinstance(post, Post) for post in ret) + elif isinstance(ret, str): + # Handle both empty results and network errors + assert ret == settings.cli_conf.search_empty_text or "resolution" in ret or "failed" in ret.lower() + else: + # Could be an error message due to network issues in test environment + assert isinstance(ret, str) + + +@pytest.mark.asyncio +async def test_download_search(): + """Test the new download_search functionality with validation""" + # Test invalid URL + invalid_url = await KToolBoxCli.download_search(url="https://kemono.su/fanbox/user/123") + assert "Invalid search URL format" in invalid_url + + # Test missing parameters + missing_params = await KToolBoxCli.download_search() + assert "Missing search parameter" in missing_params diff --git a/tests/ktoolbox/test_utils.py b/tests/ktoolbox/test_utils.py new file mode 100644 index 00000000..614ef3a6 --- /dev/null +++ b/tests/ktoolbox/test_utils.py @@ -0,0 +1,22 @@ +import pytest +from ktoolbox.utils import parse_search_url + + +def test_parse_search_url(): + """Test parsing of search URLs""" + # Valid search URLs + assert parse_search_url("https://kemono.su/posts?q=mai+sakurajima") == "mai sakurajima" + assert parse_search_url("https://kemono.su/posts?q=test") == "test" + assert parse_search_url("https://kemono.su/posts?q=hello%20world") == "hello world" + + # Invalid URLs (not search URLs) + assert parse_search_url("https://kemono.su/fanbox/user/123") is None + assert parse_search_url("https://kemono.su/fanbox/user/123/post/456") is None + assert parse_search_url("https://example.com/other") is None + + # Edge cases + assert parse_search_url("https://kemono.su/posts") is None # No query parameter + assert parse_search_url("https://kemono.su/posts?other=value") is None # No 'q' parameter + + # Test with multiple parameters + assert parse_search_url("https://kemono.su/posts?q=test&o=50") == "test" \ No newline at end of file