Skip to content

Commit 74cd874

Browse files
author
David Eigen
committed
stream and video utils files
1 parent 5e96b5c commit 74cd874

File tree

2 files changed

+257
-0
lines changed

2 files changed

+257
-0
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import io
2+
3+
import requests
4+
5+
# Number of bytes in one mebibyte (2 ** 20); used to express buffer and chunk sizes.
MB = 1 << 20
6+
7+
8+
class BufferStream(io.RawIOBase):
    '''
    A buffer that reads data from a chunked stream and provides a file-like interface for reading.

    Only forward, sequential reads are supported; at most one chunk is held in
    memory at a time.

    :param chunk_iterator: An iterator that yields chunks of data (bytes)
    '''

    def __init__(self, chunk_iterator):
        self._chunk_iterator = chunk_iterator
        # Placeholder attribute; subclasses that wrap an HTTP response store it here.
        self.response = None
        # Chunk currently being consumed.
        self.buffer = b''
        # Total number of bytes handed out to readers so far.
        self.file_pos = 0
        # Read offset inside ``self.buffer``.
        self.b_pos = 0
        # Set once the chunk iterator has been exhausted.
        self._eof = False

    #### read() methods

    def readable(self):
        '''Report that this stream supports reading.'''
        return True

    def readinto(self, output_buf):
        '''
        Copy up to ``len(output_buf)`` bytes of the stream into ``output_buf``.

        At most the remainder of the current chunk is copied per call, so a
        short read does not mean the stream has ended.

        :param output_buf: A writable bytes-like object (e.g. bytearray or memoryview)
        :return: The number of bytes written; 0 at end of stream or when
                 ``output_buf`` is empty
        '''
        if self._eof or len(output_buf) == 0:
            return 0

        # Load the next chunk if the current one is used up.  Empty chunks are
        # skipped: returning 0 for one would wrongly signal end of stream.
        while self.b_pos >= len(self.buffer):
            try:
                self.buffer = next(self._chunk_iterator)
            except StopIteration:
                self._eof = True
                self.buffer = b''
                self.b_pos = 0
                return 0
            self.b_pos = 0

        # copy data to output buffer
        n = min(len(output_buf), len(self.buffer) - self.b_pos)
        output_buf[:n] = self.buffer[self.b_pos:self.b_pos + n]

        # advance positions
        self.b_pos += n
        self.file_pos += n

        return n
