Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions mapillary_tools/video_data_extraction/extractors/camm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ class CammParser(BaseParser):
parser_label = "camm"

@functools.cached_property
def __camera_info(self) -> T.Tuple[str, str]:
with self.videoPath.open("rb") as fp:
def _camera_info(self) -> T.Tuple[str, str]:
source_path = self.geotag_source_path
if not source_path:
return "", ""

with source_path.open("rb") as fp:
return camm_parser.extract_camera_make_and_model(fp)

def extract_points(self) -> T.Sequence[geo.Point]:
Expand All @@ -28,15 +32,7 @@ def extract_points(self) -> T.Sequence[geo.Point]:
return []

def extract_make(self) -> T.Optional[str]:
source_path = self.geotag_source_path
if not source_path:
return None
with source_path.open("rb") as _fp:
return self.__camera_info[0] or None
return self._camera_info[0] or None

def extract_model(self) -> T.Optional[str]:
source_path = self.geotag_source_path
if not source_path:
return None
with source_path.open("rb") as _fp:
return self.__camera_info[1] or None
return self._camera_info[1] or None
75 changes: 53 additions & 22 deletions mapillary_tools/video_data_extraction/extractors/gpx_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,71 @@ def extract_points(self) -> T.Sequence[geo.Point]:
if not gpx_points:
return gpx_points

offset = self._synx_gpx_by_first_gps_timestamp(gpx_points)

self._rebase_times(gpx_points, offset=offset)

return gpx_points

def _synx_gpx_by_first_gps_timestamp(
self, gpx_points: T.Sequence[geo.Point]
) -> float:
offset: float = 0.0

if not gpx_points:
return offset

first_gpx_dt = datetime.datetime.fromtimestamp(
gpx_points[0].time, tz=datetime.timezone.utc
)
LOG.info("First GPX timestamp: %s", first_gpx_dt)

# Extract first GPS timestamp (if found) for synchronization
offset: float = 0.0
parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions)
# Use an empty dictionary to force video parsers to extract make/model from the video metadata itself
parser = GenericVideoParser(self.videoPath, self.options, {})
gps_points = parser.extract_points()
if gps_points:
first_gps_point = gps_points[0]
if isinstance(first_gps_point, telemetry.GPSPoint):
if first_gps_point.epoch_time is not None:
first_gps_dt = datetime.datetime.fromtimestamp(
first_gps_point.epoch_time, tz=datetime.timezone.utc
)
LOG.info("First GPS timestamp: %s", first_gps_dt)
offset = gpx_points[0].time - first_gps_point.epoch_time
if offset:
LOG.warning(
"Found offset between GPX %s and video GPS timestamps %s: %s seconds",
first_gpx_dt,
first_gps_dt,
offset,
)

self._rebase_times(gpx_points, offset=offset)
if not gps_points:
LOG.warning(
"Skip GPX synchronization because no GPS found in video %s",
self.videoPath,
)
return offset

return gpx_points
first_gps_point = gps_points[0]
if isinstance(first_gps_point, telemetry.GPSPoint):
if first_gps_point.epoch_time is not None:
first_gps_dt = datetime.datetime.fromtimestamp(
first_gps_point.epoch_time, tz=datetime.timezone.utc
)
LOG.info("First GPS timestamp: %s", first_gps_dt)
offset = gpx_points[0].time - first_gps_point.epoch_time
if offset:
LOG.warning(
"Found offset between GPX %s and video GPS timestamps %s: %s seconds",
first_gpx_dt,
first_gps_dt,
offset,
)
else:
LOG.info(
"GPX and GPS are perfectly synchronized (all starts from %s)",
first_gpx_dt,
)
else:
LOG.warning(
"Skip GPX synchronization because no GPS epoch time found in video %s",
self.videoPath,
)

return offset

def extract_make(self) -> T.Optional[str]:
parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions)
# Use an empty dictionary to force video parsers to extract make/model from the video metadata itself
parser = GenericVideoParser(self.videoPath, self.options, {})
return parser.extract_make()

def extract_model(self) -> T.Optional[str]:
parser = GenericVideoParser(self.videoPath, self.options, self.parserOptions)
# Use an empty dictionary to force video parsers to extract make/model from the video metadata itself
parser = GenericVideoParser(self.videoPath, self.options, {})
return parser.extract_model()
Loading