Skip to content

Commit f420f80

Browse files
committed
streaming lookups
1 parent 3ed66cf commit f420f80

File tree

7 files changed

+266
-6
lines changed

7 files changed

+266
-6
lines changed

src/model/medias.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2671,6 +2671,22 @@ impl ModelController {
26712671
}
26722672
}
26732673

2674+
if let Ok(existing_thumb) = mc_progress.media_image(
2675+
&lib_progress,
2676+
&media_id_progress,
2677+
None,
2678+
&ConnectedUser::ServerAdmin,
2679+
).await {
2680+
if let Err(e) = mc_progress.update_media_image(
2681+
&lib_progress,
2682+
&media.id,
2683+
existing_thumb.stream,
2684+
&ConnectedUser::ServerAdmin,
2685+
).await {
2686+
tracing::warn!("Failed to copy thumbnail to converted media: {:?}", e);
2687+
}
2688+
}
2689+
26742690
log_info(
26752691
crate::tools::log::LogServiceType::Source,
26762692
format!(

src/model/plugins/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,19 @@ impl ModelController {
249249

250250
}
251251

252+
pub async fn exec_lookup_stream_grouped(&self, query: RsLookupQuery, library_id: Option<String>, requesting_user: &ConnectedUser, target: Option<PluginTarget>, sources: Option<&[String]>) -> RsResult<tokio::sync::mpsc::Receiver<(String, String, Vec<RsGroupDownload>)>> {
253+
if let Some(library_id) = &library_id {
254+
requesting_user.check_library_role(library_id, crate::domain::library::LibraryRole::Read)?;
255+
} else {
256+
requesting_user.check_role(&UserRole::Admin)?;
257+
}
258+
let plugins: Vec<_> = self.get_plugins_with_credential(PluginQuery { kind: Some(PluginType::Lookup), library: library_id, ..Default::default() })
259+
.await?
260+
.filter(|p| sources.map_or(true, |s| s.iter().any(|id| id == &p.plugin.path)))
261+
.collect();
252262

263+
self.plugin_manager.lookup_stream_grouped(query, plugins, target).await
264+
}
253265

254266
pub async fn exec_lookup_metadata(&self, query: RsLookupQuery, library_id: Option<String>, requesting_user: &ConnectedUser, target: Option<PluginTarget>) -> RsResult<RsLookupMetadataResults> {
255267
if let Some(library_id) = &library_id {

src/plugins/url.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,84 @@ impl PluginManager {
357357
Ok(results)
358358
}
359359

360+
pub async fn lookup_stream_grouped(self: &std::sync::Arc<Self>, query: RsLookupQuery, plugins: Vec<PluginWithCredential>, target: Option<PluginTarget>) -> RsResult<tokio::sync::mpsc::Receiver<(String, String, Vec<RsGroupDownload>)>> {
361+
let plugins = Self::filter_plugins_by_target(plugins, &target, None)?;
362+
let (tx, rx) = tokio::sync::mpsc::channel(16);
363+
364+
let tasks: Vec<_> = {
365+
let plugins_guard = self.plugins.read().await;
366+
plugins.into_iter().filter_map(|plugin_with_cred| {
367+
plugins_guard.iter()
368+
.find(|p| p.filename == plugin_with_cred.plugin.path)
369+
.filter(|p| p.infos.capabilities.contains(&PluginType::Lookup))
370+
.map(|p| {
371+
let plugin_arc = p.plugin.clone();
372+
let plugin_id = plugin_with_cred.plugin.path.clone();
373+
let plugin_name = plugin_with_cred.plugin.name.clone();
374+
let wrapped_query = RsLookupWrapper {
375+
query: query.clone(),
376+
credential: plugin_with_cred.credential.clone().map(PluginCredential::from),
377+
params: build_plugin_params(&plugin_with_cred),
378+
};
379+
(plugin_arc, plugin_id, plugin_name, wrapped_query)
380+
})
381+
}).collect()
382+
};
383+
384+
tokio::spawn(async move {
385+
let mut pending: futures::stream::FuturesUnordered<_> = tasks.into_iter().map(|(plugin_arc, plugin_id, plugin_name, wrapped_query)| {
386+
tokio::task::spawn_blocking(move || {
387+
let mut plugin_m = plugin_arc.lock().unwrap();
388+
println!("Executing lookup stream for plugin {} ...", plugin_name);
389+
let res = plugin_m.call_get_error_code::<Json<RsLookupWrapper>, Json<RsLookupSourceResult>>("lookup", Json(wrapped_query));
390+
(plugin_id, plugin_name, res)
391+
})
392+
}).collect();
393+
394+
while let Some(handle_result) = pending.next().await {
395+
match handle_result {
396+
Ok((plugin_id, plugin_name, Ok(Json(RsLookupSourceResult::GroupRequest(mut groups))))) => {
397+
log_info(crate::tools::log::LogServiceType::Plugin, format!("Lookup stream result from plugin {}", plugin_name));
398+
for group in &mut groups {
399+
for req in &mut group.requests {
400+
req.plugin_id = Some(plugin_id.clone());
401+
req.plugin_name = Some(plugin_name.clone());
402+
}
403+
}
404+
if tx.send((plugin_id, plugin_name, groups)).await.is_err() {
405+
break;
406+
}
407+
}
408+
Ok((plugin_id, plugin_name, Ok(Json(RsLookupSourceResult::Requests(request))))) => {
409+
log_info(crate::tools::log::LogServiceType::Plugin, format!("Lookup stream result from plugin {}", plugin_name));
410+
let groups: Vec<RsGroupDownload> = request.into_iter().map(|mut req| {
411+
req.plugin_id = Some(plugin_id.clone());
412+
req.plugin_name = Some(plugin_name.clone());
413+
RsGroupDownload {
414+
requests: vec![req],
415+
group: false,
416+
..Default::default()
417+
}
418+
}).collect();
419+
if tx.send((plugin_id, plugin_name, groups)).await.is_err() {
420+
break;
421+
}
422+
}
423+
Ok((_, _, Ok(_))) => {}
424+
Ok((_, _, Err((error, code)))) => {
425+
if code != 404 {
426+
log_error(crate::tools::log::LogServiceType::Plugin, format!("Error request lookup stream {} {:?}", code, error));
427+
}
428+
}
429+
Err(e) => {
430+
log_error(crate::tools::log::LogServiceType::Plugin, format!("Plugin task panicked: {:?}", e));
431+
}
432+
}
433+
}
434+
});
435+
Ok(rx)
436+
}
437+
360438
pub async fn lookup_metadata(&self, query: RsLookupQuery, plugins: Vec<PluginWithCredential>, target: Option<PluginTarget>) -> RsResult<RsLookupMetadataResults> {
361439
let plugins = Self::filter_plugins_by_target(plugins, &target, None)?;
362440

src/routes/books.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use serde::Deserialize;
2222
use crate::{
2323
domain::book::{Book, BookForUpdate},
2424
model::{books::BookQuery, medias::MediaQuery, users::ConnectedUser, ModelController},
25-
routes::{ImageRequestOptions, ImageUploadOptions, RatingUpdateBody, SearchQuery, SearchResultGroup, SseSearchEvent},
25+
routes::{ImageRequestOptions, ImageUploadOptions, RatingUpdateBody, SearchQuery, SearchResultGroup, SseLookupSearchEvent, SseLookupSearchResult, SseSearchEvent},
2626
Error, Result,
2727
};
2828

@@ -44,6 +44,7 @@ pub fn routes(mc: ModelController) -> Router {
4444
.route("/:id/rating", get(handler_rating_get))
4545
.route("/:id/rating", patch(handler_rating_set))
4646
.route("/:id/search", get(handler_lookup))
47+
.route("/:id/searchstream", get(handler_lookup_stream))
4748
.with_state(mc)
4849
}
4950

@@ -177,6 +178,37 @@ async fn handler_lookup(
177178
Ok(Json(json!(results)))
178179
}
179180

181+
async fn handler_lookup_stream(
182+
Path((library_id, book_id)): Path<(String, String)>,
183+
State(mc): State<ModelController>,
184+
user: ConnectedUser,
185+
) -> Result<Sse<impl Stream<Item = std::result::Result<Event, Infallible>>>> {
186+
let book = mc.get_book(&library_id, book_id, &user).await?;
187+
let name = book.item.name.clone();
188+
let ids: RsIds = book.item.into();
189+
let query = RsLookupQuery::Book(RsLookupBook {
190+
name: Some(name),
191+
ids: Some(ids),
192+
page_key: None,
193+
});
194+
let mut rx = mc.exec_lookup_stream_grouped(query, Some(library_id), &user, None, None).await?;
195+
196+
let stream = async_stream::stream! {
197+
while let Some((source_id, source_name, groups)) = rx.recv().await {
198+
let results = SseLookupSearchResult::from_groups(groups);
199+
if let Ok(data) = serde_json::to_string(&SseLookupSearchEvent { source_id: &source_id, source_name: &source_name, results: &results }) {
200+
yield Ok(Event::default().event("results").data(data));
201+
}
202+
}
203+
};
204+
205+
Ok(Sse::new(stream).keep_alive(
206+
KeepAlive::new()
207+
.interval(Duration::from_secs(30))
208+
.text("ping"),
209+
))
210+
}
211+
180212
async fn handler_medias(
181213
Path((library_id, book_id)): Path<(String, String)>,
182214
State(mc): State<ModelController>,

src/routes/episodes.rs

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11

22
use std::io::Cursor;
33

4+
use std::{convert::Infallible, time::Duration};
5+
46
use crate::{domain::{episode::{self, Episode}, media::{FileEpisode, Media, MediaForUpdate}, progress, view_progress::{ViewProgressForAdd, ViewProgressLigh}, watched::{WatchedForAdd, WatchedForDelete, WatchedLight}, RsIdsExt}, error::RsError, model::{episodes::{EpisodeForUpdate, EpisodeQuery}, medias::MediaQuery, users::{ConnectedUser, HistoryQuery}, ModelController}, plugins::sources::error::SourcesError, Error, Result};
5-
use axum::{body::Body, debug_handler, extract::{Multipart, Path, Query, State}, response::{IntoResponse, Response}, routing::{delete, get, patch, post}, Json, Router};
6-
use futures::TryStreamExt;
7+
use axum::{body::Body, debug_handler, extract::{Multipart, Path, Query, State}, response::{sse::{Event, KeepAlive, Sse}, IntoResponse, Response}, routing::{delete, get, patch, post}, Json, Router};
8+
use futures::{Stream, TryStreamExt};
79
use rs_plugin_common_interfaces::{domain::rs_ids::RsIds, lookup::{RsLookupEpisode, RsLookupQuery}, request::{RsGroupDownload, RsRequest}, ElementType, ImageType, MediaType};
810
use serde_json::{json, ser, Value};
911
use tokio::io::AsyncRead;
1012
use tokio_util::io::{ReaderStream, StreamReader};
1113

12-
use super::{ImageRequestOptions, ImageUploadOptions, RatingUpdateBody};
14+
use super::{ImageRequestOptions, ImageUploadOptions, RatingUpdateBody, SseLookupSearchEvent, SseLookupSearchResult};
1315

1416

1517
pub fn routes(mc: ModelController) -> Router {
@@ -19,13 +21,15 @@ pub fn routes(mc: ModelController) -> Router {
1921
.route("/episodes/refresh", get(handler_refresh))
2022

2123
.route("/seasons/:season/search", get(handler_lookup_season))
24+
.route("/seasons/:season/searchstream", get(handler_lookup_season_stream))
2225
.route("/seasons/:season/episodes", get(handler_list_season_episodes))
2326
.route("/seasons/:season/episodes/:number", get(handler_get))
2427
.route("/seasons/:season/episodes/:number", post(handler_post_episode))
2528
.route("/seasons/:season/episodes/:number", patch(handler_patch))
2629
.route("/seasons/:season/episodes/:number", delete(handler_delete))
2730
.route("/seasons/:season/episodes/:number/image", get(handler_image))
2831
.route("/seasons/:season/episodes/:number/search", get(handler_lookup))
32+
.route("/seasons/:season/episodes/:number/searchstream", get(handler_lookup_stream))
2933
.route("/seasons/:season/episodes/:number/search", post(handler_lookup_add))
3034
.route("/seasons/:season/episodes/:number/medias", get(handler_medias))
3135
.route("/seasons/:season/episodes/:number/progress", get(handler_progress_get))
@@ -120,6 +124,67 @@ async fn handler_lookup(Path((library_id, serie_id, season, number)): Path<(Stri
120124
Ok(body)
121125
}
122126

127+
async fn handler_lookup_stream(Path((library_id, serie_id, season, number)): Path<(String, String, u32, u32)>, State(mc): State<ModelController>, user: ConnectedUser) -> Result<Sse<impl Stream<Item = std::result::Result<Event, Infallible>>>> {
128+
let episode = mc.get_episode(&library_id, serie_id.clone(), season, number, &user).await?;
129+
let serie = mc.get_serie(&library_id, serie_id.clone(), &user).await?.ok_or(SourcesError::UnableToFindSerie(library_id.to_string(), serie_id.to_string(), "handler_lookup_stream".to_string()))?;
130+
let name = serie.item.name.clone();
131+
let ids: RsIds = serie.item.into();
132+
let query_episode = RsLookupEpisode {
133+
name: Some(name),
134+
ids: Some(ids),
135+
season: episode.season,
136+
number: Some(episode.number),
137+
page_key: None,
138+
};
139+
let query = RsLookupQuery::Episode(query_episode);
140+
let mut rx = mc.exec_lookup_stream_grouped(query, Some(library_id), &user, None, None).await?;
141+
142+
let stream = async_stream::stream! {
143+
while let Some((source_id, source_name, groups)) = rx.recv().await {
144+
let results = SseLookupSearchResult::from_groups(groups);
145+
if let Ok(data) = serde_json::to_string(&SseLookupSearchEvent { source_id: &source_id, source_name: &source_name, results: &results }) {
146+
yield Ok(Event::default().event("results").data(data));
147+
}
148+
}
149+
};
150+
151+
Ok(Sse::new(stream).keep_alive(
152+
KeepAlive::new()
153+
.interval(Duration::from_secs(30))
154+
.text("ping"),
155+
))
156+
}
157+
158+
async fn handler_lookup_season_stream(Path((library_id, serie_id, season)): Path<(String, String, u32)>, State(mc): State<ModelController>, user: ConnectedUser) -> Result<Sse<impl Stream<Item = std::result::Result<Event, Infallible>>>> {
159+
let serie = mc.get_serie(&library_id.clone(), serie_id.clone(), &user).await?.ok_or(SourcesError::UnableToFindSerie(library_id.to_string(), serie_id.to_string(), "handler_lookup_season_stream".to_string()))?;
160+
let name = serie.item.name.clone();
161+
let ids: RsIds = serie.item.into();
162+
let query_episode = RsLookupEpisode {
163+
name: Some(name),
164+
ids: Some(ids),
165+
season,
166+
number: None,
167+
page_key: None,
168+
};
169+
let query = RsLookupQuery::Episode(query_episode);
170+
let mut rx = mc.exec_lookup_stream_grouped(query, Some(library_id), &user, None, None).await?;
171+
172+
let stream = async_stream::stream! {
173+
while let Some((source_id, source_name, groups)) = rx.recv().await {
174+
let results = SseLookupSearchResult::from_groups(groups);
175+
if let Ok(data) = serde_json::to_string(&SseLookupSearchEvent { source_id: &source_id, source_name: &source_name, results: &results }) {
176+
yield Ok(Event::default().event("results").data(data));
177+
}
178+
}
179+
};
180+
181+
Ok(Sse::new(stream).keep_alive(
182+
KeepAlive::new()
183+
.interval(Duration::from_secs(30))
184+
.text("ping"),
185+
))
186+
}
187+
123188
async fn handler_lookup_add(Path((library_id, serie_id, season, number)): Path<(String, String, u32, u32)>, State(mc): State<ModelController>, user: ConnectedUser, Json(mut request): Json<RsRequest>) -> Result<Json<Value>> {
124189
// Set series info directly on the request
125190
request.albums = Some(vec![serie_id]);

src/routes/mod.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use rs_plugin_common_interfaces::{lookup::RsLookupMetadataResults, ImageType};
1+
use rs_plugin_common_interfaces::{lookup::{RsLookupMatchType, RsLookupMetadataResults}, request::{RsGroupDownload, RsRequest}, ImageType};
22
use serde::{Deserialize, Serialize};
33

44
use crate::tools::image_tools::ImageSize;
@@ -61,6 +61,35 @@ impl<T> SearchQuery<T> {
6161
}
6262
}
6363

64+
#[derive(Serialize)]
65+
#[serde(rename_all = "camelCase")]
66+
pub struct SseLookupSearchEvent<'a> {
67+
pub source_id: &'a str,
68+
pub source_name: &'a str,
69+
pub results: &'a [SseLookupSearchResult],
70+
}
71+
72+
#[derive(Serialize)]
73+
#[serde(rename_all = "camelCase")]
74+
pub struct SseLookupSearchResult {
75+
pub request: RsRequest,
76+
pub match_type: Option<RsLookupMatchType>,
77+
}
78+
79+
impl SseLookupSearchResult {
80+
pub fn from_groups(groups: Vec<RsGroupDownload>) -> Vec<Self> {
81+
groups.into_iter().flat_map(|group| {
82+
let match_type = group.match_type;
83+
group.requests.into_iter().map(move |request| {
84+
SseLookupSearchResult {
85+
request,
86+
match_type: match_type.clone(),
87+
}
88+
})
89+
}).collect()
90+
}
91+
}
92+
6493
#[derive(Debug, Serialize, Deserialize, Clone)]
6594
pub struct ImageRequestOptions {
6695
size: Option<ImageSize>,

src/routes/movies.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use serde_json::{json, Value};
99
use tokio::io::AsyncRead;
1010
use tokio_util::io::{ReaderStream, StreamReader};
1111

12-
use super::{ImageRequestOptions, ImageUploadOptions, RatingUpdateBody, SearchQuery, SearchResultGroup, SseSearchEvent};
12+
use super::{ImageRequestOptions, ImageUploadOptions, RatingUpdateBody, SearchQuery, SearchResultGroup, SseLookupSearchEvent, SseLookupSearchResult, SseSearchEvent};
1313

1414

1515

@@ -26,6 +26,7 @@ pub fn routes(mc: ModelController) -> Router {
2626

2727
.route("/:id/medias", get(handler_medias))
2828
.route("/:id/search", get(handler_lookup))
29+
.route("/:id/searchstream", get(handler_lookup_stream))
2930
.route("/:id/search", post(handler_lookup_add))
3031
.route("/:id", patch(handler_patch))
3132
.route("/:id/import", put(handler_import))
@@ -124,6 +125,33 @@ async fn handler_lookup(Path((library_id, movie_id)): Path<(String, String)>, St
124125
Ok(body)
125126
}
126127

128+
async fn handler_lookup_stream(Path((library_id, movie_id)): Path<(String, String)>, State(mc): State<ModelController>, user: ConnectedUser) -> Result<Sse<impl Stream<Item = std::result::Result<Event, Infallible>>>> {
129+
let movie = mc.get_movie(&library_id, movie_id, &user).await?;
130+
let name = movie.name.clone();
131+
let ids: RsIds = movie.into();
132+
let query = RsLookupQuery::Movie(RsLookupMovie {
133+
name: Some(name),
134+
ids: Some(ids),
135+
page_key: None,
136+
});
137+
let mut rx = mc.exec_lookup_stream_grouped(query, Some(library_id), &user, None, None).await?;
138+
139+
let stream = async_stream::stream! {
140+
while let Some((source_id, source_name, groups)) = rx.recv().await {
141+
let results = SseLookupSearchResult::from_groups(groups);
142+
if let Ok(data) = serde_json::to_string(&SseLookupSearchEvent { source_id: &source_id, source_name: &source_name, results: &results }) {
143+
yield Ok(Event::default().event("results").data(data));
144+
}
145+
}
146+
};
147+
148+
Ok(Sse::new(stream).keep_alive(
149+
KeepAlive::new()
150+
.interval(Duration::from_secs(30))
151+
.text("ping"),
152+
))
153+
}
154+
127155
async fn handler_lookup_add(Path((library_id, movie_id)): Path<(String, String)>, State(mc): State<ModelController>, user: ConnectedUser, Json(mut request): Json<RsRequest>) -> Result<Json<Value>> {
128156
// Set movie info directly on the request
129157
request.movie = Some(movie_id);

0 commit comments

Comments
 (0)