Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"mute_ads": true,
"skip_ads": true,
"auto_play": true,
"force_end": false,
"apikey": "",
"channel_whitelist": [
{"id": "",
Expand Down
1 change: 1 addition & 0 deletions src/iSponsorBlockTV/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, data_dir):
self.mute_ads = False
self.skip_ads = False
self.auto_play = True
self.force_end = False
self.__load()

def validate(self):
Expand Down
66 changes: 45 additions & 21 deletions src/iSponsorBlockTV/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class DeviceListener:
def __init__(self, api_helper, config, device, debug: bool, web_session):
self.task: Optional[asyncio.Task] = None
self.api_helper = api_helper
self.config = config
self.offset = device.offset
self.name = device.name
self.cancelled = False
Expand Down Expand Up @@ -93,36 +94,52 @@ async def __call__(self, state):

# Processes the playback state change
async def process_playstatus(self, state, time_start):
position = state.currentTime
segments = []

if state.videoId:
segments = await self.api_helper.get_segments(state.videoId)

if state.state.value == 1: # Playing
self.logger.info(
f"Playing video {state.videoId} with {len(segments)} segments"
)

next_segment = None
start_next_segment = None

if segments: # If there are segments
await self.time_to_segment(segments, state.currentTime, time_start)

# Finds the next segment to skip to and skips to it
async def time_to_segment(self, segments, position, time_start):
start_next_segment = None
next_segment = None
for segment in segments:
if position < 2 and (segment["start"] <= position < segment["end"]):
next_segment = segment
start_next_segment = (
position # different variable so segment doesn't change
for segment in segments:
if position < 2 and (segment["start"] <= position < segment["end"]):
next_segment = segment
start_next_segment = (
position # different variable so segment doesn't change
)
break
if segment["start"] > position:
next_segment = segment
start_next_segment = next_segment["start"]
break

if start_next_segment:
time_to_next = (
start_next_segment
- position
- (time.time() - time_start)
- self.offset
)
await self.skip(
time_to_next, next_segment["end"], next_segment["UUID"]
)

# If there's no segment before the end of the video and the force_end is on
if not start_next_segment and self.config.force_end:
# Schedule a stop task
time_to_end = (
state.duration - position - (time.time() - time_start) - self.offset
)
break
if segment["start"] > position:
next_segment = segment
start_next_segment = next_segment["start"]
break
if start_next_segment:
time_to_next = (
start_next_segment - position - (time.time() - time_start) - self.offset
)
await self.skip(time_to_next, next_segment["end"], next_segment["UUID"])
if time_to_end > 2:
await self.stop(time_to_end)

# Skips to the next segment (waits for the time to pass)
async def skip(self, time_to, position, uuids):
Expand All @@ -131,6 +148,13 @@ async def skip(self, time_to, position, uuids):
await asyncio.create_task(self.api_helper.mark_viewed_segments(uuids))
await asyncio.create_task(self.lounge_controller.seek_to(position))

# Force ends video after expected play time, skiping a video end ad
async def stop(self, time_to):
# Trigger a few frames earlier to avoid the video end event canceling the task
await asyncio.sleep(time_to - 0.1)
self.logger.info("Force stopping video, end of expected play time reached")
await asyncio.create_task(self.lounge_controller._command("stopVideo"))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be better if a new method was created on the YTLounge class


async def cancel(self):
self.cancelled = True
await self.lounge_controller.disconnect()
Expand Down
6 changes: 6 additions & 0 deletions src/iSponsorBlockTV/setup-wizard-style.tcss
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,9 @@ MigrationScreen {
padding: 1;
height: auto;
}

/* Force end */
#force-end-container{
padding: 1;
height: auto;
}
37 changes: 37 additions & 0 deletions src/iSponsorBlockTV/setup_wizard.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,40 @@ def changed_skip(self, event: Checkbox.Changed):
self.config.auto_play = event.checkbox.value


class ForceEndManager(Vertical):
"""Manager for force_end, forcefully ends every video skiping post-video ads."""

def __init__(self, config, **kwargs) -> None:
super().__init__(**kwargs)
self.config = config

def compose(self) -> ComposeResult:
yield Label("Force video end", classes="title")
yield Label(
(
"If enabled, every video will be forcefully stopped once it has fully"
" played through. This skips any post-video ads that might play and"
" returns you to the YoutTube home screen. Autoplay cannot be enabled"
" at the same time."
),
classes="subtitle",
id="force-end-subtitle",
)
with Horizontal(id="force-end-container"):
yield Checkbox(
value=self.config.force_end,
id="force-end-switch",
label="Enable force end",
)

@on(Checkbox.Changed, "#force-end-switch")
def changed_skip(self, event: Checkbox.Changed):
self.config.force_end = event.checkbox.value

if event.checkbox.value:
self.app.query_one("#autoplay-switch").value = False


class ISponsorBlockTVSetupMainScreen(Screen):
"""Making this a separate screen to avoid a bug: https://github.com/Textualize/textual/issues/3221"""

Expand Down Expand Up @@ -921,6 +955,9 @@ def compose(self) -> ComposeResult:
yield AutoPlayManager(
config=self.config, id="autoplay-manager", classes="container"
)
yield ForceEndManager(
config=self.config, id="force-end-manager", classes="container"
)

def on_mount(self) -> None:
if self.check_for_old_config_entries():
Expand Down
Loading