Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 87 additions & 24 deletions mapillary_tools/camm/camm_builder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import io
import typing as T

from .. import geo, types
from .. import geo, telemetry, types
from ..mp4 import (
construct_mp4_parser as cparser,
mp4_sample_parser as sample_parser,
Expand All @@ -11,20 +11,65 @@
from . import camm_parser


def build_camm_sample(point: geo.Point) -> bytes:
    """Encode one GPS point as the bytes of a CAMM ``MIN_GPS`` sample.

    The sample payload is ``[lat, lon, alt]``. A point whose altitude is
    ``None`` is written with an altitude of ``-1.0``.
    """
    altitude = point.alt if point.alt is not None else -1.0
    payload = {
        "type": camm_parser.CAMMType.MIN_GPS.value,
        "data": [point.lat, point.lon, altitude],
    }
    return camm_parser.CAMMSampleData.build(payload)
# Type of a single measurement handled by this module: either a GPS point
# or a telemetry measurement from the telemetry module.
TelemetryMeasurement = T.Union[
geo.Point,
telemetry.TelemetryMeasurement,
]


def _create_edit_list(
def _build_camm_sample(measurement: TelemetryMeasurement) -> bytes:
    """Serialize one measurement into the bytes of a single CAMM sample.

    Supported measurements and the CAMM type they are written as:

    * ``geo.Point`` -> ``MIN_GPS`` with ``[lat, lon, alt]``; a ``None``
      altitude is written as ``-1.0``.
    * ``telemetry.AccelerationData`` -> ``ACCELERATION``: accelerometer
      reading in meters/second^2 along XYZ axes of the camera.
    * ``telemetry.GyroscopeData`` -> ``GYRO``: gyroscope signal in
      radians/second around XYZ axes of the camera. Rotation is positive
      in the counterclockwise direction.
    * ``telemetry.MagnetometerData`` -> ``MAGNETIC_FIELD``: ambient
      magnetic field.

    Raises:
        ValueError: if ``measurement`` is none of the types above.
    """
    if isinstance(measurement, geo.Point):
        altitude = measurement.alt if measurement.alt is not None else -1.0
        gps_payload = {
            "type": camm_parser.CAMMType.MIN_GPS.value,
            "data": [measurement.lat, measurement.lon, altitude],
        }
        return camm_parser.CAMMSampleData.build(gps_payload)

    # The three sensor readings share one XYZ payload layout and differ only
    # in the CAMM type tag; they are checked in the same order as before.
    xyz_kinds = (
        (telemetry.AccelerationData, camm_parser.CAMMType.ACCELERATION),
        (telemetry.GyroscopeData, camm_parser.CAMMType.GYRO),
        (telemetry.MagnetometerData, camm_parser.CAMMType.MAGNETIC_FIELD),
    )
    for sensor_cls, camm_type in xyz_kinds:
        if isinstance(measurement, sensor_cls):
            xyz_payload = {
                "type": camm_type.value,
                "data": [measurement.x, measurement.y, measurement.z],
            }
            return camm_parser.CAMMSampleData.build(xyz_payload)

    raise ValueError(f"unexpected measurement type {type(measurement)}")


def _create_edit_list_from_points(
point_segments: T.Sequence[T.Sequence[geo.Point]],
movie_timescale: int,
media_timescale: int,
Expand Down Expand Up @@ -82,18 +127,30 @@ def _create_edit_list(
}


def convert_points_to_raw_samples(
points: T.Sequence[geo.Point], timescale: int
def _multiplex(
    points: T.Sequence[geo.Point],
    measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None,
) -> T.List[TelemetryMeasurement]:
    """Merge GPS points and telemetry measurements into one time-ordered list.

    ``measurements`` may be ``None`` or empty, in which case only the points
    are returned. The sort is stable, so items with equal ``time`` keep their
    relative order, with points ahead of telemetry measurements.
    """
    merged: T.List[TelemetryMeasurement] = list(points)
    if measurements:
        merged.extend(measurements)
    return sorted(merged, key=lambda item: item.time)


def convert_telemetry_to_raw_samples(
measurements: T.Sequence[TelemetryMeasurement],
timescale: int,
) -> T.Generator[sample_parser.RawSample, None, None]:
for idx, point in enumerate(points):
camm_sample_data = build_camm_sample(point)
for idx, measurement in enumerate(measurements):
camm_sample_data = _build_camm_sample(measurement)

if idx + 1 < len(points):
timedelta = int((points[idx + 1].time - point.time) * timescale)
if idx + 1 < len(measurements):
timedelta = int((measurements[idx + 1].time - measurement.time) * timescale)
else:
timedelta = 0

assert 0 <= timedelta <= builder.UINT32_MAX, (
f"expected timedelta {timedelta} between {points[idx]} and {points[idx + 1]} with timescale {timescale} to be <= UINT32_MAX"
f"expected timedelta {timedelta} between {measurements[idx]} and {measurements[idx + 1]} with timescale {timescale} to be <= UINT32_MAX"
)

yield sample_parser.RawSample(
Expand Down Expand Up @@ -232,19 +289,23 @@ def create_camm_trak(
}


def camm_sample_generator2(video_metadata: types.VideoMetadata):
def camm_sample_generator2(
video_metadata: types.VideoMetadata,
telemetry_measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None,
):
def _f(
fp: T.BinaryIO,
moov_children: T.List[builder.BoxDict],
) -> T.Generator[io.IOBase, None, None]:
movie_timescale = builder.find_movie_timescale(moov_children)
# make sure the precision of timedeltas is not lower than 0.001 (1ms)
media_timescale = max(1000, movie_timescale)
measurements = _multiplex(video_metadata.points, telemetry_measurements)
camm_samples = list(
convert_points_to_raw_samples(video_metadata.points, media_timescale)
convert_telemetry_to_raw_samples(measurements, media_timescale)
)
camm_trak = create_camm_trak(camm_samples, media_timescale)
elst = _create_edit_list(
elst = _create_edit_list_from_points(
[video_metadata.points], movie_timescale, media_timescale
)
if T.cast(T.Dict, elst["data"])["entries"]:
Expand Down Expand Up @@ -280,6 +341,8 @@ def _f(
)

# if yield, the moov_children will not be modified
return (io.BytesIO(build_camm_sample(point)) for point in video_metadata.points)
return (
io.BytesIO(_build_camm_sample(measurement)) for measurement in measurements
)

return _f
159 changes: 117 additions & 42 deletions mapillary_tools/camm/camm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,22 @@

import construct as C

from .. import geo
from ..mp4 import mp4_sample_parser as sample_parser, simple_mp4_parser as sparser
from .. import geo, telemetry
from ..mp4 import simple_mp4_parser as sparser
from ..mp4.mp4_sample_parser import MovieBoxParser, Sample, TrackBoxParser


LOG = logging.getLogger(__name__)


TelemetryMeasurement = T.Union[
geo.Point,
telemetry.AccelerationData,
telemetry.GyroscopeData,
telemetry.MagnetometerData,
]


# Camera Motion Metadata Spec https://developers.google.com/streetview/publish/camm-spec
class CAMMType(Enum):
ANGLE_AXIS = 0
Expand Down Expand Up @@ -75,9 +84,9 @@ class CAMMType(Enum):
)


def _parse_point_from_sample(
fp: T.BinaryIO, sample: sample_parser.Sample
) -> T.Optional[geo.Point]:
def _parse_telemetry_from_sample(
fp: T.BinaryIO, sample: Sample
) -> T.Optional[TelemetryMeasurement]:
fp.seek(sample.raw_sample.offset, io.SEEK_SET)
data = fp.read(sample.raw_sample.size)
box = CAMMSampleData.parse(data)
Expand All @@ -99,12 +108,34 @@ def _parse_point_from_sample(
alt=box.data.altitude,
angle=None,
)
elif box.type == CAMMType.ACCELERATION.value:
return telemetry.AccelerationData(
time=sample.exact_time,
x=box.data[0],
y=box.data[1],
z=box.data[2],
)
elif box.type == CAMMType.GYRO.value:
return telemetry.GyroscopeData(
time=sample.exact_time,
x=box.data[0],
y=box.data[1],
z=box.data[2],
)
elif box.type == CAMMType.MAGNETIC_FIELD.value:
return telemetry.MagnetometerData(
time=sample.exact_time,
x=box.data[0],
y=box.data[1],
z=box.data[2],
)
return None


def filter_points_by_elst(
points: T.Iterable[geo.Point], elst: T.Sequence[T.Tuple[float, float]]
) -> T.Generator[geo.Point, None, None]:
def _filter_telemetry_by_elst_segments(
measurements: T.Iterable[TelemetryMeasurement],
elst: T.Sequence[T.Tuple[float, float]],
) -> T.Generator[TelemetryMeasurement, None, None]:
empty_elst = [entry for entry in elst if entry[0] == -1]
if empty_elst:
offset = empty_elst[-1][1]
Expand All @@ -114,20 +145,26 @@ def filter_points_by_elst(
elst = [entry for entry in elst if entry[0] != -1]

if not elst:
for p in points:
yield dataclasses.replace(p, time=p.time + offset)
for m in measurements:
if dataclasses.is_dataclass(m):
yield dataclasses.replace(m, time=m.time + offset)
else:
m._replace(time=m.time + offset)
return

elst.sort(key=lambda entry: entry[0])
elst_idx = 0
for p in points:
for m in measurements:
if len(elst) <= elst_idx:
break
media_time, duration = elst[elst_idx]
if p.time < media_time:
if m.time < media_time:
pass
elif p.time <= media_time + duration:
yield dataclasses.replace(p, time=p.time + offset)
elif m.time <= media_time + duration:
if dataclasses.is_dataclass(m):
yield dataclasses.replace(m, time=m.time + offset)
else:
m._replace(time=m.time + offset)
else:
elst_idx += 1

Expand All @@ -148,46 +185,84 @@ def _is_camm_description(description: T.Dict) -> bool:
return description["format"] == b"camm"


def _contains_camm_description(track: TrackBoxParser) -> bool:
    """Return True if any sample description of ``track`` is a CAMM one."""
    for description in track.extract_sample_descriptions():
        if _is_camm_description(description):
            return True
    return False


def _filter_telemetry_by_track_elst(
    moov: MovieBoxParser,
    track: TrackBoxParser,
    measurements: T.Iterable[TelemetryMeasurement],
) -> T.List[TelemetryMeasurement]:
    """Apply the track's edit list (elst), if any, to ``measurements``.

    When the track has no elst box, or the box has no entries, the
    measurements are returned unchanged as a list. Otherwise each entry is
    converted with ``elst_entry_to_seconds`` (using the movie timescale from
    mvhd and the media timescale from mdhd) and the measurements are passed
    through ``_filter_telemetry_by_elst_segments``.
    """
    elst_boxdata = track.extract_elst_boxdata()
    entries = None if elst_boxdata is None else elst_boxdata["entries"]
    if not entries:
        # Nothing to apply: no edit list, or an empty one.
        return list(measurements)

    media_timescale = track.extract_mdhd_boxdata()["timescale"]
    movie_timescale = moov.extract_mvhd_boxdata()["timescale"]

    segments = []
    for entry in entries:
        segments.append(
            elst_entry_to_seconds(
                entry,
                movie_timescale=movie_timescale,
                media_timescale=media_timescale,
            )
        )

    return list(_filter_telemetry_by_elst_segments(measurements, segments))


def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]:
    """
    Return a list of points (could be empty) if it is a valid CAMM video,
    otherwise None

    Only the first track that carries a CAMM sample description is read.
    Samples that do not parse to a ``geo.Point`` (including other telemetry
    measurements) are dropped, and the track's edit list is then applied via
    ``_filter_telemetry_by_track_elst``.
    """
    moov = MovieBoxParser.parse_stream(fp)

    for track in moov.extract_tracks():
        if _contains_camm_description(track):
            maybe_measurements = (
                _parse_telemetry_from_sample(fp, sample)
                for sample in track.extract_samples()
                if _is_camm_description(sample.description)
            )
            points = [m for m in maybe_measurements if isinstance(m, geo.Point)]

            # The helper is typed for any measurement; only points were
            # passed in, so only points come back.
            return T.cast(
                T.List[geo.Point],
                _filter_telemetry_by_track_elst(moov, track, points),
            )

    return None


def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[T.List[TelemetryMeasurement]]:
    """Return all measurements of the first CAMM track in ``fp``, else None.

    Samples with a CAMM description are parsed with
    ``_parse_telemetry_from_sample``; samples that parse to ``None`` are
    dropped. The track's edit list is then applied via
    ``_filter_telemetry_by_track_elst``. ``None`` is returned when no track
    has a CAMM sample description.
    """
    moov = MovieBoxParser.parse_stream(fp)

    for track in moov.extract_tracks():
        if not _contains_camm_description(track):
            continue

        parsed: T.List[TelemetryMeasurement] = []
        for sample in track.extract_samples():
            if not _is_camm_description(sample.description):
                continue
            measurement = _parse_telemetry_from_sample(fp, sample)
            if measurement is not None:
                parsed.append(measurement)

        return _filter_telemetry_by_track_elst(moov, track, parsed)

    return None


def parse_gpx(path: pathlib.Path) -> T.List[geo.Point]:
Expand Down
Loading
Loading