53+
54+
55+
class SeekableBufferStream(io.RawIOBase):
    '''
    EXPERIMENTAL
    A buffer that reads data from a chunked stream and provides a file-like interface for reading.

    Recently loaded chunks are retained (up to ``buffer_size`` bytes) so that
    the stream can seek backwards within that window.  The most recently
    loaded chunk is always retained, even if it alone exceeds ``buffer_size``.

    :param chunk_iterator: An iterator that yields chunks of data (bytes)
    :param buffer_size: The maximum size of the buffer in bytes
    '''

    def __init__(self, chunk_iterator, buffer_size=100 * MB):
        self._chunk_iterator = chunk_iterator
        self.buffer_size = buffer_size
        # Retained chunks, oldest first.
        self.buffer_vec = []
        # Absolute position in the stream of the next byte to be read.
        self.file_pos = 0
        # Index into ``buffer_vec`` of the chunk holding the next byte
        # (== len(buffer_vec) when the next byte has not been loaded yet).
        self.vec_pos = 0
        # Offset of the next byte inside ``buffer_vec[vec_pos]``.
        self.b_pos = 0
        # Set once a read has hit the end of the chunk iterator.
        self._eof = False

    #### read() methods

    def readable(self):
        '''Report that this stream supports reading.'''
        return True

    def readinto(self, output_buf):
        '''
        Copy up to ``len(output_buf)`` bytes of the stream into ``output_buf``.

        At most the remainder of the current chunk is copied per call.

        :param output_buf: A writable bytes-like object
        :return: The number of bytes written; 0 at end of stream or when
                 ``output_buf`` is empty
        '''
        if self._eof or len(output_buf) == 0:
            return 0

        assert self.vec_pos <= len(self.buffer_vec)

        # load next chunk if necessary
        if self.vec_pos == len(self.buffer_vec):
            try:
                self._load_next_chunk()
            except StopIteration:
                self._eof = True
                return 0

        # copy data from buffer_vec to output buffer
        chunk = self.buffer_vec[self.vec_pos]
        n = min(len(output_buf), len(chunk) - self.b_pos)
        output_buf[:n] = chunk[self.b_pos:self.b_pos + n]

        # advance positions, moving on to the next chunk once this one is used up
        self.file_pos += n
        self.b_pos += n
        if self.b_pos == len(chunk):
            self.vec_pos += 1
            self.b_pos = 0
        return n

    def _load_next_chunk(self, check_bounds=True):
        '''
        Append the next non-empty chunk to ``buffer_vec`` and evict the oldest
        chunks while the retained total exceeds ``buffer_size``.

        :param check_bounds: If true, assert that the current position was not evicted
        :raises StopIteration: When the chunk iterator is exhausted
        '''
        # Skip empty chunks: they carry no data and would produce zero-byte
        # reads that look like end of stream.
        chunk = next(self._chunk_iterator)
        while len(chunk) == 0:
            chunk = next(self._chunk_iterator)

        self.buffer_vec.append(chunk)
        total = sum(len(c) for c in self.buffer_vec)
        # Always keep the newest chunk, otherwise a chunk larger than
        # buffer_size could never be read.
        while total > self.buffer_size and len(self.buffer_vec) > 1:
            evicted = self.buffer_vec.pop(0)
            total -= len(evicted)
            # Indices shift down by one for every chunk dropped from the front.
            self.vec_pos -= 1
        if check_bounds:
            assert self.vec_pos >= 0, 'current position fell outside the buffer'

    #### seek() methods (experimental)

    def seekable(self):
        '''Report that this stream supports (bounded) seeking.'''
        return True

    def tell(self):
        '''Return the absolute position of the next byte to be read.'''
        return self.file_pos

    def seek(self, offset, whence=io.SEEK_SET):
        '''
        Move the read position.

        Backward seeks are only possible within the chunks still retained in
        the buffer.  Forward seeks load (and discard as needed) chunks until
        the target is reached; if the stream ends first, the position is left
        at the end of the stream.  ``io.SEEK_END`` consumes the whole stream.

        :param offset: Byte offset, interpreted relative to ``whence``
        :param whence: One of io.SEEK_SET, io.SEEK_CUR, io.SEEK_END
        :return: The new absolute position
        :raises ValueError: If ``whence`` is not a recognised value
        :raises IOError: If the target lies before the start of the retained buffer
        '''
        # convert to offset from start of file stream
        if whence == io.SEEK_SET:
            seek_pos = offset
        elif whence == io.SEEK_CUR:
            seek_pos = self.file_pos + offset
        elif whence == io.SEEK_END:
            self._seek_to_end()
            seek_pos = self.file_pos + offset
        else:
            raise ValueError(f"Invalid whence: {whence}")

        # Check the target against the start of the retained window before
        # touching any position, so a rejected seek leaves the state intact.
        buffer_start = self.file_pos - self.b_pos - sum(
            len(c) for c in self.buffer_vec[:self.vec_pos])
        if seek_pos < buffer_start:
            raise IOError('seek before start of buffer')

        # set positions to start of buffer vec to begin seeking
        self.file_pos = buffer_start
        self.vec_pos = 0
        self.b_pos = 0

        # seek forwards to desired position
        while self.file_pos < seek_pos:
            if self.vec_pos == len(self.buffer_vec):
                try:
                    self._load_next_chunk()
                except StopIteration:
                    # Stream ended before the target: stay at end of stream
                    # instead of leaking StopIteration to the caller.
                    break
            n = len(self.buffer_vec[self.vec_pos])
            if self.file_pos + n > seek_pos:
                # target lies inside this chunk
                self.b_pos = seek_pos - self.file_pos
                self.file_pos = seek_pos
                break
            self.file_pos += n
            self.vec_pos += 1

        # unset EOF flag
        self._eof = False

        return self.file_pos

    def _seek_to_end(self):
        '''Consume the rest of the stream, leaving the position at its end.'''
        # skip positions to end of the current buffer vec
        if self.b_pos > 0:
            self.file_pos += len(self.buffer_vec[self.vec_pos]) - self.b_pos
            self.vec_pos += 1
            self.b_pos = 0
        # keep loading chunks until EOF
        while True:
            # advance over everything already loaded
            while self.vec_pos < len(self.buffer_vec):
                self.file_pos += len(self.buffer_vec[self.vec_pos])
                self.vec_pos += 1
            try:
                self._load_next_chunk(check_bounds=False)
            except StopIteration:
                return
