Skip to content

Commit bb35822

Browse files
committed
channel tags
1 parent 7b09fe3 commit bb35822

File tree

12 files changed

+956
-68
lines changed

12 files changed

+956
-68
lines changed

src/domain/channel.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub struct Channel {
1212
#[serde(skip_serializing)]
1313
pub logo: Option<String>,
1414
#[serde(skip_serializing_if = "Option::is_none")]
15-
pub group_tag: Option<String>,
15+
pub tags: Option<Vec<String>>,
1616
#[serde(skip_serializing_if = "Option::is_none")]
1717
pub channel_number: Option<i32>,
1818
#[serde(skip_serializing_if = "Option::is_none")]
@@ -49,7 +49,7 @@ pub struct ChannelForAdd {
4949
#[serde(skip_serializing_if = "Option::is_none")]
5050
pub logo: Option<String>,
5151
#[serde(skip_serializing_if = "Option::is_none")]
52-
pub group_tag: Option<String>,
52+
pub tags: Option<Vec<String>>,
5353
#[serde(skip_serializing_if = "Option::is_none")]
5454
pub channel_number: Option<i32>,
5555
}
@@ -64,8 +64,6 @@ pub struct ChannelForUpdate {
6464
#[serde(skip_serializing_if = "Option::is_none")]
6565
pub logo: Option<String>,
6666
#[serde(skip_serializing_if = "Option::is_none")]
67-
pub group_tag: Option<String>,
68-
#[serde(skip_serializing_if = "Option::is_none")]
6967
pub channel_number: Option<i32>,
7068
}
7169

src/domain/library.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ pub struct ServerLibrarySettings {
117117
// IPTV settings
118118
#[serde(skip_serializing_if = "Option::is_none")]
119119
pub epg_url: Option<String>,
120+
#[serde(skip_serializing_if = "Option::is_none")]
121+
pub max_streams: Option<u32>,
120122
}
121123

122124
#[derive(Debug, Serialize, Deserialize, Clone)]

src/model/channels.rs

Lines changed: 234 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, io::Cursor};
1+
use std::{collections::{HashMap, HashSet}, io::Cursor};
22

