Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""remove_is_subscribed_field

Revision ID: f6a7b8c9d0e1
Revises: e5f6a7b8c9d0
Create Date: 2026-01-16 10:00:00.000000

Remove the legacy is_subscribed field from podcasts table.
Subscription status is now determined by UserSubscription entries.
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'f6a7b8c9d0e1'
down_revision: Union[str, None] = 'e5f6a7b8c9d0'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.drop_column('podcasts', 'is_subscribed')


def downgrade() -> None:
op.add_column(
'podcasts',
sa.Column('is_subscribed', sa.Boolean(), server_default=sa.text('TRUE'), nullable=False)
)
29 changes: 17 additions & 12 deletions src/cli/podcast_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,13 @@ def list_podcasts(args, config: Config):
"""
Prints a table of podcasts to stdout.

Displays podcasts from the repository as rows containing ID, title (truncated to 40 characters), episode count, and subscription status. Honors the following fields on `args`: `all` (when true, include unsubscribed podcasts) and `limit` (maximum number of podcasts to list).
Displays podcasts from the repository as rows containing ID, title (truncated to 40 characters),
episode count, and subscriber count. When `args.all` is false, only shows podcasts with subscribers.

Parameters:
args: argparse.Namespace with at least:
- all (bool): If true, include unsubscribed podcasts; otherwise only subscribed podcasts.
- limit (int | None): Maximum number of podcasts to retrieve; when None, the repository default is used.
- all (bool): If true, include all podcasts; otherwise only podcasts with subscribers.
- limit (int | None): Maximum number of podcasts to retrieve; when None, no limit is applied.
"""
repository = create_repository(
database_url=config.DATABASE_URL,
Expand All @@ -236,27 +237,31 @@ def list_podcasts(args, config: Config):
)

try:
podcasts = repository.list_podcasts(
subscribed_only=not args.all,
limit=args.limit,
)
if args.all:
podcasts = repository.list_podcasts(limit=args.limit)
else:
podcasts = repository.list_podcasts_with_subscribers(limit=args.limit)

if not podcasts:
print("No podcasts found")
return

print(f"\n{'ID':<36} {'Title':<40} {'Episodes':<10} {'Status'}")
print("-" * 100)
# Get subscriber counts for all podcasts in one query
podcast_ids = [p.id for p in podcasts]
subscriber_counts = repository.get_podcast_subscriber_counts(podcast_ids)

print(f"\n{'ID':<36} {'Title':<40} {'Episodes':<10} {'Subscribers'}")
print("-" * 105)

for podcast in podcasts:
# Get episode count
episodes = repository.list_episodes(podcast_id=podcast.id)
status = "Subscribed" if podcast.is_subscribed else "Unsubscribed"
sub_count = subscriber_counts.get(podcast.id, 0)
print(
f"{podcast.id:<36} "
f"{podcast.title[:40]:<40} "
f"{len(episodes):<10} "
f"{status}"
f"{sub_count}"
)

