Skip to content

Commit dbd91e8

Browse files
committed
add: playlist get
1 parent c7df421 commit dbd91e8

File tree

8 files changed

+135
-40
lines changed

8 files changed

+135
-40
lines changed

swiftstream/src/caching/stream_tracking.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use reqwest::Client;
1111
use tokio::{sync::RwLock, time::sleep};
1212
use url::Url;
1313

14-
use crate::caching::CachePool;
14+
use crate::{caching::CachePool, transfer::parse_m3u8_async};
1515

1616
pub struct StreamTrackingPool {
1717
tracking: RwLock<HashMap<String, Arc<TrackingItem>>>,
@@ -137,18 +137,16 @@ impl TrackingItem {
137137
&self,
138138
tracking_pool: &Arc<StreamTrackingPool>,
139139
) -> Result<(), anyhow::Error> {
140-
let response = tracking_pool.http_client.get(&self.origin).send().await?;
141-
let data = response.bytes().await?;
140+
let data = tracking_pool
141+
.http_client
142+
.get(&self.origin)
143+
.send()
144+
.await?
145+
.bytes()
146+
.await?;
142147

143148
// parse
144-
let playlist = tokio::task::spawn_blocking(move || {
145-
let mut parser = mediastream_rs::Parser::new(Cursor::new(data));
146-
if let Err(e) = parser.parse() {
147-
return Err(e);
148-
}
149-
Ok(parser.get_result())
150-
})
151-
.await??;
149+
let playlist = parse_m3u8_async(Cursor::new(data)).await?;
152150

153151
self.prepare_all(tracking_pool, &self.origin, playlist)
154152
.await?;

swiftstream/src/config.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, fs::File, path::Path};
1+
use std::{fs::File, path::Path};
22

33
use anyhow::Result;
44
use serde::Deserialize;
@@ -12,13 +12,6 @@ pub struct Config {
1212
pub cache_expire: Option<u16>,
1313
pub track_expire: Option<u16>,
1414
pub track_interval: Option<u16>,
15-
pub upstreams: HashMap<String, UpstreamConfig>,
16-
}
17-
18-
#[derive(Debug, Deserialize)]
19-
#[serde(rename_all = "camelCase")]
20-
pub struct UpstreamConfig {
21-
pub url: String,
2215
}
2316

2417
pub fn load_config(path: impl AsRef<Path>) -> Result<Config> {

swiftstream/src/routes/media.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ use mediastream_rs::format::M3uPlaylist;
99
use serde::Deserialize;
1010
use url::Url;
1111

12-
use crate::{AppStateRef, internal_error_with_log};
12+
use crate::{AppStateRef, internal_error_with_log, transfer::parse_m3u8_async};
1313

1414
#[derive(Deserialize)]
15-
pub struct MediaRequest {
15+
pub struct MediaQuery {
1616
origin: String,
1717
}
1818

@@ -46,28 +46,24 @@ async fn prepare_all(
4646

4747
pub async fn get_media(
4848
State(state): State<AppStateRef>,
49-
Query(query): Query<MediaRequest>,
49+
Query(query): Query<MediaQuery>,
5050
) -> Result<Response, StatusCode> {
51-
let response = state
51+
let data = state
5252
.http_client
5353
.get(&query.origin)
5454
.send()
5555
.await
56+
.map_err(internal_error_with_log!())?
57+
.bytes()
58+
.await
5659
.map_err(internal_error_with_log!())?;
57-
let data = response.bytes().await.map_err(internal_error_with_log!())?;
60+
5861
state.tracking_pool.track(&query.origin).await;
5962

6063
// parse
61-
let mut playlist = tokio::task::spawn_blocking(move || {
62-
let mut parser = mediastream_rs::Parser::new(Cursor::new(data));
63-
if let Err(e) = parser.parse() {
64-
return Err(e);
65-
}
66-
Ok(parser.get_result())
67-
})
68-
.await
69-
.map_err(internal_error_with_log!())?
70-
.map_err(internal_error_with_log!())?;
64+
let mut playlist = parse_m3u8_async(Cursor::new(data))
65+
.await
66+
.map_err(internal_error_with_log!())?;
7167

7268
prepare_all(&state, &mut playlist, query.origin)
7369
.await

swiftstream/src/routes/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ use axum::{Router, routing::get};
33
use crate::AppStateRef;
44

55
mod media;
6+
mod playlist;
67
mod stream;
78

89
pub fn get_routes(app_state: &AppStateRef) -> Router {
910
Router::new()
11+
.route("/playlist", get(playlist::get_playlist))
1012
.route("/stream", get(stream::get_stream))
1113
.route("/media", get(media::get_media))
1214
.with_state(app_state.clone())

swiftstream/src/routes/playlist.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::io::Cursor;
2+
3+
use axum::{
4+
extract::{Query, State},
5+
response::{IntoResponse, Response},
6+
};
7+
use reqwest::StatusCode;
8+
use serde::Deserialize;
9+
use url::Url;
10+
11+
use crate::{AppStateRef, internal_error_with_log, transfer::parse_m3u8_async};
12+
13+
#[derive(Deserialize)]
14+
pub struct PlaylistQuery {
15+
pub origin: String,
16+
}
17+
pub async fn get_playlist(
18+
State(state): State<AppStateRef>,
19+
Query(query): Query<PlaylistQuery>,
20+
) -> Result<Response, StatusCode> {
21+
let data = state
22+
.http_client
23+
.get(&query.origin)
24+
.send()
25+
.await
26+
.map_err(internal_error_with_log!())?
27+
.bytes()
28+
.await
29+
.map_err(internal_error_with_log!())?;
30+
31+
let mut playlist = parse_m3u8_async(Cursor::new(data))
32+
.await
33+
.map_err(internal_error_with_log!())?;
34+
35+
let base = Url::parse(&query.origin).map_err(internal_error_with_log!())?;
36+
37+
for media in playlist.medias.iter_mut() {
38+
let media_location = media.location.clone();
39+
let mut location = Url::parse(&media_location);
40+
if location == Err(url::ParseError::RelativeUrlWithoutBase) {
41+
location = base.join(&media_location);
42+
}
43+
let location = location.map_err(internal_error_with_log!())?.to_string();
44+
45+
media.location = format!(
46+
"{}/media?origin={}",
47+
state.config.base_url,
48+
urlencoding::encode(&location)
49+
)
50+
.into();
51+
}
52+
53+
Ok(playlist.to_string().into_response())
54+
}

swiftstream/src/routes/stream.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ use crate::{
1818
};
1919

2020
#[derive(Deserialize)]
21-
pub struct StreamRequest {
21+
pub struct StreamQuery {
2222
pub origin: String,
2323
}
2424

2525
pub async fn get_stream(
2626
State(state): State<AppStateRef>,
27-
Query(query): Query<StreamRequest>,
27+
Query(query): Query<StreamQuery>,
2828
headers: HeaderMap,
2929
) -> Result<Response, StatusCode> {
3030
let data = state.cache_pool.get(&query.origin).await;
@@ -56,10 +56,10 @@ pub async fn get_stream(
5656
.into_iter()
5757
.map(|x| (x, Cursor::new(data.bytes.clone())))
5858
.map(|mut x| match x.0 {
59-
HttpRange::Prefix(len) => x.1.take(len),
60-
HttpRange::Suffix(len) => {
61-
_ = x.1.seek(SeekFrom::End(-(len).try_into().unwrap_or(0)));
62-
x.1.take(len)
59+
HttpRange::Suffix(len) => x.1.take(len),
60+
HttpRange::Prefix(len) => {
61+
_ = x.1.seek(SeekFrom::Start(len));
62+
x.1.take(u64::MAX)
6363
}
6464
HttpRange::Range(from, to) => {
6565
_ = x.1.seek(SeekFrom::Start(from));
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use std::{
2+
fmt::Display,
3+
io::{BufRead, Seek},
4+
};
5+
6+
use mediastream_rs::{ParseError, format::M3uPlaylist};
7+
use std::error::Error;
8+
use tokio::task::JoinError;
9+
10+
#[derive(Debug)]
11+
pub enum ParseM3U8Error {
12+
ParseError(ParseError),
13+
JoinError(JoinError),
14+
}
15+
16+
impl Display for ParseM3U8Error {
17+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18+
match self {
19+
Self::JoinError(e) => e.fmt(f),
20+
Self::ParseError(e) => e.fmt(f),
21+
}
22+
}
23+
}
24+
25+
impl Error for ParseM3U8Error {}
26+
27+
impl From<JoinError> for ParseM3U8Error {
28+
fn from(value: JoinError) -> Self {
29+
Self::JoinError(value)
30+
}
31+
}
32+
33+
impl From<ParseError> for ParseM3U8Error {
34+
fn from(value: ParseError) -> Self {
35+
Self::ParseError(value)
36+
}
37+
}
38+
39+
pub async fn parse_m3u8_async(
40+
stream: impl BufRead + Seek + Send + 'static,
41+
) -> Result<M3uPlaylist, ParseM3U8Error> {
42+
Ok(tokio::task::spawn_blocking(move || {
43+
let mut parser = mediastream_rs::Parser::new(stream);
44+
if let Err(e) = parser.parse() {
45+
return Err(e);
46+
}
47+
Ok(parser.get_result())
48+
})
49+
.await??)
50+
}

swiftstream/src/transfer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
mod mediastream_parse;
12
mod range;
23

4+
pub use mediastream_parse::*;
35
pub use range::*;

0 commit comments

Comments
 (0)