185+
186+
187+
class URLStream(BufferStream):
    '''
    A readable stream over the body of an HTTP(S) GET request.

    :param url: The URL to fetch
    :param chunk_size: Number of bytes requested per chunk from the response
    :param buffer_size: Stored as ``self.buffer_size`` but otherwise unused;
                        kept in the signature for backward compatibility
                        because BufferStream takes no buffer-size argument
    :param requests_kwargs: Extra keyword arguments forwarded to ``requests.get``
    '''

    def __init__(self, url, chunk_size=1 * MB, buffer_size=10 * MB, requests_kwargs=None):
        self.url = url
        self.chunk_size = chunk_size
        self.buffer_size = buffer_size
        response = requests.get(self.url, stream=True, **(requests_kwargs or {}))
        try:
            response.raise_for_status()
        except Exception:
            # do not leak the connection when the server reports an error
            response.close()
            raise
        # BufferStream.__init__ accepts only the iterator and resets
        # ``self.response`` to None, so the response is assigned afterwards.
        super().__init__(response.iter_content(chunk_size=self.chunk_size))
        self.response = response

    def close(self):
        '''Close the stream and release the underlying HTTP response.'''
        super().close()
        # ``response`` may be missing or None if construction failed part-way.
        response = getattr(self, 'response', None)
        if response is not None:
            response.close()
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import io
2+
import tempfile
3+
4+
import av
5+
import requests
6+
7+
from clarifai.runners.utils import stream_utils
8+
9+
10+
def stream_video_from_url(url, download_ok=True):
    """
    Yields decoded video frames from the first video stream of a URL using PyAV.

    RTSP URLs are opened directly by PyAV.  HTTP(S) URLs are first downloaded
    in full to a temporary file, which is removed once decoding finishes or
    the generator is closed.

    Because this is a generator, the errors below are raised when iteration
    starts, not when the function is called.

    :param url: The video URL
    :param download_ok: Whether to download the video if the URL is not a stream
    :raises ValueError: If the URL scheme is unsupported, or a download would be
                        required while ``download_ok`` is false
    """
    protocol = url.split('://', 1)[0]
    if protocol == 'rtsp':
        # stream from RTSP and send to PyAV; close the container when done
        with av.open(url) as container:
            yield from container.decode(video=0)
    elif protocol in ('http', 'https'):
        if not download_ok:
            raise ValueError('Download not allowed for URL scheme')
        # download the video to the temporary file
        # TODO: download just enough to get the file header and stream to pyav if possible,
        # otherwise download the whole file
        # e.g. if linking to a streamable file format like mpegts (not mp4)
        with tempfile.NamedTemporaryFile(delete=True) as file:
            _download_video(url, file)
            with av.open(file.name) as container:
                # Decode video frames
                yield from container.decode(video=0)
    else:
        # TODO others: s3, etc.
        raise ValueError('Unsupported URL scheme')
37+
38+
39+
def _download_video(url, file, chunk_size=1024 * 1024):
    """
    Downloads the content at ``url`` into an already-open binary file object.

    :param url: The HTTP(S) URL to fetch
    :param file: A writable binary file object; it is flushed but not closed
    :param chunk_size: Number of bytes to request per chunk while streaming
    :return: The same ``file`` object that was passed in
    """
    # The context manager releases the connection even if a write fails.
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=chunk_size):
            file.write(chunk)
    # make the data visible to readers that reopen the file by name
    file.flush()
    return file
46+
47+
48+
def stream_video_from_bytes(bytes_iterator):
    """
    Streams a video from a sequence of chunked byte strings of a streamable video
    container format, yielding decoded frames from its first video stream.

    The reader and the PyAV container are closed when decoding finishes or the
    generator is closed.

    :param bytes_iterator: An iterator that yields byte chunks with the video data
    """
    buffer = stream_utils.BufferStream(bytes_iterator)
    with io.BufferedReader(buffer) as reader:
        with av.open(reader) as container:
            yield from container.decode(video=0)

0 commit comments

Comments
 (0)