finally:
Expand Down Expand Up @@ -500,7 +505,7 @@ def create_parser() -> argparse.ArgumentParser:
list_parser.add_argument(
"--all",
action="store_true",
help="Include unsubscribed podcasts",
help="Include all podcasts (default: only podcasts with subscribers)",
)
list_parser.add_argument(
"--limit",
Expand Down
3 changes: 1 addition & 2 deletions src/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class Podcast(Base):
image_url: Mapped[str | None] = mapped_column(String(2048))
image_local_path: Mapped[str | None] = mapped_column(String(1024))

# Subscription management
is_subscribed: Mapped[bool] = mapped_column(Boolean, default=True)
# Feed management
last_checked: Mapped[datetime | None] = mapped_column(DateTime)
last_new_episode: Mapped[datetime | None] = mapped_column(DateTime)
check_frequency_hours: Mapped[int] = mapped_column(Integer, default=24)
Expand Down
69 changes: 54 additions & 15 deletions src/db/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def create_podcast(self, feed_url: str, title: str, **kwargs) -> Podcast:
Parameters:
feed_url (str): RSS or Atom feed URL of the podcast.
title (str): Display title for the podcast subscription.
**kwargs: Additional Podcast attributes to set (e.g., description, is_subscribed, image_url).
**kwargs: Additional Podcast attributes to set (e.g., description, author, image_url).

Returns:
Podcast: The persisted Podcast instance with updated identifiers and timestamps.
Expand Down Expand Up @@ -95,25 +95,22 @@ def get_podcast_by_feed_url(self, feed_url: str) -> Podcast | None:
@abstractmethod
def list_podcasts(
self,
subscribed_only: bool = True,
limit: int | None = None,
sort_by: str = "recency",
sort_order: str = "desc"
) -> list[Podcast]:
"""
Return podcasts optionally filtered to subscribed ones and limited in count.
Return all podcasts with configurable sorting.

Queries podcasts with configurable sorting. If `subscribed_only` is True, only podcasts
with an active subscription are returned. `limit` caps the number of results when provided.
Use list_podcasts_with_subscribers() to get only podcasts with active subscribers.

Parameters:
subscribed_only (bool): If True, include only subscribed podcasts. Default is True.
limit (Optional[int]): Maximum number of podcasts to return; if None, no limit is applied.
sort_by (str): Field to sort by ("recency", "subscribers", "alphabetical"). Default is "recency".
sort_order (str): Sort direction ("asc" or "desc"). Default is "desc".

Returns:
List[Podcast]: Podcasts matching the filters, sorted according to parameters.
List[Podcast]: Podcasts sorted according to parameters.
"""
pass

Expand All @@ -124,7 +121,7 @@ def update_podcast(self, podcast_id: str, **kwargs) -> Podcast | None:

Parameters:
podcast_id (str): The podcast's primary key.
**kwargs: Podcast fields to update (for example `title`, `feed_url`, `is_subscribed`).
**kwargs: Podcast fields to update (for example `title`, `feed_url`, `image_url`).

Returns:
Optional[Podcast]: The updated Podcast instance if found and updated, `None` if no podcast with `podcast_id` exists.
Expand All @@ -147,6 +144,23 @@ def delete_podcast(self, podcast_id: str, delete_files: bool = False) -> bool:
"""
pass

@abstractmethod
def list_podcasts_with_subscribers(
self, limit: int | None = None
) -> list[Podcast]:
"""List podcasts that have at least one user subscribed.

This is used by the pipeline to determine which podcasts need to be synced.
Only podcasts with active user subscriptions are returned.

Args:
limit: Maximum number of podcasts to return.

Returns:
List[Podcast]: Podcasts with at least one subscriber.
"""
pass

# --- Episode Operations ---

@abstractmethod
Expand Down Expand Up @@ -1450,16 +1464,16 @@ def get_podcast_by_feed_url(self, feed_url: str) -> Podcast | None:

def list_podcasts(
self,
subscribed_only: bool = True,
limit: int | None = None,
sort_by: str = "recency",
sort_order: str = "desc"
) -> list[Podcast]:
"""
List podcasts, optionally restricting results to subscribed podcasts.
List all podcasts with configurable sorting.

Use list_podcasts_with_subscribers() to get only podcasts with active subscribers.

Parameters:
subscribed_only (bool): If True, include only podcasts with `is_subscribed` set to True.
limit (Optional[int]): Maximum number of podcasts to return; if None, no limit is applied.
sort_by (str): Field to sort by ("recency", "subscribers", "alphabetical")
sort_order (str): Sort direction ("asc" or "desc")
Expand All @@ -1470,8 +1484,6 @@ def list_podcasts(
with self._get_session() as session:
# Build base query
stmt = select(Podcast)
if subscribed_only:
stmt = stmt.where(Podcast.is_subscribed.is_(True))

# Determine sort column
if sort_by == "recency":
Expand Down Expand Up @@ -1561,6 +1573,32 @@ def delete_podcast(self, podcast_id: str, delete_files: bool = False) -> bool:
logger.info(f"Deleted podcast: {podcast.title} ({podcast_id})")
return True

def list_podcasts_with_subscribers(
self, limit: int | None = None
) -> list[Podcast]:
"""List podcasts that have at least one user subscribed.

This is used by the pipeline to determine which podcasts need to be synced.
Only podcasts with active user subscriptions are returned.

Args:
limit: Maximum number of podcasts to return.

Returns:
List[Podcast]: Podcasts with at least one subscriber.
"""
with self._get_session() as session:
# Get distinct podcast IDs that have at least one subscription
subquery = (
select(UserSubscription.podcast_id)
.distinct()
.subquery()
)
stmt = select(Podcast).where(Podcast.id.in_(select(subquery)))
if limit:
stmt = stmt.limit(limit)
return list(session.scalars(stmt).all())

# --- Episode Operations ---

def create_episode(
Expand Down Expand Up @@ -2664,7 +2702,7 @@ def get_overall_stats(self) -> dict[str, Any]:
@returns:
stats (Dict[str, Any]): Mapping of statistic names to integer counts:
- total_podcasts: Total number of podcasts in the repository.
- subscribed_podcasts: Number of podcasts marked as subscribed.
- subscribed_podcasts: Number of podcasts with at least one user subscriber.
- total_episodes: Total number of episodes in the repository.
- pending_download: Episodes with download_status == "pending".
- downloading: Episodes with download_status == "downloading".
Expand All @@ -2683,8 +2721,9 @@ def get_overall_stats(self) -> dict[str, Any]:
with self._get_session() as session:
# Podcast counts (efficient SQL aggregations)
total_podcasts = session.scalar(select(func.count(Podcast.id))) or 0
# Count podcasts with at least one subscriber
subscribed_podcasts = session.scalar(
select(func.count(Podcast.id)).where(Podcast.is_subscribed.is_(True))
select(func.count(func.distinct(UserSubscription.podcast_id)))
) or 0

# Episode counts by status (single query with conditional aggregation)
Expand Down
38 changes: 31 additions & 7 deletions src/podcast/feed_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,27 @@ def sync_podcast(self, podcast_id: str) -> dict[str, Any]:

return result

def sync_all_podcasts(
self,
subscribed_only: bool = True,
) -> dict[str, Any]:
def sync_all_podcasts(self) -> dict[str, Any]:
"""
Synchronize all podcasts from the repository.

Parameters:
subscribed_only (bool): If True, limit synchronization to podcasts marked as subscribed.
Returns:
overall_result (dict): Aggregated sync results with keys:
- synced (int): Number of podcasts successfully synced.
- failed (int): Number of podcasts that failed to sync.
- new_episodes (int): Total number of new episodes added across all podcasts.
- results (list): Per-podcast result dictionaries returned by `sync_podcast`.
"""
podcasts = self.repository.list_podcasts()

return self._sync_podcasts(podcasts)

def sync_podcasts_with_subscribers(self) -> dict[str, Any]:
"""
Synchronize podcasts that have at least one user subscribed.

This is the primary method used by the pipeline to sync feeds.
Only podcasts with active user subscriptions are synced.

Returns:
overall_result (dict): Aggregated sync results with keys:
Expand All @@ -124,8 +136,20 @@ def sync_all_podcasts(
- new_episodes (int): Total number of new episodes added across all podcasts.
- results (list): Per-podcast result dictionaries returned by `sync_podcast`.
"""
podcasts = self.repository.list_podcasts(subscribed_only=subscribed_only)
podcasts = self.repository.list_podcasts_with_subscribers()

return self._sync_podcasts(podcasts)

def _sync_podcasts(self, podcasts: list) -> dict[str, Any]:
"""
Internal method to sync a list of podcasts.

Parameters:
podcasts: List of Podcast objects to sync.

Returns:
overall_result (dict): Aggregated sync results.
"""
overall_result = {
"synced": 0,
"failed": 0,
Expand Down
12 changes: 5 additions & 7 deletions src/workflow/workers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,16 @@ def feed_sync_service(self) -> FeedSyncService:
return self._feed_sync_service

def get_pending_count(self) -> int:
"""Get the count of subscribed podcasts to sync.
"""Get the count of podcasts with subscribers to sync.

Returns:
Number of subscribed podcasts.
Number of podcasts with at least one subscriber.
"""
podcasts = self.repository.list_podcasts(subscribed_only=True)
podcasts = self.repository.list_podcasts_with_subscribers()
return len(podcasts)

def process_batch(self, limit: int = 0) -> WorkerResult:
"""Sync all subscribed podcast feeds.
"""Sync podcast feeds for podcasts with subscribers.

Args:
limit: Ignored for sync worker (always syncs all feeds).
Expand All @@ -71,9 +71,7 @@ def process_batch(self, limit: int = 0) -> WorkerResult:
result = WorkerResult()

try:
sync_result = self.feed_sync_service.sync_all_podcasts(
subscribed_only=True
)
sync_result = self.feed_sync_service.sync_podcasts_with_subscribers()

result.processed = sync_result.get("synced", 0)
result.failed = sync_result.get("failed", 0)
Expand Down
8 changes: 4 additions & 4 deletions tests/test_cli_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def mock_config(self):
def test_list_podcasts_empty(self, mock_create_repo, mock_config, capsys):
"""Test listing when no podcasts exist."""
mock_repo = Mock()
mock_repo.list_podcasts.return_value = []
mock_repo.list_podcasts_with_subscribers.return_value = []
mock_create_repo.return_value = mock_repo

args = Mock()
Expand All @@ -412,11 +412,11 @@ def test_list_podcasts_with_data(self, mock_create_repo, mock_config, capsys):
mock_podcast = Mock()
mock_podcast.id = "pod-123"
mock_podcast.title = "Test Podcast"
mock_podcast.is_subscribed = True

mock_repo = Mock()
mock_repo.list_podcasts.return_value = [mock_podcast]
mock_repo.list_podcasts_with_subscribers.return_value = [mock_podcast]
mock_repo.list_episodes.return_value = [Mock(), Mock()] # 2 episodes
mock_repo.get_podcast_subscriber_counts.return_value = {"pod-123": 3}
mock_create_repo.return_value = mock_repo

args = Mock()
Expand All @@ -427,7 +427,7 @@ def test_list_podcasts_with_data(self, mock_create_repo, mock_config, capsys):

captured = capsys.readouterr()
assert "Test Podcast" in captured.out
assert "Subscribed" in captured.out
assert "Subscribers" in captured.out # Column header


class TestShowStatus:
Expand Down
Loading