1+ from __future__ import annotations
2+
13import io
24import typing as T
35
4- from .. import geo , types
6+ from .. import geo
57from ..mp4 import (
68 construct_mp4_parser as cparser ,
79 mp4_sample_parser as sample_parser ,
@@ -23,25 +25,23 @@ def _build_camm_sample(measurement: camm_parser.TelemetryMeasurement) -> bytes:
2325
2426
2527def _create_edit_list_from_points (
26- point_segments : T .Sequence [T .Sequence [geo .Point ]],
28+ tracks : T .Sequence [T .Sequence [geo .Point ]],
2729 movie_timescale : int ,
2830 media_timescale : int ,
2931) -> builder .BoxDict :
30- entries : T . List [ T . Dict ] = []
32+ entries : list [ dict ] = []
3133
32- non_empty_point_segments = [points for points in point_segments if points ]
34+ non_empty_tracks = [track for track in tracks if track ]
3335
34- for idx , points in enumerate (non_empty_point_segments ):
35- assert 0 <= points [0 ].time , (
36- f"expect non-negative point time but got { points [0 ]} "
37- )
38- assert points [0 ].time <= points [- 1 ].time , (
39- f"expect points to be sorted but got first point { points [0 ]} and last point { points [- 1 ]} "
36+ for idx , track in enumerate (non_empty_tracks ):
37+ assert 0 <= track [0 ].time , f"expect non-negative point time but got { track [0 ]} "
38+ assert track [0 ].time <= track [- 1 ].time , (
39+ f"expect points to be sorted but got first point { track [0 ]} and last point { track [- 1 ]} "
4040 )
4141
4242 if idx == 0 :
43- if 0 < points [0 ].time :
44- segment_duration = int (points [0 ].time * movie_timescale )
43+ if 0 < track [0 ].time :
44+ segment_duration = int (track [0 ].time * movie_timescale )
4545 # put an empty edit list entry to skip the initial gap
4646 entries .append (
4747 {
@@ -53,8 +53,8 @@ def _create_edit_list_from_points(
5353 }
5454 )
5555 else :
56- media_time = int (points [0 ].time * media_timescale )
57- segment_duration = int ((points [- 1 ].time - points [0 ].time ) * movie_timescale )
56+ media_time = int (track [0 ].time * media_timescale )
57+ segment_duration = int ((track [- 1 ].time - track [0 ].time ) * movie_timescale )
5858 entries .append (
5959 {
6060 "media_time" : media_time ,
@@ -72,19 +72,6 @@ def _create_edit_list_from_points(
7272 }
7373
7474
75- def _multiplex (
76- points : T .Sequence [geo .Point ],
77- measurements : T .Optional [T .List [camm_parser .TelemetryMeasurement ]] = None ,
78- ) -> T .List [camm_parser .TelemetryMeasurement ]:
79- mutiplexed : T .List [camm_parser .TelemetryMeasurement ] = [
80- * points ,
81- * (measurements or []),
82- ]
83- mutiplexed .sort (key = lambda m : m .time )
84-
85- return mutiplexed
86-
87-
8875def convert_telemetry_to_raw_samples (
8976 measurements : T .Sequence [camm_parser .TelemetryMeasurement ],
9077 timescale : int ,
@@ -237,29 +224,44 @@ def create_camm_trak(
237224 }
238225
239226
240- def camm_sample_generator2 (
241- video_metadata : types .VideoMetadata ,
242- telemetry_measurements : T .Optional [T .List [camm_parser .TelemetryMeasurement ]] = None ,
243- ):
227+ def camm_sample_generator2 (camm_info : camm_parser .CAMMInfo ):
244228 def _f (
245229 fp : T .BinaryIO ,
246- moov_children : T . List [builder .BoxDict ],
230+ moov_children : list [builder .BoxDict ],
247231 ) -> T .Generator [io .IOBase , None , None ]:
248232 movie_timescale = builder .find_movie_timescale (moov_children )
249- # make sure the precision of timedeltas not lower than 0.001 (1ms)
233+ # Make sure the precision of timedeltas not lower than 0.001 (1ms)
250234 media_timescale = max (1000 , movie_timescale )
251235
252- # points with negative time are skipped
253- # TODO: interpolate first point at time == 0
254- # TODO: measurements with negative times should be skipped too
255- points = [point for point in video_metadata .points if point .time >= 0 ]
256-
257- measurements = _multiplex (points , telemetry_measurements )
236+ # Multiplex points for creating elst
237+ track : list [geo .Point ] = [
238+ * (camm_info .gps or []),
239+ * (camm_info .mini_gps or []),
240+ ]
241+ track .sort (key = lambda p : p .time )
242+ if track and track [0 ].time < 0 :
243+ track = [p for p in track if p .time >= 0 ]
244+ elst = _create_edit_list_from_points ([track ], movie_timescale , media_timescale )
245+
246+ # Multiplex telemetry measurements
247+ measurements : list [camm_parser .TelemetryMeasurement ] = [
248+ * (camm_info .gps or []),
249+ * (camm_info .mini_gps or []),
250+ * (camm_info .accl or []),
251+ * (camm_info .gyro or []),
252+ * (camm_info .magn or []),
253+ ]
254+ measurements .sort (key = lambda m : m .time )
255+ if measurements and measurements [0 ].time < 0 :
256+ measurements = [m for m in measurements if m .time >= 0 ]
257+
258+ # Serialize the telemetry measurements into MP4 samples
258259 camm_samples = list (
259260 convert_telemetry_to_raw_samples (measurements , media_timescale )
260261 )
262+
261263 camm_trak = create_camm_trak (camm_samples , media_timescale )
262- elst = _create_edit_list_from_points ([ points ], movie_timescale , media_timescale )
264+
263265 if T .cast (T .Dict , elst ["data" ])["entries" ]:
264266 T .cast (T .List [builder .BoxDict ], camm_trak ["data" ]).append (
265267 {
@@ -269,19 +271,19 @@ def _f(
269271 )
270272 moov_children .append (camm_trak )
271273
272- udta_data : T . List [builder .BoxDict ] = []
273- if video_metadata .make :
274+ udta_data : list [builder .BoxDict ] = []
275+ if camm_info .make :
274276 udta_data .append (
275277 {
276278 "type" : b"@mak" ,
277- "data" : video_metadata .make .encode ("utf-8" ),
279+ "data" : camm_info .make .encode ("utf-8" ),
278280 }
279281 )
280- if video_metadata .model :
282+ if camm_info .model :
281283 udta_data .append (
282284 {
283285 "type" : b"@mod" ,
284- "data" : video_metadata .model .encode ("utf-8" ),
286+ "data" : camm_info .model .encode ("utf-8" ),
285287 }
286288 )
287289 if udta_data :
0 commit comments