18 changes: 16 additions & 2 deletions ktoolbox/action/search.py
@@ -3,10 +3,10 @@
from ktoolbox._enum import RetCodeEnum
from ktoolbox.action import ActionRet
from ktoolbox.api.model import Creator, Post
from ktoolbox.api.posts import get_creators, get_creator_post
from ktoolbox.api.posts import get_creators, get_creator_post, search_posts as search_posts_api
from ktoolbox.utils import BaseRet, generate_msg

__all__ = ["search_creator", "search_creator_post"]
__all__ = ["search_creator", "search_creator_post", "search_posts"]


# noinspection PyShadowingBuiltins
@@ -92,3 +92,17 @@ async def inner(**kwargs):
)

return await inner(id=id, name=name, service=service, q=q, o=o)


async def search_posts(q: str, o: int = None) -> BaseRet[List[Post]]:
"""
Search for posts by query across all creators and services.

:param q: Search query
:param o: Result offset, stepping of 50 is enforced
"""
ret = await search_posts_api(q=q, o=o)
if ret:
return ActionRet(data=ret.data)
else:
return ret
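
A hedged usage sketch, not part of the diff: how a caller might consume the new action-layer helper, mirroring the way the existing search_creator_post results are handled. It assumes ktoolbox.action re-exports search_posts like the other search helpers, and first_page_titles is a hypothetical name.

from ktoolbox.action import search_posts


async def first_page_titles(query: str):
    ret = await search_posts(q=query)
    if ret:  # success: ActionRet wraps the list of Post models
        return [post.title for post in ret.data]
    return ret.message  # failure: propagate the API error message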
1 change: 1 addition & 0 deletions ktoolbox/api/posts/__init__.py
@@ -2,3 +2,4 @@
from .get_creator_post import *
from .get_creators import *
from .get_post import *
from .search_posts import *
35 changes: 35 additions & 0 deletions ktoolbox/api/posts/search_posts.py
@@ -0,0 +1,35 @@
from typing import List

from pydantic import RootModel

from ktoolbox.api import BaseAPI, APIRet
from ktoolbox.api.model import Post

__all__ = ["SearchPosts", "search_posts"]


class SearchPosts(BaseAPI):
path = "/posts"
method = "get"

class Response(RootModel[List[Post]]):
root: List[Post]

@classmethod
async def __call__(cls, q: str, o: int = None) -> APIRet[List[Post]]:
"""
Search for posts by query

:param q: Search query
:param o: Result offset, stepping of 50 is enforced
"""
return await cls.request(
path=cls.path,
params={
"q": q,
"o": o
}
)


search_posts = SearchPosts.__call__
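
A minimal pagination sketch, not from the PR, that drives the raw endpoint directly; it assumes the 50-post step mentioned in the docstring and that APIRet is truthy on success (as the action layer relies on). fetch_all is a hypothetical helper.

import asyncio

from ktoolbox.api.posts import search_posts


async def fetch_all(query: str, max_pages: int = 3):
    """Collect up to max_pages pages of 50 posts each for a query."""
    results = []
    for page in range(max_pages):
        ret = await search_posts(q=query, o=page * 50)
        if not ret or not ret.data:  # API error or no more results
            break
        results.extend(ret.data)
    return results


posts = asyncio.run(fetch_all("example query"))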
123 changes: 122 additions & 1 deletion ktoolbox/cli.py
@@ -11,11 +11,12 @@
from ktoolbox._enum import TextEnum
from ktoolbox.action import create_job_from_post, create_job_from_creator, generate_post_path_name
from ktoolbox.action import search_creator as search_creator_action, search_creator_post as search_creator_post_action
from ktoolbox.action import search_posts as search_posts_action
from ktoolbox.api.misc import get_app_version
from ktoolbox.api.posts import get_post as get_post_api
from ktoolbox.configuration import config
from ktoolbox.job import JobRunner
from ktoolbox.utils import dump_search, parse_webpage_url, generate_msg
from ktoolbox.utils import dump_search, parse_webpage_url, parse_search_url, generate_msg

__all__ = ["KToolBoxCli"]

@@ -114,6 +115,29 @@ async def search_creator_post(
else:
return ret.message

@staticmethod
async def search_posts(
q: str,
o: int = None,
*,
dump: Path = None
):
"""
Search for posts by query across all creators and services.

:param q: Search query
:param o: Result offset, stepping of 50 is enforced
:param dump: Dump the result to a JSON file
"""
logger.info(repr(config))
ret = await search_posts_action(q=q, o=o)
if ret:
if dump:
await dump_search(ret.data, dump)
return ret.data or TextEnum.SearchResultEmpty.value
else:
return ret.message

@staticmethod
async def get_post(service: str, creator_id: str, post_id: str, *, dump: Path = None):
"""
@@ -334,3 +358,100 @@ async def sync_creator(
await job_runner.start()
else:
return ret.message

@staticmethod
async def download_search(
url: str = None,
query: str = None,
path: Union[Path, str] = Path("."),
*,
dump_post_data: bool = True,
offset: int = 0,
limit: int = None
):
"""
Download all posts from a search query

:param url: Search URL like https://kemono.su/posts?q=search+term
:param query: Search query string (alternative to URL)
:param path: Download path, default is current directory
:param dump_post_data: Whether to dump post data (post.json) in post directory
:param offset: Result offset for pagination
:param limit: Maximum number of posts to download (downloads all if None)
"""
logger.info(repr(config))

# Extract query from URL or use provided query
search_query = None
if url:
search_query = parse_search_url(url)
if not search_query:
return generate_msg(
"Invalid search URL format. Expected format: https://kemono.su/posts?q=search+term",
url=url
)
elif query:
search_query = query
else:
return generate_msg(
"Missing search parameter",
use_at_lease_one=[["url"], ["query"]]
)

path = path if isinstance(path, Path) else Path(path)

# Create folder name based on search query
folder_name = sanitize_filename(search_query.replace("+", " "))
search_path = path / folder_name
search_path.mkdir(parents=True, exist_ok=True)

# Search for posts
logger.info(f"Searching for posts with query: {search_query}")

all_posts = []
current_offset = offset

while True:
ret = await search_posts_action(q=search_query, o=current_offset)
if not ret:
return ret.message

posts = ret.data
if not posts:
break

all_posts.extend(posts)

# Check if we've reached the limit
if limit and len(all_posts) >= limit:
all_posts = all_posts[:limit]
break

# If we got less than 50 posts, we've reached the end
if len(posts) < 50:
break

current_offset += 50

if not all_posts:
logger.info("No posts found for the search query")
return TextEnum.SearchResultEmpty.value

logger.info(f"Found {len(all_posts)} posts, starting download...")

# Create download jobs for all posts
job_list = []
for post in all_posts:
post_path = search_path / generate_post_path_name(post)
jobs = await create_job_from_post(
post=post,
post_path=post_path,
dump_post_data=dump_post_data
)
job_list.extend(jobs)

# Start downloading
job_runner = JobRunner(job_list=job_list)
await job_runner.start()

logger.info(f"Download completed! Files saved to: {search_path}")
25 changes: 25 additions & 0 deletions ktoolbox/utils.py
@@ -3,6 +3,7 @@
import sys
from pathlib import Path
from typing import Generic, TypeVar, Optional, List, Tuple
from urllib.parse import urlparse, parse_qs

import aiofiles
from loguru import logger
@@ -19,6 +20,7 @@
"logger_init",
"dump_search",
"parse_webpage_url",
"parse_search_url",
"uvloop_init"
]

@@ -107,6 +109,29 @@ def parse_webpage_url(url: str) -> Tuple[Optional[str], Optional[str], Optional[
return service, user_id, post_id


def parse_search_url(url: str) -> Optional[str]:
"""
Extract search query from a Kemono search URL

:param url: Kemono search URL like https://kemono.su/posts?q=search+term
:return: Search query string, or None if not a valid search URL
"""
parsed = urlparse(url)

# Check if it's a search URL (path should be /posts)
if parsed.path != "/posts":
return None

# Parse query parameters
query_params = parse_qs(parsed.query)

# Extract the 'q' parameter (search query)
if 'q' in query_params and query_params['q']:
return query_params['q'][0] # take the first value

return None


def uvloop_init() -> bool:
"""
Set event loop policy to uvloop if available.
27 changes: 27 additions & 0 deletions tests/ktoolbox/test_cli.py
@@ -211,3 +211,30 @@ async def test_sync_creator():
assert (dir_new := next(dir_path.iterdir(), None)) is not None
posts = list(filter(lambda x: x.is_dir(), dir_new.iterdir()))
assert len(posts) == 3


@pytest.mark.asyncio
async def test_search_posts():
"""Test the new search_posts functionality"""
# Test with a simple query
ret = await KToolBoxCli.search_posts(q="test")
if isinstance(ret, list):
assert all(isinstance(post, Post) for post in ret)
else:
# Empty results and network errors both surface as a message string
assert isinstance(ret, str)


@pytest.mark.asyncio
async def test_download_search():
"""Test the new download_search functionality with validation"""
# Test invalid URL
invalid_url = await KToolBoxCli.download_search(url="https://kemono.su/fanbox/user/123")
assert "Invalid search URL format" in invalid_url

# Test missing parameters
missing_params = await KToolBoxCli.download_search()
assert "Missing search parameter" in missing_params
22 changes: 22 additions & 0 deletions tests/ktoolbox/test_utils.py
@@ -0,0 +1,22 @@
import pytest
from ktoolbox.utils import parse_search_url


def test_parse_search_url():
"""Test parsing of search URLs"""
# Valid search URLs
assert parse_search_url("https://kemono.su/posts?q=mai+sakurajima") == "mai sakurajima"
assert parse_search_url("https://kemono.su/posts?q=test") == "test"
assert parse_search_url("https://kemono.su/posts?q=hello%20world") == "hello world"

# Invalid URLs (not search URLs)
assert parse_search_url("https://kemono.su/fanbox/user/123") is None
assert parse_search_url("https://kemono.su/fanbox/user/123/post/456") is None
assert parse_search_url("https://example.com/other") is None

# Edge cases
assert parse_search_url("https://kemono.su/posts") is None # No query parameter
assert parse_search_url("https://kemono.su/posts?other=value") is None # No 'q' parameter

# Test with multiple parameters
assert parse_search_url("https://kemono.su/posts?q=test&o=50") == "test"