33
use nanoid::nanoid;
44
use rs_plugin_common_interfaces::ImageType;
@@ -25,7 +25,8 @@ use super::{entity_images::EntityImageConfig, tags::TagForAdd, users::ConnectedU
2525
#[derive(Debug, Deserialize, Default)]
2626
#[serde(rename_all = "camelCase")]
2727
pub struct ChannelQuery {
28-
pub group_tag: Option<String>,
28+
#[serde(alias = "groupTag")]
29+
pub tag: Option<String>,
2930
pub name: Option<String>,
3031
}
3132

@@ -50,7 +51,7 @@ impl ModelController {
5051
) -> RsResult<Vec<Channel>> {
5152
requesting_user.check_library_role(library_id, LibraryRole::Read)?;
5253
let store = self.store.get_library_store(library_id)?;
53-
let mut channels = store.get_channels(query.group_tag, query.name).await?;
54+
let mut channels = store.get_channels(query.tag, query.name).await?;
5455

5556
// Attach variants to each channel
5657
for channel in &mut channels {
@@ -276,7 +277,7 @@ impl ModelController {
276277

277278
// Track existing channels for removal detection
278279
let existing_channel_ids: Vec<String> = store.get_all_channel_ids().await?;
279-
let mut seen_channel_ids: Vec<String> = Vec::new();
280+
let mut seen_channel_ids: HashSet<String> = HashSet::new();
280281
let mut channel_actions: Vec<ChannelWithAction> = Vec::new();
281282

282283
// Upsert channels and variants
@@ -289,7 +290,6 @@ impl ModelController {
289290
.unwrap_or(&entries[0]);
290291

291292
let channel_name = rep.channel_key();
292-
let group_tag_id = rep.group_title.as_ref().and_then(|gt| tag_cache.get(gt)).cloned();
293293

294294
// Try to find existing channel
295295
let existing = if let Some(ref tvg_id) = rep.tvg_id {
@@ -298,28 +298,21 @@ impl ModelController {
298298
store.get_channel_by_name(&channel_name).await?
299299
};
300300

301-
let channel_id = if let Some(existing) = existing {
301+
let (channel_id, is_new) = if let Some(existing) = existing {
302302
// Update if needed
303303
store
304304
.update_channel(
305305
&existing.id,
306306
ChannelForUpdate {
307307
name: Some(channel_name.clone()),
308308
logo: rep.tvg_logo.clone(),
309-
group_tag: group_tag_id,
310309
tvg_id: rep.tvg_id.clone(),
311310
..Default::default()
312311
},
313312
)
314313
.await?;
315314
result.channels_updated += 1;
316-
if let Some(updated) = store.get_channel(&existing.id).await? {
317-
channel_actions.push(ChannelWithAction {
318-
action: ElementAction::Updated,
319-
channel: updated,
320-
});
321-
}
322-
existing.id
315+
(existing.id, false)
323316
} else {
324317
// Create new channel
325318
let id = nanoid!();
@@ -329,7 +322,7 @@ impl ModelController {
329322
name: channel_name.clone(),
330323
tvg_id: rep.tvg_id.clone(),
331324
logo: rep.tvg_logo.clone(),
332-
group_tag: group_tag_id,
325+
tags: None,
333326
channel_number: None,
334327
posterv: None,
335328
modified: None,
@@ -338,16 +331,35 @@ impl ModelController {
338331
})
339332
.await?;
340333
result.channels_added += 1;
341-
if let Some(added) = store.get_channel(&id).await? {
342-
channel_actions.push(ChannelWithAction {
343-
action: ElementAction::Added,
344-
channel: added,
345-
});
346-
}
347-
id
334+
(id, true)
348335
};
349336

350-
seen_channel_ids.push(channel_id.clone());
337+
seen_channel_ids.insert(channel_id.clone());
338+
339+
// Diff-based tag sync: collect desired tag IDs from all entries
340+
let desired_tag_ids: HashSet<String> = entries
341+
.iter()
342+
.filter_map(|e| e.group_title.as_ref())
343+
.filter_map(|gt| tag_cache.get(gt))
344+
.cloned()
345+
.collect();
346+
347+
// Get current auto-imported tags
348+
let current_auto_tags: HashSet<String> = store
349+
.get_channel_auto_tag_ids(&channel_id)
350+
.await?
351+
.into_iter()
352+
.collect();
353+
354+
// Add new auto tags
355+
for tag_id in desired_tag_ids.difference(&current_auto_tags) {
356+
store.add_channel_tag(&channel_id, tag_id, Some(0)).await?;
357+
}
358+
359+
// Remove stale auto tags
360+
for tag_id in current_auto_tags.difference(&desired_tag_ids) {
361+
store.remove_channel_auto_tag(&channel_id, tag_id).await?;
362+
}
351363

352364
// Upsert variants
353365
for entry in entries {
@@ -384,6 +396,14 @@ impl ModelController {
384396
.await?;
385397
}
386398
}
399+
400+
// Fetch updated channel for SSE (after tag sync)
401+
if let Some(updated) = store.get_channel(&channel_id).await? {
402+
channel_actions.push(ChannelWithAction {
403+
action: if is_new { ElementAction::Added } else { ElementAction::Updated },
404+
channel: updated,
405+
});
406+
}
387407
}
388408

389409
// Remove channels no longer in M3U
@@ -439,6 +459,53 @@ impl ModelController {
439459
Ok(result)
440460
}
441461

462+
// -- Channel tag management --
463+
464+
pub async fn add_channel_tag(
465+
&self,
466+
library_id: &str,
467+
channel_id: &str,
468+
tag_id: &str,
469+
requesting_user: &ConnectedUser,
470+
) -> RsResult<()> {
471+
requesting_user.check_library_role(library_id, LibraryRole::Write)?;
472+
let store = self.store.get_library_store(library_id)?;
473+
// confidence = NULL means user-assigned
474+
store.add_channel_tag(channel_id, tag_id, None).await?;
475+
476+
let channel = self.get_channel(library_id, channel_id, requesting_user).await?;
477+
self.broadcast_sse(SseEvent::Channels(ChannelMessage {
478+
library: library_id.to_string(),
479+
channels: vec![ChannelWithAction {
480+
action: ElementAction::Updated,
481+
channel,
482+
}],
483+
}));
484+
Ok(())
485+
}
486+
487+
pub async fn remove_channel_tag(
488+
&self,
489+
library_id: &str,
490+
channel_id: &str,
491+
tag_id: &str,
492+
requesting_user: &ConnectedUser,
493+
) -> RsResult<()> {
494+
requesting_user.check_library_role(library_id, LibraryRole::Write)?;
495+
let store = self.store.get_library_store(library_id)?;
496+
store.remove_channel_tag(channel_id, tag_id).await?;
497+
498+
let channel = self.get_channel(library_id, channel_id, requesting_user).await?;
499+
self.broadcast_sse(SseEvent::Channels(ChannelMessage {
500+
library: library_id.to_string(),
501+
channels: vec![ChannelWithAction {
502+
action: ElementAction::Updated,
503+
channel,
504+
}],
505+
}));
506+
Ok(())
507+
}
508+
442509
pub async fn channel_image(
443510
&self,
444511
library_id: &str,
@@ -509,7 +576,7 @@ impl ModelController {
509576
&self,
510577
library_id: &str,
511578
channel_id: &str,
512-
_kind: &ImageType,
579+
kind: &ImageType,
513580
requesting_user: &ConnectedUser,
514581
) -> RsResult<()> {
515582
let channel = self.get_channel(library_id, channel_id, requesting_user).await?;
@@ -529,12 +596,154 @@ impl ModelController {
529596
self.update_channel_image(
530597
library_id,
531598
channel_id,
532-
&ImageType::Poster,
599+
kind,
533600
reader,
534601
&ConnectedUser::ServerAdmin,
535602
)
536603
.await?;
537604

538605
Ok(())
539606
}
607+
608+
// -- Stream slot management (concurrent stream limiting) --
609+
610+
pub async fn acquire_stream_slot(
611+
&self,
612+
library_id: &str,
613+
channel_id: &str,
614+
) -> RsResult<()> {
615+
let library = self.cache_get_library(library_id).await
616+
.ok_or(crate::Error::NotFound(format!("Library {} not found", library_id)))?;
617+
let max_streams = library.settings.max_streams.unwrap_or(1) as usize;
618+
619+
let mut streams = self.active_streams.write().await;
620+
let channels = streams.entry(library_id.to_string()).or_default();
621+
622+
// Same channel is allowed (doesn't count as extra)
623+
if channels.contains(channel_id) {
624+
return Ok(());
625+
}
626+
627+
if channels.len() >= max_streams {
628+
// Evict the oldest HLS session for this library to make room
629+
let oldest = {
630+
let sessions = self.hls_sessions.read().await;
631+
sessions
632+
.values()
633+
.filter(|s| s.library_id == library_id)
634+
.min_by_key(|s| s.last_active.load(std::sync::atomic::Ordering::Relaxed))
635+
.map(|s| (s.key.clone(), s.channel_id.clone()))
636+
};
637+
638+
if let Some((key, old_channel_id)) = oldest {
639+
log_info(LogServiceType::Source, format!(
640+
"IPTV stream limit reached for library {}, evicting oldest session (channel {})",
641+
library_id, old_channel_id
642+
));
643+
// Drop the streams lock before stopping the session (stop_session needs hls_sessions lock only)
644+
drop(streams);
645+
crate::tools::hls_session::stop_session(&key, &self.hls_sessions).await;
646+
// Re-acquire and update active_streams
647+
let mut streams = self.active_streams.write().await;
648+
if let Some(channels) = streams.get_mut(library_id) {
649+
channels.remove(&old_channel_id);
650+
}
651+
let channels = streams.entry(library_id.to_string()).or_default();
652+
channels.insert(channel_id.to_string());
653+
} else {
654+
// No HLS session to evict (e.g., all slots taken by MPEG2-TS proxies)
655+
return Err(crate::Error::Error(format!(
656+
"Maximum concurrent streams reached for this library ({}). Stop another stream first.",
657+
max_streams
658+
)));
659+
}
660+
} else {
661+
channels.insert(channel_id.to_string());
662+
}
663+
664+
Ok(())
665+
}
666+
667+
pub async fn release_stream_slot(
668+
&self,
669+
library_id: &str,
670+
channel_id: &str,
671+
) {
672+
let mut streams = self.active_streams.write().await;
673+
if let Some(channels) = streams.get_mut(library_id) {
674+
channels.remove(channel_id);
675+
if channels.is_empty() {
676+
streams.remove(library_id);
677+
}
678+
}
679+
}
680+
681+
// -- HLS session management --
682+
683+
pub async fn get_or_create_hls_session(
684+
&self,
685+
library_id: &str,
686+
channel_id: &str,
687+
quality: Option<String>,
688+
requesting_user: &ConnectedUser,
689+
) -> RsResult<(std::path::PathBuf, std::path::PathBuf)> {
690+
let quality_key = quality.as_deref().unwrap_or("best");
691+
let key = format!("{}:{}:{}", library_id, channel_id, quality_key);
692+
693+
// Check if session already exists and is alive
694+
{
695+
let sessions = self.hls_sessions.read().await;
696+
if let Some(session) = sessions.get(&key) {
697+
session.touch();
698+
return Ok((session.output_dir.clone(), session.playlist_path.clone()));
699+
}
700+
}
701+
702+
// Acquire stream slot (enforces max_streams)
703+
self.acquire_stream_slot(library_id, channel_id).await?;
704+
705+
// Resolve the stream URL
706+
let stream_url = self.get_channel_stream_url(library_id, channel_id, quality, requesting_user).await?;
707+
708+
// Create the session
709+
match crate::tools::hls_session::create_session(
710+
key.clone(),
711+
library_id.to_string(),
712+
channel_id.to_string(),
713+
stream_url,
714+
self.hls_sessions.clone(),
715+
).await {
716+
Ok(result) => Ok(result),
717+
Err(e) => {
718+
// Release slot on failure
719+
self.release_stream_slot(library_id, channel_id).await;
720+
Err(e)
721+
}
722+
}
723+
}
724+
725+
pub async fn stop_hls_session(
726+
&self,
727+
library_id: &str,
728+
channel_id: &str,
729+
) -> RsResult<()> {
730+
let keys_to_stop: Vec<String> = {
731+
let sessions = self.hls_sessions.read().await;
732+
sessions
733+
.keys()
734+
.filter(|k| k.starts_with(&format!("{}:{}:", library_id, channel_id)))
735+
.cloned()
736+
.collect()
737+
};
738+
739+
for key in &keys_to_stop {
740+
crate::tools::hls_session::stop_session(key, &self.hls_sessions).await;
741+
}
742+
743+
if !keys_to_stop.is_empty() {
744+
self.release_stream_slot(library_id, channel_id).await;
745+
}
746+
747+
Ok(())
748+
}
540749
}

0 commit comments

Comments
 (0)