Skip to content

Commit 6fb0396

Browse files
author
David Eigen
committed
write ffmpeg command in pyav
1 parent 917f075 commit 6fb0396

File tree

2 files changed

+46
-9
lines changed

2 files changed

+46
-9
lines changed

clarifai/client/model.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import json
2-
import logging
32
import os
4-
import subprocess
53
import time
64
from typing import Any, Dict, Generator, Iterator, List, Tuple, Union
75

@@ -1207,15 +1205,13 @@ def stream_by_video_file(self,
12071205
# TODO this conversion can offset the start time by a little bit; we should account for this
12081206
# by getting the original start time ffprobe and either sending that to the model so it can adjust
12091207
# with the ts of the first frame (too fragile to do all of this adjustment in the client input stream)
1210-
command = 'ffmpeg -i FILEPATH -c copy -f mpegts -muxpreload 0 -muxdelay 0 pipe:'.split()
1211-
command[command.index('FILEPATH')] = filepath # handles special characters in filepath
1212-
proc = subprocess.Popen(
1213-
command,
1214-
stdout=subprocess.PIPE,
1215-
stderr=subprocess.DEVNULL if self.logger.level >= logging.INFO else None)
1208+
# or by adjusting the timestamps in the output stream
1209+
from clarifai.runners.utils import video_utils
1210+
stream = video_utils.recontain_as_streamable(filepath)
12161211

1212+
# TODO accumulate reads to fill the chunk size
12171213
chunk_size = 1024 * 1024 # 1 MB
1218-
chunk_iterator = iter(lambda: proc.stdout.read(chunk_size), b'')
1214+
chunk_iterator = iter(lambda: stream.read(chunk_size), b'')
12191215

12201216
return self.stream_by_bytes(chunk_iterator, input_type, compute_cluster_id, nodepool_id,
12211217
deployment_id, user_id, inference_params, output_config)

clarifai/runners/utils/video_utils.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import io
2+
import os
23
import tempfile
4+
import threading
35

46
import av
57
import requests
@@ -55,3 +57,42 @@ def stream_video_from_bytes(bytes_iterator):
5557
reader = io.BufferedReader(buffer)
5658
container = av.open(reader)
5759
yield from container.decode(video=0)
60+
61+
62+
def recontain_as_streamable(filepath):
63+
return recontain(filepath, "mpegts", {"muxpreload": "0", "muxdelay": "0"})
64+
65+
66+
def recontain(input, format, options={}):
67+
# pyav-only implementation of "ffmpeg -i filepath -f mpegts -muxpreload 0 -muxdelay 0 pipe:"
68+
read_pipe_fd, write_pipe_fd = os.pipe()
69+
read_pipe = os.fdopen(read_pipe_fd, "rb")
70+
write_pipe = os.fdopen(write_pipe_fd, "wb")
71+
72+
def _run_av():
73+
input_container = output_container = None
74+
try:
75+
# open input and output containers, using mpegts as output format
76+
input_container = av.open(input, options=options)
77+
output_container = av.open(write_pipe, mode="w", format=format)
78+
79+
# Copy streams directly without re-encoding
80+
for stream in input_container.streams:
81+
output_container.add_stream_from_template(stream)
82+
83+
# Read packets from input and write them to output
84+
for packet in input_container.demux():
85+
if not packet.size:
86+
break
87+
output_container.mux(packet)
88+
89+
finally:
90+
if output_container:
91+
output_container.close()
92+
if input_container:
93+
input_container.close()
94+
95+
t = threading.Thread(target=_run_av)
96+
t.start()
97+
98+
return read_pipe

0 commit comments

Comments
 (0)