Skip to content

Commit 01398b7

Browse files
author
David Eigen
committed
stream video file function
1 parent f49b1f9 commit 01398b7

File tree

1 file changed

+53
-2
lines changed

1 file changed

+53
-2
lines changed

clarifai/client/model.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import json
2+
import logging
23
import os
4+
import subprocess
35
import time
46
from typing import Any, Dict, Generator, Iterator, List, Tuple, Union
57

@@ -924,7 +926,8 @@ def generate_by_url(self,
924926
inference_params=inference_params,
925927
output_config=output_config)
926928

927-
def _req_iterator(self, input_iterator: Iterator[List[Input]], runner_selector: RunnerSelector, model_info: resources_pb2.Model):
929+
def _req_iterator(self, input_iterator: Iterator[List[Input]], runner_selector: RunnerSelector,
930+
model_info: resources_pb2.Model):
928931
for inputs in input_iterator:
929932
yield service_pb2.PostModelOutputsRequest(
930933
user_app_id=self.user_app_id,
@@ -1170,7 +1173,55 @@ def input_generator():
11701173
inference_params=inference_params,
11711174
output_config=output_config)
11721175

1173-
def _get_model_info_for_inference(self, inference_params: Dict = {}, output_config: Dict = {}) -> None:
1176+
def stream_by_video_file(self,
1177+
filepath: str,
1178+
input_type: str = 'video',
1179+
compute_cluster_id: str = None,
1180+
nodepool_id: str = None,
1181+
deployment_id: str = None,
1182+
user_id: str = None,
1183+
inference_params: Dict = {},
1184+
output_config: Dict = {}):
1185+
"""
1186+
Stream the model output based on the given video file.
1187+
1188+
Converts the video file to a streamable format, streams as bytes to the model,
1189+
and streams back the model outputs.
1190+
1191+
Args:
1192+
filepath (str): The filepath to predict.
1193+
input_type (str, optional): The type of input. Can be 'image', 'text', 'video' or 'audio.
1194+
compute_cluster_id (str): The compute cluster ID to use for the model.
1195+
nodepool_id (str): The nodepool ID to use for the model.
1196+
deployment_id (str): The deployment ID to use for the model.
1197+
inference_params (dict): The inference params to override.
1198+
output_config (dict): The output config to override.
1199+
"""
1200+
1201+
if not os.path.isfile(filepath):
1202+
raise UserError('Invalid filepath.')
1203+
1204+
# TODO check if the file is streamable already
1205+
1206+
# Convert the video file to a streamable format
1207+
# TODO this conversion can offset the start time by a little bit; we should account for this
1208+
# by getting the original start time ffprobe and either sending that to the model so it can adjust
1209+
# with the ts of the first frame (too fragile to do all of this adjustment in the client input stream)
1210+
command = 'ffmpeg -i FILEPATH -c copy -f mpegts -muxpreload 0 -muxdelay 0 pipe:'.split()
1211+
command[command.index('FILEPATH')] = filepath # handles special characters in filepath
1212+
proc = subprocess.Popen(
1213+
command,
1214+
stdout=subprocess.PIPE,
1215+
stderr=subprocess.DEVNULL if self.logger.level >= logging.INFO else None)
1216+
1217+
chunk_size = 1024 * 1024 # 1 MB
1218+
chunk_iterator = iter(lambda: proc.stdout.read(chunk_size), b'')
1219+
1220+
return self.stream_by_bytes(chunk_iterator, input_type, compute_cluster_id, nodepool_id,
1221+
deployment_id, user_id, inference_params, output_config)
1222+
1223+
def _get_model_info_for_inference(self, inference_params: Dict = {},
1224+
output_config: Dict = {}) -> None:
11741225
"""Overrides the model version.
11751226
11761227
Args:

0 commit comments

Comments
 (0)