Skip to content

Commit 49a1e09

Browse files
committed
08.05.2024
* `NonLiveStream` now can be used again and again
1 parent 970f512 commit 49a1e09

File tree

4 files changed

+59
-28
lines changed

4 files changed

+59
-28
lines changed

src/blocking/stream/streams/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub use live::LiveStream;
1515
pub use non_live::NonLiveStream;
1616

1717
pub trait Stream {
18-
/// Stream a chunk of the [`u8`] bytes
18+
/// Stream a chunk of the [`Bytes`]
1919
///
2020
/// When the bytes has been exhausted, this will return `None`.
2121
fn chunk(&self) -> Result<Option<Bytes>, VideoError>;

src/stream/streams/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use async_trait::async_trait;
1313

1414
#[async_trait]
1515
pub trait Stream {
16-
/// Stream a chunk of the [`u8`] bytes
16+
/// Stream a chunk of the [`Bytes`]
1717
///
1818
/// When the bytes has been exhausted, this will return `None`.
1919
async fn chunk(&self) -> Result<Option<Bytes>, VideoError>;

src/stream/streams/non_live.rs

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ pub struct NonLiveStream {
2727
dl_chunk_size: u64,
2828
start: RwLock<u64>,
2929
end: RwLock<u64>,
30+
start_static: u64,
31+
end_static: u64,
3032

3133
client: reqwest_middleware::ClientWithMiddleware,
3234

3335
#[cfg(feature = "ffmpeg")]
34-
ffmpeg_args: Option<FFmpegArgs>,
36+
ffmpeg_args: Vec<String>,
3537
#[cfg(feature = "ffmpeg")]
3638
ffmpeg_start_byte: RwLock<Bytes>,
3739
#[cfg(feature = "ffmpeg")]
@@ -60,20 +62,42 @@ impl NonLiveStream {
6062
.build()
6163
};
6264

63-
Ok(Self {
64-
client,
65-
link: options.link,
66-
content_length: options.content_length,
67-
dl_chunk_size: options.dl_chunk_size,
68-
start: RwLock::new(options.start),
69-
end: RwLock::new(options.end),
70-
#[cfg(feature = "ffmpeg")]
71-
ffmpeg_args: options.ffmpeg_args,
72-
#[cfg(feature = "ffmpeg")]
73-
ffmpeg_end_byte: RwLock::new(0),
74-
#[cfg(feature = "ffmpeg")]
75-
ffmpeg_start_byte: RwLock::new(Bytes::new()),
76-
})
65+
#[cfg(feature = "ffmpeg")]
66+
{
67+
let ffmpeg_args = options
68+
.ffmpeg_args
69+
.clone()
70+
.map(|x| x.build())
71+
.unwrap_or_default();
72+
73+
Ok(Self {
74+
client,
75+
link: options.link,
76+
content_length: options.content_length,
77+
dl_chunk_size: options.dl_chunk_size,
78+
start: RwLock::new(options.start),
79+
end: RwLock::new(options.end),
80+
start_static: options.start,
81+
end_static: options.end,
82+
ffmpeg_args: ffmpeg_args.clone(),
83+
ffmpeg_end_byte: RwLock::new(0),
84+
ffmpeg_start_byte: RwLock::new(Bytes::new()),
85+
})
86+
}
87+
88+
#[cfg(not(feature = "ffmpeg"))]
89+
{
90+
Ok(Self {
91+
client,
92+
link: options.link,
93+
content_length: options.content_length,
94+
dl_chunk_size: options.dl_chunk_size,
95+
start: RwLock::new(options.start),
96+
end: RwLock::new(options.end),
97+
start_static: options.start,
98+
end_static: options.end,
99+
})
100+
}
77101
}
78102

79103
pub fn content_length(&self) -> u64 {
@@ -104,8 +128,21 @@ impl Stream for NonLiveStream {
104128
async fn chunk(&self) -> Result<Option<Bytes>, VideoError> {
105129
let end = self.end_index().await;
106130

107-
// Nothing else remain send None to finish
131+
// Nothing else remain set controllers to the beginning state and send None to finish
108132
if end == 0 {
133+
let mut end = self.end.write().await;
134+
let mut start = self.start.write().await;
135+
*end = self.end_static;
136+
*start = self.start_static;
137+
138+
#[cfg(feature = "ffmpeg")]
139+
{
140+
let mut ffmpeg_end_byte = self.ffmpeg_end_byte.write().await;
141+
let mut ffmpeg_start_byte = self.ffmpeg_start_byte.write().await;
142+
*ffmpeg_end_byte = 0;
143+
*ffmpeg_start_byte = Bytes::new();
144+
}
145+
109146
// Send None to close
110147
return Ok(None);
111148
}
@@ -147,17 +184,11 @@ impl Stream for NonLiveStream {
147184

148185
#[cfg(feature = "ffmpeg")]
149186
{
150-
let ffmpeg_args = self
151-
.ffmpeg_args
152-
.clone()
153-
.map(|x| x.build())
154-
.unwrap_or_default();
155-
156-
if !ffmpeg_args.is_empty() {
187+
if !self.ffmpeg_args.is_empty() {
157188
let ffmpeg_start_byte_index = self.ffmpeg_start_byte_index().await;
158189

159190
let cmd_output = ffmpeg_cmd_run(
160-
&ffmpeg_args,
191+
&self.ffmpeg_args,
161192
Bytes::from(
162193
[
163194
BytesMut::from_iter(ffmpeg_start_byte_index.clone()),

src/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::process::Stdio;
99
use std::sync::Mutex;
1010
use std::time::Instant;
1111
use tokio::sync::RwLock;
12-
use tokio::{io::AsyncWriteExt, process::Command};
1312
use urlencoding::decode;
1413

1514
use crate::constants::{
@@ -24,7 +23,7 @@ use crate::structs::{
2423

2524
#[cfg(feature = "ffmpeg")]
2625
pub async fn ffmpeg_cmd_run(args: &Vec<String>, data: Bytes) -> Result<Bytes, VideoError> {
27-
use tokio::io::AsyncReadExt;
26+
use tokio::{io::AsyncWriteExt, process::Command};
2827

2928
let mut cmd = Command::new("ffmpeg");
3029
cmd.args(args)
@@ -44,6 +43,7 @@ pub async fn ffmpeg_cmd_run(args: &Vec<String>, data: Bytes) -> Result<Bytes, Vi
4443
.wait_with_output()
4544
.await
4645
.map_err(|x| VideoError::FFmpeg(x.to_string()))?;
46+
println!("{}", output.stdout.len());
4747

4848
Ok(Bytes::from(output.stdout))
4949
}

0 commit comments

Comments
 (0)