|
4 | 4 | from concurrent.futures import ThreadPoolExecutor |
5 | 5 | from enum import Enum |
6 | 6 | from itertools import repeat |
| 7 | +from json import JSONDecodeError, loads |
7 | 8 | from typing import List |
8 | 9 |
|
9 | 10 | from kili.core.helpers import get_mime_type, is_url |
@@ -162,20 +163,90 @@ def get_data_type(self, assets): |
162 | 163 | return VideoDataType.HOSTED_FILE |
163 | 164 | return VideoDataType.LOCAL_FILE |
164 | 165 |
|
@staticmethod
def are_native_videos(assets) -> bool:
    """Tell whether the assets are to be kept as native videos.

    For each asset, the flag
    ``json_metadata["processingParameters"]["shouldUseNativeVideo"]`` is read.
    It defaults to ``True`` when any level of that path is missing or empty.

    Args:
        assets: iterable of asset dicts; ``json_metadata`` is expected to be
            a dict (or absent / ``None``) at this stage.

    Returns:
        ``True`` when every flag is truthy (including when ``assets`` is
        empty), ``False`` when every flag is falsy.

    Raises:
        ImportValidationError: when native and non-native videos are mixed
            in the same call.
    """
    flags = []
    for asset in assets:
        # json_metadata stringification is done later on the call, so it is
        # still a dict here. ``or {}`` also covers keys explicitly set to
        # None, which ``dict.get(key, {})`` would return as-is.
        json_metadata = asset.get("json_metadata") or {}
        processing_parameters = json_metadata.get("processingParameters") or {}
        flags.append(processing_parameters.get("shouldUseNativeVideo", True))

    if all(flags):
        return True
    if not any(flags):
        return False
    # Single-line message: a raw triple-quoted literal would leak the source
    # indentation and newlines into the text shown to the user.
    raise ImportValidationError(
        "Cannot upload videos to split into frames"
        " and video to keep as native in the same time."
        " Please separate the assets into 2 calls"
    )
| 188 | + |
@staticmethod
def has_complete_processing_parameters(asset) -> bool:
    """Tell whether an asset carries every required video processing parameter.

    The metadata is looked up under ``"jsonMetadata"`` first, then under
    ``"json_metadata"``. It may be either a dict or a JSON-encoded string.

    Args:
        asset: asset dict to inspect.

    Returns:
        ``True`` when ``processingParameters`` holds ``codec`` (str),
        ``delayDueToMinPts`` (int), ``framesPlayedPerSecond`` (number),
        ``numberOfFrames`` (int) and ``startTime`` (number). ``False`` when
        the metadata is missing, is not valid JSON, is not a JSON object, or
        any of those values is missing or of the wrong type.
    """
    json_metadata = asset.get("jsonMetadata") or asset.get("json_metadata")
    if not json_metadata:
        return False

    if isinstance(json_metadata, str):
        try:
            json_metadata = loads(json_metadata)
        except JSONDecodeError:
            return False
    # A JSON array/scalar, or any non-dict object, cannot hold the parameters.
    if not isinstance(json_metadata, dict):
        return False

    processing_parameters = json_metadata.get("processingParameters")
    if not processing_parameters or not isinstance(processing_parameters, dict):
        return False

    # Whole-number values (e.g. 30 fps, start time 0) decode from JSON as int,
    # so the floating-point fields accept both int and float.
    required_types = {
        "codec": str,
        "delayDueToMinPts": int,
        "framesPlayedPerSecond": (int, float),
        "numberOfFrames": int,
        "startTime": (int, float),
    }
    for key, required_type in required_types.items():
        value = processing_parameters.get(key)
        # bool is a subclass of int but is never a valid numeric parameter;
        # a missing key yields None, which fails the isinstance check.
        if isinstance(value, bool) or not isinstance(value, required_type):
            return False

    return True
| 218 | + |
def videos_have_complete_processing_parameters(self, assets) -> bool:
    """Tell whether every asset passes ``has_complete_processing_parameters``.

    Evaluation stops at the first asset that fails the check. An empty
    ``assets`` iterable yields ``True``.
    """
    return all(self.has_complete_processing_parameters(video) for video in assets)
| 225 | + |
165 | 226 | def import_assets(self, assets: List[AssetLike]): |
166 | 227 | """Import video assets into Kili.""" |
167 | 228 | self._check_upload_is_allowed(assets) |
168 | 229 | data_type = self.get_data_type(assets) |
169 | 230 | assets = self.filter_duplicate_external_ids(assets) |
170 | 231 | if data_type == VideoDataType.LOCAL_FILE: |
171 | 232 | assets = self.filter_local_assets(assets, self.raise_error) |
172 | | - batch_params = BatchParams(is_hosted=False, is_asynchronous=True) |
| 233 | + are_native_videos = self.are_native_videos(assets) |
| 234 | + videos_have_complete_processing_parameters = ( |
| 235 | + self.videos_have_complete_processing_parameters(assets) |
| 236 | + ) |
| 237 | + is_synchronous = are_native_videos and videos_have_complete_processing_parameters |
| 238 | + batch_params = BatchParams(is_hosted=False, is_asynchronous=not is_synchronous) |
173 | 239 | batch_importer = VideoContentBatchImporter( |
174 | 240 | self.kili, self.project_params, batch_params, self.pbar |
175 | 241 | ) |
176 | 242 | batch_size = IMPORT_BATCH_SIZE |
177 | 243 | elif data_type == VideoDataType.HOSTED_FILE: |
178 | | - batch_params = BatchParams(is_hosted=True, is_asynchronous=True) |
| 244 | + are_native_videos = self.are_native_videos(assets) |
| 245 | + videos_have_complete_processing_parameters = ( |
| 246 | + self.videos_have_complete_processing_parameters(assets) |
| 247 | + ) |
| 248 | + is_synchronous = are_native_videos and videos_have_complete_processing_parameters |
| 249 | + batch_params = BatchParams(is_hosted=True, is_asynchronous=not is_synchronous) |
179 | 250 | batch_importer = VideoContentBatchImporter( |
180 | 251 | self.kili, self.project_params, batch_params, self.pbar |
181 | 252 | ) |
|
0 commit comments