From dc18ccd268362f427773e2b4d48aff734172e2aa Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 17 Mar 2025 11:22:45 -0700 Subject: [PATCH 1/5] add tests --- .../process_sequence_properties.py | 501 ++++++++++-------- tests/unit/test_sequence_processing.py | 53 ++ 2 files changed, 329 insertions(+), 225 deletions(-) diff --git a/mapillary_tools/process_sequence_properties.py b/mapillary_tools/process_sequence_properties.py index 7f77a7d5e..08afb8939 100644 --- a/mapillary_tools/process_sequence_properties.py +++ b/mapillary_tools/process_sequence_properties.py @@ -9,60 +9,39 @@ LOG = logging.getLogger(__name__) +SeqItem = T.TypeVar("SeqItem") PointLike = T.TypeVar("PointLike", bound=geo.Point) PointSequence = T.List[PointLike] -def cut_sequence_by_time_or_distance( - sequence: PointSequence, - cutoff_distance: T.Optional[float] = None, - cutoff_time: T.Optional[float] = None, -) -> T.List[PointSequence]: - sequences: T.List[PointSequence] = [] +def split_sequence_by( + sequence: T.List[SeqItem], + should_split: T.Callable[[SeqItem, SeqItem], bool], +) -> T.List[T.List[SeqItem]]: + """ + Split a sequence into multiple sequences by should_split(prev, cur) => True + """ + sequences: T.List[T.List[SeqItem]] = [] - if sequence: - sequences.append([sequence[0]]) + seq = iter(sequence) - for prev, cur in geo.pairwise(sequence): - # invariant: prev is processed + prev = next(seq, None) + if prev is None: + return sequences - # Cut by distance - distance = geo.gps_distance( - (prev.lat, prev.lon), - (cur.lat, cur.lon), - ) - if cutoff_distance is not None: - if cutoff_distance <= distance: - LOG.debug( - "Cut the sequence because the distance gap between two images (%s meters) exceeds the cutoff distance (%s meters): %s: %s -> %s", - round(distance, 2), - round(cutoff_distance, 2), - prev.filename.parent, - prev.filename.name, - cur.filename.name, - ) - sequences.append([cur]) - continue - - # Cut by time - time_diff = cur.time - prev.time - assert 0 <= time_diff, "sequence must be sorted by capture times" - if cutoff_time is not None: - if cutoff_time <= time_diff: - LOG.debug( - "Cut the sequence because the time gap between two images (%s seconds) exceeds the cutoff time (%s seconds): %s: %s -> %s", - round(time_diff, 2), - round(cutoff_time, 2), - prev.filename.parent, - prev.filename.name, - cur.filename.name, - ) - sequences.append([cur]) - continue + sequences.append([prev]) - sequences[-1].append(cur) + for cur in seq: + # invariant: prev is processed + if should_split(prev, cur): + sequences.append([cur]) + else: + sequences[-1].append(cur) + prev = cur # invariant: cur is processed + assert sum(len(s) for s in sequences) == len(sequence) + return sequences @@ -74,13 +53,14 @@ def duplication_check( dedups: PointSequence = [] dups: T.List[types.ErrorMetadata] = [] - sequence_iter = iter(sequence) - prev = next(sequence_iter) + it = iter(sequence) + prev = next(it) if prev is None: return dedups, dups + dedups.append(prev) - for cur in sequence_iter: + for cur in it: # invariant: prev is processed distance = geo.gps_distance( (prev.lat, prev.lon), @@ -96,18 +76,17 @@ def duplication_check( angle_diff is None or angle_diff <= max_duplicate_angle ): msg = f"Duplicate of its previous image in terms of distance <= {max_duplicate_distance} and angle <= {max_duplicate_angle}" - dups.append( - types.describe_error_metadata( - exceptions.MapillaryDuplicationError( - msg, - types.as_desc(cur), - distance=distance, - angle_diff=angle_diff, - ), - cur.filename, - filetype=types.FileType.IMAGE, + dup 
= types.describe_error_metadata( + exceptions.MapillaryDuplicationError( + msg, + types.as_desc(cur), + distance=distance, + angle_diff=angle_diff, ), + cur.filename, + filetype=types.FileType.IMAGE, ) + dups.append(dup) # prev does not change else: dedups.append(cur) @@ -117,68 +96,93 @@ def duplication_check( return dedups, dups -def cut_sequence( - sequence: T.List[types.ImageMetadata], - max_images: int, - max_sequence_filesize: int, - max_sequence_pixels: int, -) -> T.List[T.List[types.ImageMetadata]]: +def split_sequence_by_agg( + sequence: T.List[SeqItem], + should_split_with_sequence_state: T.Callable[[SeqItem, T.Dict], bool], +) -> T.List[T.List[SeqItem]]: """ - Cut a sequence into multiple sequences by max_images or max filesize + Split a sequence by should_split_with_sequence_state(image, sequence_state) => True """ - sequences: T.List[T.List[types.ImageMetadata]] = [] - last_sequence_file_size = 0 - last_sequence_pixels = 0 + sequences: T.List[T.List[SeqItem]] = [] + sequence_state: T.Dict = {} + + for cur in sequence: + start_new_sequence = should_split_with_sequence_state(cur, sequence_state) + + if not sequences: + sequences.append([]) + + if start_new_sequence: + # DO NOT reset the state because it contains the information of current item + # sequence_state = {} + if sequences[-1]: + sequences.append([]) + + sequences[-1].append(cur) + + assert sum(len(s) for s in sequences) == len(sequence) + + return sequences + + +def _should_spilt_by_limits( + max_sequence_images: int, max_sequence_filesize: float, max_sequence_pixels: float +) -> T.Callable[[types.ImageMetadata, T.Dict], bool]: + def _should_split(image: types.ImageMetadata, sequence_state: T.Dict) -> bool: + last_sequence_images = sequence_state.get("last_sequence_images", 0) + last_sequence_file_size = sequence_state.get("last_sequence_file_size", 0) + last_sequence_pixels = sequence_state.get("last_sequence_pixels", 0) - for image in sequence: # decent default values if width/height not available width = 1024 if image.width is None else image.width height = 1024 if image.height is None else image.height + pixels = width * height - filesize = os.path.getsize(image.filename) + if image.filesize is None: + filesize = os.path.getsize(image.filename) + else: + filesize = image.filesize + + new_sequence_images = last_sequence_images + 1 + new_sequence_file_size = last_sequence_file_size + filesize + new_sequence_pixels = last_sequence_pixels + pixels - if len(sequences) == 0: + if max_sequence_images < new_sequence_images: + LOG.debug( + "Split the sequence because the current sequence (%s) reaches the max number of images per sequence (%s)", + new_sequence_images, + max_sequence_images, + ) + start_new_sequence = True + elif max_sequence_filesize < new_sequence_file_size: + LOG.debug( + "Split the sequence because the current sequence (%s) reaches the max filesize per sequence (%s)", + new_sequence_file_size, + max_sequence_filesize, + ) + start_new_sequence = True + elif max_sequence_pixels < new_sequence_pixels: + LOG.debug( + "Split the sequence because the current sequence (%s) reaches the max pixels per sequence (%s)", + new_sequence_pixels, + max_sequence_pixels, + ) start_new_sequence = True else: - if sequences[-1]: - if max_images < len(sequences[-1]): - LOG.debug( - "Cut the sequence because the current sequence (%s) reaches the max number of images (%s)", - len(sequences[-1]), - max_images, - ) - start_new_sequence = True - elif max_sequence_filesize < last_sequence_file_size + filesize: - LOG.debug( - "Cut 
the sequence because the current sequence (%s) reaches the max filesize (%s)", - last_sequence_file_size + filesize, - max_sequence_filesize, - ) - start_new_sequence = True - elif max_sequence_pixels < last_sequence_pixels + width * height: - LOG.debug( - "Cut the sequence because the current sequence (%s) reaches the max pixels (%s)", - last_sequence_pixels + width * height, - max_sequence_pixels, - ) - start_new_sequence = True - else: - start_new_sequence = False - else: - start_new_sequence = False - - if start_new_sequence: - sequences.append([]) - last_sequence_file_size = 0 - last_sequence_pixels = 0 + start_new_sequence = False - sequences[-1].append(image) - last_sequence_file_size += filesize - last_sequence_pixels += width * height + if not start_new_sequence: + sequence_state["last_sequence_images"] = new_sequence_images + sequence_state["last_sequence_file_size"] = new_sequence_file_size + sequence_state["last_sequence_pixels"] = new_sequence_pixels + else: + sequence_state["last_sequence_images"] = 1 + sequence_state["last_sequence_file_size"] = filesize + sequence_state["last_sequence_pixels"] = pixels - assert sum(len(s) for s in sequences) == len(sequence) + return start_new_sequence - return sequences + return _should_split def _group_by( @@ -422,142 +426,189 @@ def process_sequence_properties( else: raise RuntimeError(f"invalid metadata type: {metadata}") - # Check limits for videos - video_metadatas, video_error_metadatas = _check_video_limits( - video_metadatas, - max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes, - max_avg_speed=max_avg_speed, - ) - error_metadatas.extend(video_error_metadatas) - - input_sequences: T.List[PointSequence] - output_sequences: T.List[PointSequence] - - # Group by folder and camera - grouped = _group_by( - image_metadatas, - lambda metadata: ( - str(metadata.filename.parent), - metadata.MAPDeviceMake, - metadata.MAPDeviceModel, - metadata.width, - metadata.height, - ), - ) - for key in grouped: - LOG.debug("Group sequences by %s: %s images", key, len(grouped[key])) - output_sequences = list(grouped.values()) - LOG.info( - "Found %s sequences from different folders and cameras", len(output_sequences) - ) - - # Make sure each sequence is sorted (in-place update) - input_sequences = output_sequences - for sequence in input_sequences: - sequence.sort( - key=lambda metadata: metadata.sort_key(), + if video_metadatas: + # Check limits for videos + video_metadatas, video_error_metadatas = _check_video_limits( + video_metadatas, + max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes, + max_avg_speed=max_avg_speed, ) - output_sequences = input_sequences - - # Interpolate subseconds for same timestamps (in-place update) - input_sequences = output_sequences - for sequence in input_sequences: - _interpolate_subsecs_for_sorting(sequence) - output_sequences = input_sequences - - # Cut sequences by cutoff time - # NOTE: Do not cut by distance here because it affects the speed limit check - input_sequences = output_sequences - output_sequences = [] - for sequence in input_sequences: - output_sequences.extend( - cut_sequence_by_time_or_distance(sequence, cutoff_time=cutoff_time) + error_metadatas.extend(video_error_metadatas) + + if image_metadatas: + input_sequences: T.List[PointSequence] + output_sequences: T.List[PointSequence] + + # Group by folder and camera + grouped = _group_by( + image_metadatas, + lambda metadata: ( + str(metadata.filename.parent), + metadata.MAPDeviceMake, + metadata.MAPDeviceModel, + metadata.width, + 
metadata.height, + ), + ) + for key in grouped: + LOG.debug("Group sequences by %s: %s images", key, len(grouped[key])) + output_sequences = list(grouped.values()) + LOG.info( + "Found %s sequences from different folders and cameras", + len(output_sequences), ) - LOG.info( - "Found %s sequences after cut by cutoff time %d seconds", - len(output_sequences), - cutoff_time, - ) - # Duplication check - input_sequences = output_sequences - output_sequences = [] - for sequence in input_sequences: - output_sequence, errors = duplication_check( - sequence, - max_duplicate_distance=duplicate_distance, - max_duplicate_angle=duplicate_angle, + # Make sure each sequence is sorted (in-place update) + input_sequences = output_sequences + for sequence in input_sequences: + sequence.sort( + key=lambda metadata: metadata.sort_key(), + ) + output_sequences = input_sequences + + # Interpolate subseconds for same timestamps (in-place update) + input_sequences = output_sequences + for sequence in input_sequences: + _interpolate_subsecs_for_sorting(sequence) + output_sequences = input_sequences + + # Cut sequences by cutoff time + # NOTE: Do not cut by distance here because it affects the speed limit check + + def _should_split_by_cutoff_time( + prev: types.ImageMetadata, cur: types.ImageMetadata + ) -> bool: + time_diff = cur.time - prev.time + assert 0 <= time_diff, "sequence must be sorted by capture times" + should = cutoff_time < time_diff + if should: + LOG.debug( + "Split the sequence because the time gap between two images (%s seconds) exceeds the cutoff time (%s seconds): %s: %s -> %s", + round(time_diff, 2), + round(cutoff_time, 2), + prev.filename.parent, + prev.filename.name, + cur.filename.name, + ) + return should + + input_sequences = output_sequences + output_sequences = [] + for sequence in input_sequences: + output_sequences.extend( + split_sequence_by(sequence, should_split=_should_split_by_cutoff_time) + ) + LOG.info( + "Found %s sequences after cut by cutoff time %d seconds", + len(output_sequences), + cutoff_time, ) - assert len(sequence) == len(output_sequence) + len(errors) - output_sequences.append(output_sequence) - error_metadatas.extend(errors) - # Interpolate angles (in-place update) - input_sequences = output_sequences - for sequence in input_sequences: - if interpolate_directions: - for image in sequence: - image.angle = None - geo.interpolate_directions_if_none(sequence) - output_sequences = input_sequences - - # Cut sequences by max number of images, max filesize, and max pixels - input_sequences = output_sequences - output_sequences = [] - for sequence in input_sequences: - output_sequences.extend( - cut_sequence( + # Duplication check + input_sequences = output_sequences + output_sequences = [] + for sequence in input_sequences: + output_sequence, errors = duplication_check( sequence, - constants.MAX_SEQUENCE_LENGTH, - max_sequence_filesize_in_bytes, - max_sequence_pixels, + max_duplicate_distance=duplicate_distance, + max_duplicate_angle=duplicate_angle, ) + assert len(sequence) == len(output_sequence) + len(errors) + output_sequences.append(output_sequence) + error_metadatas.extend(errors) + + # Interpolate angles (in-place update) + input_sequences = output_sequences + for sequence in input_sequences: + if interpolate_directions: + for image in sequence: + image.angle = None + geo.interpolate_directions_if_none(sequence) + output_sequences = input_sequences + + # Cut sequences by max number of images, max filesize, and max pixels + should_split = _should_spilt_by_limits( + 
max_sequence_images=constants.MAX_SEQUENCE_LENGTH, + max_sequence_filesize=max_sequence_filesize_in_bytes, + max_sequence_pixels=max_sequence_pixels, + ) + input_sequences = output_sequences + output_sequences = [] + for sequence in input_sequences: + output_sequences.extend( + split_sequence_by_agg( + sequence, should_split_with_sequence_state=should_split + ) + ) + LOG.info( + "Found %s sequences after cut by sequence limits", len(output_sequences) ) - LOG.info("Found %s sequences after cut by sequence limits", len(output_sequences)) - # Check limits for sequences - input_sequences = output_sequences - output_sequences, errors = _check_sequence_limits( - input_sequences, max_sequence_filesize_in_bytes, max_avg_speed - ) - error_metadatas.extend(errors) - LOG.info("Found %s sequences after sequence limit checks", len(output_sequences)) - - # Cut sequences by cutoff distance - # NOTE: The speed limit check probably rejects most of anomalies - input_sequences = output_sequences - output_sequences = [] - for sequence in input_sequences: - output_sequences.extend( - cut_sequence_by_time_or_distance(sequence, cutoff_distance=cutoff_distance) + # Check limits for sequences + input_sequences = output_sequences + output_sequences, errors = _check_sequence_limits( + input_sequences, max_sequence_filesize_in_bytes, max_avg_speed + ) + error_metadatas.extend(errors) + LOG.info( + "Found %s sequences after sequence limit checks", len(output_sequences) ) - LOG.info( - "Found %s sequences after cut by cutoff distance %d meters", - len(output_sequences), - cutoff_distance, - ) - # Assign sequence UUIDs (in-place update) - sequence_idx = 0 - input_sequences = output_sequences - for sequence in input_sequences: - for image in sequence: - # using incremental id as shorter "uuid", so we can save some space for the desc file - image.MAPSequenceUUID = str(sequence_idx) - sequence_idx += 1 - output_sequences = input_sequences + # Cut sequences by cutoff distance + # NOTE: The speed limit check probably rejects most of anomalies + def _should_split_by_cutoff_distance( + prev: types.ImageMetadata, cur: types.ImageMetadata + ) -> bool: + distance = geo.gps_distance( + (prev.lat, prev.lon), + (cur.lat, cur.lon), + ) + should = cutoff_distance < distance + if should: + LOG.debug( + "Split the sequence because the distance gap between two images (%s meters) exceeds the cutoff distance (%s meters): %s: %s -> %s", + round(distance, 2), + round(cutoff_distance, 2), + prev.filename.parent, + prev.filename.name, + cur.filename.name, + ) + return should - image_metadatas = [] - for sequence in input_sequences: - image_metadatas.extend(sequence) + input_sequences = output_sequences + output_sequences = [] + for sequence in input_sequences: + output_sequences.extend( + split_sequence_by(sequence, _should_split_by_cutoff_distance) + ) + LOG.info( + "Found %s sequences after cut by cutoff distance %d meters", + len(output_sequences), + cutoff_distance, + ) + + # Assign sequence UUIDs (in-place update) + sequence_idx = 0 + input_sequences = output_sequences + for sequence in input_sequences: + for image in sequence: + # using incremental id as shorter "uuid", so we can save some space for the desc file + image.MAPSequenceUUID = str(sequence_idx) + sequence_idx += 1 + output_sequences = input_sequences + + image_metadatas = [] + for sequence in input_sequences: + image_metadatas.extend(sequence) + + assert sequence_idx == len( + set(metadata.MAPSequenceUUID for metadata in image_metadatas) + ) results = error_metadatas + 
image_metadatas + video_metadatas assert len(metadatas) == len(results), ( f"expected {len(metadatas)} results but got {len(results)}" ) - assert sequence_idx == len( - set(metadata.MAPSequenceUUID for metadata in image_metadatas) - ) return results diff --git a/tests/unit/test_sequence_processing.py b/tests/unit/test_sequence_processing.py index f1370ff5d..b0c374c8f 100644 --- a/tests/unit/test_sequence_processing.py +++ b/tests/unit/test_sequence_processing.py @@ -595,3 +595,56 @@ def test_video_error(tmpdir: py.path.local): duplicate_distance=100, duplicate_angle=5, ) + + +def test_split_sequence_by(): + """Test split_sequence_by function.""" + # Create test points + p1 = geo.Point(1, 1.00000, 1.00000, 1, angle=0) + p2 = geo.Point(2, 1.00001, 1.00001, 2, angle=0) + p3 = geo.Point(3, 1.00002, 1.00002, 3, angle=0) + p4 = geo.Point(10, 1.00003, 1.00003, 4, angle=0) # Large time gap + p5 = geo.Point(11, 1.00004, 1.00004, 5, angle=0) + p6 = geo.Point(12, 1.10000, 1.10000, 6, angle=0) # Large distance gap + p7 = geo.Point(13, 1.10001, 1.10001, 7, angle=0) + + # Create a sequence of points + sequence = [p1, p2, p3, p4, p5, p6, p7] + + # Test split by time gaps (> 5 seconds) + split_by_time = lambda prev, cur: cur.time - prev.time > 5 + sequences = psp.split_sequence_by(sequence, split_by_time) + + # Should be split into two sequences [p1,p2,p3], [p4,p5,p6,p7] + assert len(sequences) == 2 + assert sequences[0] == [p1, p2, p3] + assert sequences[1] == [p4, p5, p6, p7] + + # Test split by large distance gaps + def split_by_distance(prev, cur): + distance = geo.gps_distance( + (prev.lat, prev.lon), + (cur.lat, cur.lon), + ) + should = distance > 1000 # Split if distance > 1000 meters + return should + + sequences = psp.split_sequence_by(sequence, split_by_distance) + + # Should be split into two sequences [p1,p2,p3,p4,p5], [p6,p7] + assert len(sequences) == 2 + assert sequences[0] == [p1, p2, p3, p4, p5] + assert sequences[1] == [p6, p7] + + # Test empty sequence + empty_sequences = psp.split_sequence_by([], split_by_time) + assert len(empty_sequences) == 0 + + # Test single point sequence + single_point = [p1] + single_sequences = psp.split_sequence_by(single_point, split_by_time) + assert len(single_sequences) == 1 + assert single_sequences[0] == [p1] + + sequences = psp.split_sequence_by([], split_by_time) + assert len(sequences) == 0 From 991aa6bb6a5e98bf1de11f5dc92d9e68a9f39692 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 17 Mar 2025 13:59:52 -0700 Subject: [PATCH 2/5] refactor --- .../process_sequence_properties.py | 519 ++++++++++-------- 1 file changed, 293 insertions(+), 226 deletions(-) diff --git a/mapillary_tools/process_sequence_properties.py b/mapillary_tools/process_sequence_properties.py index 08afb8939..00880f919 100644 --- a/mapillary_tools/process_sequence_properties.py +++ b/mapillary_tools/process_sequence_properties.py @@ -21,28 +21,57 @@ def split_sequence_by( """ Split a sequence into multiple sequences by should_split(prev, cur) => True """ - sequences: T.List[T.List[SeqItem]] = [] + output_sequences: T.List[T.List[SeqItem]] = [] seq = iter(sequence) prev = next(seq, None) if prev is None: - return sequences + return output_sequences - sequences.append([prev]) + output_sequences.append([prev]) for cur in seq: # invariant: prev is processed if should_split(prev, cur): - sequences.append([cur]) + output_sequences.append([cur]) else: - sequences[-1].append(cur) + output_sequences[-1].append(cur) prev = cur # invariant: cur is processed - assert sum(len(s) for s in 
sequences) == len(sequence) + assert sum(len(s) for s in output_sequences) == len(sequence) - return sequences + return output_sequences + + +def split_sequence_by_agg( + sequence: T.List[SeqItem], + should_split_with_sequence_state: T.Callable[[SeqItem, T.Dict], bool], +) -> T.List[T.List[SeqItem]]: + """ + Split a sequence by should_split_with_sequence_state(cur, sequence_state) => True + """ + output_sequences: T.List[T.List[SeqItem]] = [] + sequence_state: T.Dict = {} + + for cur in sequence: + start_new_sequence = should_split_with_sequence_state(cur, sequence_state) + + if not output_sequences: + output_sequences.append([]) + + if start_new_sequence: + # DO NOT reset the state because it contains the information of current item + # sequence_state = {} + if output_sequences[-1]: + output_sequences.append([]) + + output_sequences[-1].append(cur) + + assert sum(len(s) for s in output_sequences) == len(sequence) + + return output_sequences def duplication_check( @@ -96,95 +125,6 @@ def duplication_check( return dedups, dups -def split_sequence_by_agg( - sequence: T.List[SeqItem], - should_split_with_sequence_state: T.Callable[[SeqItem, T.Dict], bool], -) -> T.List[T.List[SeqItem]]: - """ - Split a sequence by should_split_with_sequence_state(image, sequence_state) => True - """ - sequences: T.List[T.List[SeqItem]] = [] - sequence_state: T.Dict = {} - - for cur in sequence: - start_new_sequence = should_split_with_sequence_state(cur, sequence_state) - - if not sequences: - sequences.append([]) - - if start_new_sequence: - # DO NOT reset the state because it contains the information of current item - # sequence_state = {} - if sequences[-1]: - sequences.append([]) - - sequences[-1].append(cur) - - assert sum(len(s) for s in sequences) == len(sequence) - - return sequences - - -def _should_spilt_by_limits( - max_sequence_images: int, max_sequence_filesize: float, max_sequence_pixels: float -) -> T.Callable[[types.ImageMetadata, T.Dict], bool]: - def _should_split(image: types.ImageMetadata, sequence_state: T.Dict) -> bool: - last_sequence_images = sequence_state.get("last_sequence_images", 0) - last_sequence_file_size = sequence_state.get("last_sequence_file_size", 0) - last_sequence_pixels = sequence_state.get("last_sequence_pixels", 0) - - # decent default values if width/height not available - width = 1024 if image.width is None else image.width - height = 1024 if image.height is None else image.height - pixels = width * height - - if image.filesize is None: - filesize = os.path.getsize(image.filename) - else: - filesize = image.filesize - - new_sequence_images = last_sequence_images + 1 - new_sequence_file_size = last_sequence_file_size + filesize - new_sequence_pixels = last_sequence_pixels + pixels - - if max_sequence_images < new_sequence_images: - LOG.debug( - "Split the sequence because the current sequence (%s) reaches the max number of images per sequence (%s)", - new_sequence_images, - max_sequence_images, - ) - start_new_sequence = True - elif max_sequence_filesize < new_sequence_file_size: - LOG.debug( - "Split the sequence because the current sequence (%s) reaches the max filesize per sequence (%s)", - new_sequence_file_size, - max_sequence_filesize, - ) - start_new_sequence = True - elif max_sequence_pixels < new_sequence_pixels: - LOG.debug( - "Split the sequence because the current sequence (%s) reaches the max pixels per sequence (%s)", - new_sequence_pixels, - max_sequence_pixels, - ) - start_new_sequence = True - else: - start_new_sequence = False - - if not 
start_new_sequence: - sequence_state["last_sequence_images"] = new_sequence_images - sequence_state["last_sequence_file_size"] = new_sequence_file_size - sequence_state["last_sequence_pixels"] = new_sequence_pixels - else: - sequence_state["last_sequence_images"] = 1 - sequence_state["last_sequence_file_size"] = filesize - sequence_state["last_sequence_pixels"] = pixels - - return start_new_sequence - - return _should_split - - def _group_by( image_metadatas: T.List[types.ImageMetadata], group_key_func=T.Callable[[types.ImageMetadata], T.Hashable], @@ -340,18 +280,24 @@ def _check_video_limits( else: output_video_metadatas.append(video_metadata) + LOG.info( + "Found %s videos and %s errors after video limit checks", + len(output_video_metadatas), + len(error_metadatas), + ) + return output_video_metadatas, error_metadatas -def _check_sequence_limits( - sequences: T.Sequence[PointSequence], +def _check_sequences_by_limits( + input_sequences: T.Sequence[PointSequence], max_sequence_filesize_in_bytes: int, max_avg_speed: float, ) -> T.Tuple[T.List[PointSequence], T.List[types.ErrorMetadata]]: - error_metadatas: T.List[types.ErrorMetadata] = [] output_sequences: T.List[PointSequence] = [] + output_errors: T.List[types.ErrorMetadata] = [] - for sequence in sequences: + for sequence in input_sequences: filesize = 0 for image in sequence: if image.filesize is None: @@ -361,7 +307,7 @@ def _check_sequence_limits( if filesize > max_sequence_filesize_in_bytes: for image in sequence: - error_metadatas.append( + output_errors.append( types.describe_error_metadata( exc=exceptions.MapillaryFileTooLargeError( f"Sequence file size exceeds the maximum allowed file size ({max_sequence_filesize_in_bytes} bytes)", @@ -372,7 +318,7 @@ def _check_sequence_limits( ) elif any(image.lat == 0 and image.lon == 0 for image in sequence): for image in sequence: - error_metadatas.append( + output_errors.append( types.describe_error_metadata( exc=exceptions.MapillaryNullIslandError( "Found GPS coordinates in Null Island (0, 0)", @@ -383,7 +329,7 @@ def _check_sequence_limits( ) elif len(sequence) >= 2 and _avg_speed(sequence) > max_avg_speed: for image in sequence: - error_metadatas.append( + output_errors.append( types.describe_error_metadata( exc=exceptions.MapillaryCaptureSpeedTooFastError( f"Capture speed is too fast (exceeds {round(max_avg_speed, 3)} m/s)", @@ -395,7 +341,226 @@ def _check_sequence_limits( else: output_sequences.append(sequence) - return output_sequences, error_metadatas + assert sum(len(s) for s in output_sequences) + len(output_errors) == sum( + len(s) for s in input_sequences + ) + + LOG.info( + "Found %s sequences and %s errors after sequence limit checks", + len(output_sequences), + len(output_errors), + ) + + return output_sequences, output_errors + + +def _group_by_folder_and_camera( + image_metadatas: T.List[types.ImageMetadata], +) -> T.List[T.List[types.ImageMetadata]]: + grouped = _group_by( + image_metadatas, + lambda metadata: ( + str(metadata.filename.parent), + metadata.MAPDeviceMake, + metadata.MAPDeviceModel, + metadata.width, + metadata.height, + ), + ) + for key in grouped: + LOG.debug("Group sequences by %s: %s images", key, len(grouped[key])) + output_sequences = list(grouped.values()) + + LOG.info( + "Found %s sequences from different folders and cameras", + len(output_sequences), + ) + + return output_sequences + + +def _split_sequences_by_cutoff_time( + input_sequences: T.List[PointSequence], cutoff_time: float +) -> T.List[PointSequence]: + def _should_split_by_cutoff_time( 
+ prev: types.ImageMetadata, cur: types.ImageMetadata + ) -> bool: + time_diff = cur.time - prev.time + assert 0 <= time_diff, "sequence must be sorted by capture times" + should = cutoff_time < time_diff + if should: + LOG.debug( + "Split because the time gap %s seconds exceeds the cutoff time (%s seconds): %s: %s -> %s", + round(time_diff, 2), + round(cutoff_time, 2), + prev.filename.parent, + prev.filename.name, + cur.filename.name, + ) + return should + + output_sequences = [] + for sequence in input_sequences: + output_sequences.extend( + split_sequence_by(sequence, should_split=_should_split_by_cutoff_time) + ) + + assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences) + + LOG.info( + "Found %s sequences after split by cutoff time %d seconds", + len(output_sequences), + cutoff_time, + ) + + return output_sequences + + +def _split_sequences_by_cutoff_distance( + input_sequences: T.List[PointSequence], cutoff_distance: float +) -> T.List[PointSequence]: + def _should_split_by_cutoff_distance( + prev: types.ImageMetadata, cur: types.ImageMetadata + ) -> bool: + distance = geo.gps_distance( + (prev.lat, prev.lon), + (cur.lat, cur.lon), + ) + should = cutoff_distance < distance + if should: + LOG.debug( + "Split because the distance gap %s meters exceeds the cutoff distance (%s meters): %s: %s -> %s", + round(distance, 2), + round(cutoff_distance, 2), + prev.filename.parent, + prev.filename.name, + cur.filename.name, + ) + return should + + output_sequences = [] + for sequence in input_sequences: + output_sequences.extend( + split_sequence_by(sequence, _should_split_by_cutoff_distance) + ) + + assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences) + + LOG.info( + "Found %s sequences after split by cutoff distance %d meters", + len(output_sequences), + cutoff_distance, + ) + + return output_sequences + + +def _check_sequences_duplication( + input_sequences: T.List[PointSequence], + duplicate_distance: float, + duplicate_angle: float, +) -> T.Tuple[T.List[PointSequence], T.List[types.ErrorMetadata]]: + output_sequences: T.List[PointSequence] = [] + output_errors: T.List[types.ErrorMetadata] = [] + + for sequence in input_sequences: + output_sequence, errors = duplication_check( + sequence, + max_duplicate_distance=duplicate_distance, + max_duplicate_angle=duplicate_angle, + ) + assert len(sequence) == len(output_sequence) + len(errors) + output_sequences.append(output_sequence) + output_errors.extend(errors) + + assert sum(len(s) for s in output_sequences) + len(output_errors) == sum( + len(s) for s in input_sequences + ) + + LOG.info( + "Found %s sequences and %s errors after duplication check", + len(output_sequences), + len(output_errors), + ) + + return output_sequences, output_errors + + +def _split_sequences_by_limits( + input_sequences: T.List[PointSequence], + max_sequence_filesize_in_bytes: float, + max_sequence_pixels: float, +) -> T.List[PointSequence]: + max_sequence_images = constants.MAX_SEQUENCE_LENGTH + max_sequence_filesize = max_sequence_filesize_in_bytes + + def _should_split(image: types.ImageMetadata, sequence_state: T.Dict) -> bool: + last_sequence_images = sequence_state.get("last_sequence_images", 0) + last_sequence_file_size = sequence_state.get("last_sequence_file_size", 0) + last_sequence_pixels = sequence_state.get("last_sequence_pixels", 0) + + # decent default values if width/height not available + width = 1024 if image.width is None else image.width + height = 1024 if image.height is None else 
image.height + pixels = width * height + + if image.filesize is None: + filesize = os.path.getsize(image.filename) + else: + filesize = image.filesize + + new_sequence_images = last_sequence_images + 1 + new_sequence_file_size = last_sequence_file_size + filesize + new_sequence_pixels = last_sequence_pixels + pixels + + if max_sequence_images < new_sequence_images: + LOG.debug( + "Split because the current sequence (%s) reaches the max number of images (%s)", + new_sequence_images, + max_sequence_images, + ) + start_new_sequence = True + elif max_sequence_filesize < new_sequence_file_size: + LOG.debug( + "Split because the current sequence (%s) reaches the max filesize (%s)", + new_sequence_file_size, + max_sequence_filesize, + ) + start_new_sequence = True + elif max_sequence_pixels < new_sequence_pixels: + LOG.debug( + "Split because the current sequence (%s) reaches the max pixels (%s)", + new_sequence_pixels, + max_sequence_pixels, + ) + start_new_sequence = True + else: + start_new_sequence = False + + if not start_new_sequence: + sequence_state["last_sequence_images"] = new_sequence_images + sequence_state["last_sequence_file_size"] = new_sequence_file_size + sequence_state["last_sequence_pixels"] = new_sequence_pixels + else: + sequence_state["last_sequence_images"] = 1 + sequence_state["last_sequence_file_size"] = filesize + sequence_state["last_sequence_pixels"] = pixels + + return start_new_sequence + + output_sequences = [] + for sequence in input_sequences: + output_sequences.extend( + split_sequence_by_agg( + sequence, should_split_with_sequence_state=_should_split + ) + ) + + assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences) + + LOG.info("Found %s sequences after split by sequence limits", len(output_sequences)) + + return output_sequences def process_sequence_properties( @@ -436,169 +601,71 @@ def process_sequence_properties( error_metadatas.extend(video_error_metadatas) if image_metadatas: - input_sequences: T.List[PointSequence] - output_sequences: T.List[PointSequence] + sequences: T.List[PointSequence] # Group by folder and camera - grouped = _group_by( - image_metadatas, - lambda metadata: ( - str(metadata.filename.parent), - metadata.MAPDeviceMake, - metadata.MAPDeviceModel, - metadata.width, - metadata.height, - ), - ) - for key in grouped: - LOG.debug("Group sequences by %s: %s images", key, len(grouped[key])) - output_sequences = list(grouped.values()) - LOG.info( - "Found %s sequences from different folders and cameras", - len(output_sequences), - ) + sequences = _group_by_folder_and_camera(image_metadatas) # Make sure each sequence is sorted (in-place update) - input_sequences = output_sequences - for sequence in input_sequences: + for sequence in sequences: sequence.sort( key=lambda metadata: metadata.sort_key(), ) - output_sequences = input_sequences # Interpolate subseconds for same timestamps (in-place update) - input_sequences = output_sequences - for sequence in input_sequences: + for sequence in sequences: _interpolate_subsecs_for_sorting(sequence) - output_sequences = input_sequences # Cut sequences by cutoff time - # NOTE: Do not cut by distance here because it affects the speed limit check - - def _should_split_by_cutoff_time( - prev: types.ImageMetadata, cur: types.ImageMetadata - ) -> bool: - time_diff = cur.time - prev.time - assert 0 <= time_diff, "sequence must be sorted by capture times" - should = cutoff_time < time_diff - if should: - LOG.debug( - "Split the sequence because the time gap between two images (%s 
seconds) exceeds the cutoff time (%s seconds): %s: %s -> %s", - round(time_diff, 2), - round(cutoff_time, 2), - prev.filename.parent, - prev.filename.name, - cur.filename.name, - ) - return should - - input_sequences = output_sequences - output_sequences = [] - for sequence in input_sequences: - output_sequences.extend( - split_sequence_by(sequence, should_split=_should_split_by_cutoff_time) - ) - LOG.info( - "Found %s sequences after cut by cutoff time %d seconds", - len(output_sequences), - cutoff_time, - ) + # NOTE: Do not split by distance here because it affects the speed limit check + sequences = _split_sequences_by_cutoff_time(sequences, cutoff_time=cutoff_time) # Duplication check - input_sequences = output_sequences - output_sequences = [] - for sequence in input_sequences: - output_sequence, errors = duplication_check( - sequence, - max_duplicate_distance=duplicate_distance, - max_duplicate_angle=duplicate_angle, - ) - assert len(sequence) == len(output_sequence) + len(errors) - output_sequences.append(output_sequence) - error_metadatas.extend(errors) + sequences, errors = _check_sequences_duplication( + sequences, + duplicate_distance=duplicate_distance, + duplicate_angle=duplicate_angle, + ) + error_metadatas.extend(errors) # Interpolate angles (in-place update) - input_sequences = output_sequences - for sequence in input_sequences: + for sequence in sequences: if interpolate_directions: for image in sequence: image.angle = None geo.interpolate_directions_if_none(sequence) - output_sequences = input_sequences # Cut sequences by max number of images, max filesize, and max pixels - should_split = _should_spilt_by_limits( - max_sequence_images=constants.MAX_SEQUENCE_LENGTH, - max_sequence_filesize=max_sequence_filesize_in_bytes, + sequences = _split_sequences_by_limits( + sequences, + max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes, max_sequence_pixels=max_sequence_pixels, ) - input_sequences = output_sequences - output_sequences = [] - for sequence in input_sequences: - output_sequences.extend( - split_sequence_by_agg( - sequence, should_split_with_sequence_state=should_split - ) - ) - LOG.info( - "Found %s sequences after cut by sequence limits", len(output_sequences) - ) # Check limits for sequences - input_sequences = output_sequences - output_sequences, errors = _check_sequence_limits( - input_sequences, max_sequence_filesize_in_bytes, max_avg_speed + sequences, errors = _check_sequences_by_limits( + sequences, + max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes, + max_avg_speed=max_avg_speed, ) error_metadatas.extend(errors) - LOG.info( - "Found %s sequences after sequence limit checks", len(output_sequences) - ) # Cut sequences by cutoff distance # NOTE: The speed limit check probably rejects most of anomalies - def _should_split_by_cutoff_distance( - prev: types.ImageMetadata, cur: types.ImageMetadata - ) -> bool: - distance = geo.gps_distance( - (prev.lat, prev.lon), - (cur.lat, cur.lon), - ) - should = cutoff_distance < distance - if should: - LOG.debug( - "Split the sequence because the distance gap between two images (%s meters) exceeds the cutoff distance (%s meters): %s: %s -> %s", - round(distance, 2), - round(cutoff_distance, 2), - prev.filename.parent, - prev.filename.name, - cur.filename.name, - ) - return should - - input_sequences = output_sequences - output_sequences = [] - for sequence in input_sequences: - output_sequences.extend( - split_sequence_by(sequence, _should_split_by_cutoff_distance) - ) - LOG.info( - "Found %s sequences 
after cut by cutoff distance %d meters", - len(output_sequences), - cutoff_distance, + sequences = _split_sequences_by_cutoff_distance( + sequences, cutoff_distance=cutoff_distance ) # Assign sequence UUIDs (in-place update) sequence_idx = 0 - input_sequences = output_sequences - for sequence in input_sequences: + for sequence in sequences: for image in sequence: # using incremental id as shorter "uuid", so we can save some space for the desc file image.MAPSequenceUUID = str(sequence_idx) sequence_idx += 1 - output_sequences = input_sequences image_metadatas = [] - for sequence in input_sequences: + for sequence in sequences: image_metadatas.extend(sequence) assert sequence_idx == len( From df25e207db60acf338ace95218ee667db8d3ad3b Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 17 Mar 2025 14:33:18 -0700 Subject: [PATCH 3/5] update comments --- mapillary_tools/process_sequence_properties.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/mapillary_tools/process_sequence_properties.py b/mapillary_tools/process_sequence_properties.py index 00880f919..0b560805e 100644 --- a/mapillary_tools/process_sequence_properties.py +++ b/mapillary_tools/process_sequence_properties.py @@ -235,8 +235,8 @@ def _check_video_limits( max_sequence_filesize_in_bytes: int, max_avg_speed: float, ) -> T.Tuple[T.List[types.VideoMetadata], T.List[types.ErrorMetadata]]: - error_metadatas: T.List[types.ErrorMetadata] = [] output_video_metadatas: T.List[types.VideoMetadata] = [] + error_metadatas: T.List[types.ErrorMetadata] = [] for video_metadata in video_metadatas: if video_metadata.filesize is None: @@ -390,7 +390,7 @@ def _should_split_by_cutoff_time( should = cutoff_time < time_diff if should: LOG.debug( - "Split because the time gap %s seconds exceeds the cutoff time (%s seconds): %s: %s -> %s", + "Split because the capture time gap %s seconds exceeds cutoff_time (%s seconds): %s: %s -> %s", round(time_diff, 2), round(cutoff_time, 2), prev.filename.parent, @@ -408,7 +408,7 @@ def _should_split_by_cutoff_time( assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences) LOG.info( - "Found %s sequences after split by cutoff time %d seconds", + "Found %s sequences after split by cutoff_time %d seconds", len(output_sequences), cutoff_time, ) @@ -429,7 +429,7 @@ def _should_split_by_cutoff_distance( should = cutoff_distance < distance if should: LOG.debug( - "Split because the distance gap %s meters exceeds the cutoff distance (%s meters): %s: %s -> %s", + "Split because the distance gap %s meters exceeds cutoff_distance (%s meters): %s: %s -> %s", round(distance, 2), round(cutoff_distance, 2), prev.filename.parent, @@ -447,7 +447,7 @@ def _should_split_by_cutoff_distance( assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences) LOG.info( - "Found %s sequences after split by cutoff distance %d meters", + "Found %s sequences after split by cutoff_distance %d meters", len(output_sequences), cutoff_distance, ) @@ -616,7 +616,7 @@ def process_sequence_properties( for sequence in sequences: _interpolate_subsecs_for_sorting(sequence) - # Cut sequences by cutoff time + # Split sequences by cutoff time # NOTE: Do not split by distance here because it affects the speed limit check sequences = _split_sequences_by_cutoff_time(sequences, cutoff_time=cutoff_time) @@ -635,7 +635,7 @@ def process_sequence_properties( image.angle = None geo.interpolate_directions_if_none(sequence) - # Cut sequences by max number of images, max filesize, and max 
pixels + # Split sequences by max number of images, max filesize, and max pixels sequences = _split_sequences_by_limits( sequences, max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes, @@ -650,7 +650,7 @@ def process_sequence_properties( ) error_metadatas.extend(errors) - # Cut sequences by cutoff distance + # Split sequences by cutoff distance # NOTE: The speed limit check probably rejects most of anomalies sequences = _split_sequences_by_cutoff_distance( sequences, cutoff_distance=cutoff_distance From fd93179aab60e68a95f1741cec414fee3a881800 Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 17 Mar 2025 14:45:36 -0700 Subject: [PATCH 4/5] add tests --- tests/unit/test_sequence_processing.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/unit/test_sequence_processing.py b/tests/unit/test_sequence_processing.py index b0c374c8f..45f35a128 100644 --- a/tests/unit/test_sequence_processing.py +++ b/tests/unit/test_sequence_processing.py @@ -595,6 +595,26 @@ def test_video_error(tmpdir: py.path.local): duplicate_distance=100, duplicate_angle=5, ) + metadata_by_filename = {m.filename.name: m for m in metadatas} + assert isinstance(metadata_by_filename["test_good.mp4"], types.VideoMetadata) + assert isinstance( + metadata_by_filename["test_video_null_island.mp4"], types.ErrorMetadata + ) and isinstance( + metadata_by_filename["test_video_null_island.mp4"].error, + exceptions.MapillaryNullIslandError, + ) + assert isinstance( + metadata_by_filename["test_video_file_too_large.mp4"], types.ErrorMetadata + ) and isinstance( + metadata_by_filename["test_video_file_too_large.mp4"].error, + exceptions.MapillaryFileTooLargeError, + ) + assert isinstance( + metadata_by_filename["test_video_too_fast.mp4"], types.ErrorMetadata + ) and isinstance( + metadata_by_filename["test_video_too_fast.mp4"].error, + exceptions.MapillaryCaptureSpeedTooFastError, + ) def test_split_sequence_by(): From b7bd98709a69003bd2d04d849b67bec35dce6a1e Mon Sep 17 00:00:00 2001 From: Tao Peng Date: Mon, 17 Mar 2025 14:54:19 -0700 Subject: [PATCH 5/5] add tests --- tests/unit/test_sequence_processing.py | 47 ++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/unit/test_sequence_processing.py b/tests/unit/test_sequence_processing.py index 45f35a128..475002366 100644 --- a/tests/unit/test_sequence_processing.py +++ b/tests/unit/test_sequence_processing.py @@ -668,3 +668,50 @@ def split_by_distance(prev, cur): sequences = psp.split_sequence_by([], split_by_time) assert len(sequences) == 0 + + +def test_split_sequence_by_agg(tmpdir): + curdir = tmpdir.mkdir("hello77").mkdir("world88") + sequence: T.List[types.Metadata] = [ + # s1 + _make_image_metadata( + Path(curdir) / Path("./a.jpg"), + 2, + 2, + 1, + filesize=110 * 1024 * 1024 * 1024, + ), + # s2 + _make_image_metadata( + Path(curdir) / Path("./b.jpg"), + 2.00001, + 2.00001, + 2, + filesize=1, + ), + # s3 + _make_image_metadata( + Path(curdir) / Path("./c.jpg"), + 2.00002, + 2.00002, + 3, + filesize=110 * 1024 * 1024 * 1024 - 1, + ), + _make_image_metadata( + Path(curdir) / Path("./c.jpg"), + 2.00003, + 2.00003, + 4, + filesize=1, + ), + ] + + metadatas = psp.process_sequence_properties( + sequence, + cutoff_distance=1000000000, + cutoff_time=100, + interpolate_directions=True, + duplicate_distance=0.1, + duplicate_angle=0.1, + ) + assert 3 == len({m.MAPSequenceUUID for m in metadatas}) # type: ignore
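+
+
+def test_split_sequence_by_agg_with_running_state():
+    # Direct sketch of split_sequence_by_agg using plain ints as sequence
+    # items (SeqItem is a TypeVar, so any item type works). The predicate
+    # and its "total" state key are illustrative only: it keeps a running
+    # total in sequence_state and requests a split whenever adding the
+    # current item would push the total past 5.
+    def should_split(item: int, state: T.Dict) -> bool:
+        total = state.get("total", 0) + item
+        if total > 5:
+            # Start a new sequence; its running total begins at this item
+            state["total"] = item
+            return True
+        state["total"] = total
+        return False
+
+    sequences = psp.split_sequence_by_agg([3, 3, 2, 4, 1], should_split)
+    assert sequences == [[3], [3, 2], [4, 1]]
+
+    # An empty input yields no sequences
+    assert psp.split_sequence_by_agg([], should_split) == []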