diff --git a/mapillary_tools/process_sequence_properties.py b/mapillary_tools/process_sequence_properties.py
index 7f77a7d5e..0b560805e 100644
--- a/mapillary_tools/process_sequence_properties.py
+++ b/mapillary_tools/process_sequence_properties.py
@@ -9,61 +9,69 @@
 LOG = logging.getLogger(__name__)
 
+SeqItem = T.TypeVar("SeqItem")
 PointLike = T.TypeVar("PointLike", bound=geo.Point)
 PointSequence = T.List[PointLike]
 
 
-def cut_sequence_by_time_or_distance(
-    sequence: PointSequence,
-    cutoff_distance: T.Optional[float] = None,
-    cutoff_time: T.Optional[float] = None,
-) -> T.List[PointSequence]:
-    sequences: T.List[PointSequence] = []
+def split_sequence_by(
+    sequence: T.List[SeqItem],
+    should_split: T.Callable[[SeqItem, SeqItem], bool],
+) -> T.List[T.List[SeqItem]]:
+    """
+    Split a sequence into multiple sequences, starting a new one whenever should_split(prev, cur) returns True
+    """
+    output_sequences: T.List[T.List[SeqItem]] = []
 
-    if sequence:
-        sequences.append([sequence[0]])
+    seq = iter(sequence)
+
+    prev = next(seq, None)
+    if prev is None:
+        return output_sequences
 
-    for prev, cur in geo.pairwise(sequence):
+    output_sequences.append([prev])
+
+    for cur in seq:
         # invariant: prev is processed
+        if should_split(prev, cur):
+            output_sequences.append([cur])
+        else:
+            output_sequences[-1].append(cur)
+        prev = cur
+        # invariant: cur is processed
 
-        # Cut by distance
-        distance = geo.gps_distance(
-            (prev.lat, prev.lon),
-            (cur.lat, cur.lon),
-        )
-        if cutoff_distance is not None:
-            if cutoff_distance <= distance:
-                LOG.debug(
-                    "Cut the sequence because the distance gap between two images (%s meters) exceeds the cutoff distance (%s meters): %s: %s -> %s",
-                    round(distance, 2),
-                    round(cutoff_distance, 2),
-                    prev.filename.parent,
-                    prev.filename.name,
-                    cur.filename.name,
-                )
-                sequences.append([cur])
-                continue
+    assert sum(len(s) for s in output_sequences) == len(sequence)
 
-        # Cut by time
-        time_diff = cur.time - prev.time
-        assert 0 <= time_diff, "sequence must be sorted by capture times"
-        if cutoff_time is not None:
-            if cutoff_time <= time_diff:
-                LOG.debug(
-                    "Cut the sequence because the time gap between two images (%s seconds) exceeds the cutoff time (%s seconds): %s: %s -> %s",
-                    round(time_diff, 2),
-                    round(cutoff_time, 2),
-                    prev.filename.parent,
-                    prev.filename.name,
-                    cur.filename.name,
-                )
-                sequences.append([cur])
-                continue
+    return output_sequences
 
-        sequences[-1].append(cur)
-        # invariant: cur is processed
 
-    return sequences
+def split_sequence_by_agg(
+    sequence: T.List[SeqItem],
+    should_split_with_sequence_state: T.Callable[[SeqItem, T.Dict], bool],
+) -> T.List[T.List[SeqItem]]:
+    """
+    Split a sequence into multiple sequences, starting a new one whenever should_split_with_sequence_state(cur, sequence_state) returns True
+    """
+    output_sequences: T.List[T.List[SeqItem]] = []
+    sequence_state: T.Dict = {}
+
+    for cur in sequence:
+        start_new_sequence = should_split_with_sequence_state(cur, sequence_state)
+
+        if not output_sequences:
+            output_sequences.append([])
+
+        if start_new_sequence:
+            # DO NOT reset the state here: it already contains the information of the current item
+            # sequence_state = {}
+            if output_sequences[-1]:
+                output_sequences.append([])
+
+        output_sequences[-1].append(cur)
+
+    assert sum(len(s) for s in output_sequences) == len(sequence)
+
+    return output_sequences
 
 
 def duplication_check(
@@ -74,13 +82,14 @@
     dedups: PointSequence = []
     dups: T.List[types.ErrorMetadata] = []
 
-    sequence_iter = iter(sequence)
-    prev = next(sequence_iter)
+    it = iter(sequence)
+    prev = next(it, None)
     if prev is None:
         return dedups, dups
+
     dedups.append(prev)
-    for cur in sequence_iter:
+    for cur in it:
         # invariant: prev is processed
         distance = geo.gps_distance(
             (prev.lat, prev.lon),
@@ -96,18 +105,17 @@
             angle_diff is None or angle_diff <= max_duplicate_angle
         ):
             msg = f"Duplicate of its previous image in terms of distance <= {max_duplicate_distance} and angle <= {max_duplicate_angle}"
-            dups.append(
-                types.describe_error_metadata(
-                    exceptions.MapillaryDuplicationError(
-                        msg,
-                        types.as_desc(cur),
-                        distance=distance,
-                        angle_diff=angle_diff,
-                    ),
-                    cur.filename,
-                    filetype=types.FileType.IMAGE,
+            dup = types.describe_error_metadata(
+                exceptions.MapillaryDuplicationError(
+                    msg,
+                    types.as_desc(cur),
+                    distance=distance,
+                    angle_diff=angle_diff,
                 ),
+                cur.filename,
+                filetype=types.FileType.IMAGE,
             )
+            dups.append(dup)
             # prev does not change
         else:
             dedups.append(cur)
@@ -117,70 +125,6 @@
 
 
-def cut_sequence(
-    sequence: T.List[types.ImageMetadata],
-    max_images: int,
-    max_sequence_filesize: int,
-    max_sequence_pixels: int,
-) -> T.List[T.List[types.ImageMetadata]]:
-    """
-    Cut a sequence into multiple sequences by max_images or max filesize
-    """
-    sequences: T.List[T.List[types.ImageMetadata]] = []
-    last_sequence_file_size = 0
-    last_sequence_pixels = 0
-
-    for image in sequence:
-        # decent default values if width/height not available
-        width = 1024 if image.width is None else image.width
-        height = 1024 if image.height is None else image.height
-
-        filesize = os.path.getsize(image.filename)
-
-        if len(sequences) == 0:
-            start_new_sequence = True
-        else:
-            if sequences[-1]:
-                if max_images < len(sequences[-1]):
-                    LOG.debug(
-                        "Cut the sequence because the current sequence (%s) reaches the max number of images (%s)",
-                        len(sequences[-1]),
-                        max_images,
-                    )
-                    start_new_sequence = True
-                elif max_sequence_filesize < last_sequence_file_size + filesize:
-                    LOG.debug(
-                        "Cut the sequence because the current sequence (%s) reaches the max filesize (%s)",
-                        last_sequence_file_size + filesize,
-                        max_sequence_filesize,
-                    )
-                    start_new_sequence = True
-                elif max_sequence_pixels < last_sequence_pixels + width * height:
-                    LOG.debug(
-                        "Cut the sequence because the current sequence (%s) reaches the max pixels (%s)",
-                        last_sequence_pixels + width * height,
-                        max_sequence_pixels,
-                    )
-                    start_new_sequence = True
-                else:
-                    start_new_sequence = False
-            else:
-                start_new_sequence = False
-
-        if start_new_sequence:
-            sequences.append([])
-            last_sequence_file_size = 0
-            last_sequence_pixels = 0
-
-        sequences[-1].append(image)
-        last_sequence_file_size += filesize
-        last_sequence_pixels += width * height
-
-    assert sum(len(s) for s in sequences) == len(sequence)
-
-    return sequences
-
-
 def _group_by(
     image_metadatas: T.List[types.ImageMetadata],
     group_key_func=T.Callable[[types.ImageMetadata], T.Hashable],
@@ -291,8 +235,8 @@
     max_sequence_filesize_in_bytes: int,
     max_avg_speed: float,
 ) -> T.Tuple[T.List[types.VideoMetadata], T.List[types.ErrorMetadata]]:
-    error_metadatas: T.List[types.ErrorMetadata] = []
     output_video_metadatas: T.List[types.VideoMetadata] = []
+    error_metadatas: T.List[types.ErrorMetadata] = []
 
     for video_metadata in video_metadatas:
         if video_metadata.filesize is None:
@@ -336,18 +280,24 @@
         else:
             output_video_metadatas.append(video_metadata)
 
+    LOG.info(
+        "Found %s videos and %s errors after video limit checks",
+        len(output_video_metadatas),
+        len(error_metadatas),
+    )
+
     return output_video_metadatas, error_metadatas
 
 
-def _check_sequence_limits(
-    sequences: T.Sequence[PointSequence],
+def _check_sequences_by_limits(
+    input_sequences: T.Sequence[PointSequence],
     max_sequence_filesize_in_bytes: int,
     max_avg_speed: float,
 ) -> T.Tuple[T.List[PointSequence], T.List[types.ErrorMetadata]]:
-    error_metadatas: T.List[types.ErrorMetadata] = []
     output_sequences: T.List[PointSequence] = []
+    output_errors: T.List[types.ErrorMetadata] = []
 
-    for sequence in sequences:
+    for sequence in input_sequences:
         filesize = 0
         for image in sequence:
             if image.filesize is None:
@@ -357,7 +307,7 @@
 
         if filesize > max_sequence_filesize_in_bytes:
             for image in sequence:
-                error_metadatas.append(
+                output_errors.append(
                     types.describe_error_metadata(
                         exc=exceptions.MapillaryFileTooLargeError(
                             f"Sequence file size exceeds the maximum allowed file size ({max_sequence_filesize_in_bytes} bytes)",
@@ -368,7 +318,7 @@
             )
         elif any(image.lat == 0 and image.lon == 0 for image in sequence):
             for image in sequence:
-                error_metadatas.append(
+                output_errors.append(
                     types.describe_error_metadata(
                         exc=exceptions.MapillaryNullIslandError(
                             "Found GPS coordinates in Null Island (0, 0)",
@@ -379,7 +329,7 @@
             )
         elif len(sequence) >= 2 and _avg_speed(sequence) > max_avg_speed:
             for image in sequence:
-                error_metadatas.append(
+                output_errors.append(
                     types.describe_error_metadata(
                         exc=exceptions.MapillaryCaptureSpeedTooFastError(
                             f"Capture speed is too fast (exceeds {round(max_avg_speed, 3)} m/s)",
@@ -391,49 +341,22 @@
         else:
             output_sequences.append(sequence)
 
-    return output_sequences, error_metadatas
-
-
-def process_sequence_properties(
-    metadatas: T.Sequence[types.MetadataOrError],
-    cutoff_distance: float = constants.CUTOFF_DISTANCE,
-    cutoff_time: float = constants.CUTOFF_TIME,
-    interpolate_directions: bool = False,
-    duplicate_distance: float = constants.DUPLICATE_DISTANCE,
-    duplicate_angle: float = constants.DUPLICATE_ANGLE,
-    max_avg_speed: float = constants.MAX_AVG_SPEED,
-) -> T.List[types.MetadataOrError]:
-    max_sequence_filesize_in_bytes = _parse_filesize_in_bytes(
-        constants.MAX_SEQUENCE_FILESIZE
+    assert sum(len(s) for s in output_sequences) + len(output_errors) == sum(
+        len(s) for s in input_sequences
    )
-    max_sequence_pixels = _parse_pixels(constants.MAX_SEQUENCE_PIXELS)
-
-    error_metadatas: T.List[types.ErrorMetadata] = []
-    image_metadatas: T.List[types.ImageMetadata] = []
-    video_metadatas: T.List[types.VideoMetadata] = []
 
-    for metadata in metadatas:
-        if isinstance(metadata, types.ErrorMetadata):
-            error_metadatas.append(metadata)
-        elif isinstance(metadata, types.ImageMetadata):
-            image_metadatas.append(metadata)
-        elif isinstance(metadata, types.VideoMetadata):
-            video_metadatas.append(metadata)
-        else:
-            raise RuntimeError(f"invalid metadata type: {metadata}")
-
-    # Check limits for videos
-    video_metadatas, video_error_metadatas = _check_video_limits(
-        video_metadatas,
-        max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes,
-        max_avg_speed=max_avg_speed,
+    LOG.info(
+        "Found %s sequences and %s errors after sequence limit checks",
+        len(output_sequences),
+        len(output_errors),
     )
-    error_metadatas.extend(video_error_metadatas)
 
-    input_sequences: T.List[PointSequence]
-    output_sequences: T.List[PointSequence]
+    return output_sequences, output_errors
+
 
-    # Group by folder and camera
+def _group_by_folder_and_camera(
+    image_metadatas: T.List[types.ImageMetadata],
+) -> T.List[T.List[types.ImageMetadata]]:
     grouped = _group_by(
         image_metadatas,
         lambda metadata: (
@@ -447,41 +370,99 @@
     for key in grouped:
         LOG.debug("Group sequences by %s: %s images", key, len(grouped[key]))
     output_sequences = list(grouped.values())
+
     LOG.info(
-        "Found %s sequences from different folders and cameras", len(output_sequences)
+        "Found %s sequences from different folders and cameras",
+        len(output_sequences),
     )
 
-    # Make sure each sequence is sorted (in-place update)
-    input_sequences = output_sequences
-    for sequence in input_sequences:
-        sequence.sort(
-            key=lambda metadata: metadata.sort_key(),
-        )
-    output_sequences = input_sequences
+    return output_sequences
 
-    # Interpolate subseconds for same timestamps (in-place update)
-    input_sequences = output_sequences
-    for sequence in input_sequences:
-        _interpolate_subsecs_for_sorting(sequence)
-    output_sequences = input_sequences
 
-    # Cut sequences by cutoff time
-    # NOTE: Do not cut by distance here because it affects the speed limit check
-    input_sequences = output_sequences
+def _split_sequences_by_cutoff_time(
+    input_sequences: T.List[PointSequence], cutoff_time: float
+) -> T.List[PointSequence]:
+    def _should_split_by_cutoff_time(
+        prev: types.ImageMetadata, cur: types.ImageMetadata
+    ) -> bool:
+        time_diff = cur.time - prev.time
+        assert 0 <= time_diff, "sequence must be sorted by capture times"
+        should = cutoff_time < time_diff
+        if should:
+            LOG.debug(
+                "Split because the capture time gap %s seconds exceeds cutoff_time (%s seconds): %s: %s -> %s",
+                round(time_diff, 2),
+                round(cutoff_time, 2),
+                prev.filename.parent,
+                prev.filename.name,
+                cur.filename.name,
+            )
+        return should
+
     output_sequences = []
     for sequence in input_sequences:
         output_sequences.extend(
-            cut_sequence_by_time_or_distance(sequence, cutoff_time=cutoff_time)
+            split_sequence_by(sequence, should_split=_should_split_by_cutoff_time)
         )
+
+    assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences)
+
     LOG.info(
-        "Found %s sequences after cut by cutoff time %d seconds",
+        "Found %s sequences after split by cutoff_time %d seconds",
         len(output_sequences),
         cutoff_time,
     )
 
-    # Duplication check
-    input_sequences = output_sequences
+    return output_sequences
+
+
+def _split_sequences_by_cutoff_distance(
+    input_sequences: T.List[PointSequence], cutoff_distance: float
+) -> T.List[PointSequence]:
+    def _should_split_by_cutoff_distance(
+        prev: types.ImageMetadata, cur: types.ImageMetadata
+    ) -> bool:
+        distance = geo.gps_distance(
+            (prev.lat, prev.lon),
+            (cur.lat, cur.lon),
+        )
+        should = cutoff_distance < distance
+        if should:
+            LOG.debug(
+                "Split because the distance gap %s meters exceeds cutoff_distance (%s meters): %s: %s -> %s",
+                round(distance, 2),
+                round(cutoff_distance, 2),
+                prev.filename.parent,
+                prev.filename.name,
+                cur.filename.name,
+            )
+        return should
+
+    output_sequences = []
+    for sequence in input_sequences:
+        output_sequences.extend(
+            split_sequence_by(sequence, _should_split_by_cutoff_distance)
+        )
+
+    assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences)
+
+    LOG.info(
+        "Found %s sequences after split by cutoff_distance %d meters",
+        len(output_sequences),
+        cutoff_distance,
+    )
+
+    return output_sequences
+
+
+def _check_sequences_duplication(
+    input_sequences: T.List[PointSequence],
+    duplicate_distance: float,
+    duplicate_angle: float,
+) -> T.Tuple[T.List[PointSequence], T.List[types.ErrorMetadata]]:
+    output_sequences: T.List[PointSequence] = []
+    output_errors: T.List[types.ErrorMetadata] = []
+
     for sequence in input_sequences:
         output_sequence, errors = duplication_check(
             sequence,
@@ -490,74 +471,211 @@
         )
         assert len(sequence) == len(output_sequence) + len(errors)
         output_sequences.append(output_sequence)
-        error_metadatas.extend(errors)
+        output_errors.extend(errors)
 
-    # Interpolate angles (in-place update)
-    input_sequences = output_sequences
-    for sequence in input_sequences:
-        if interpolate_directions:
-            for image in sequence:
-                image.angle = None
-        geo.interpolate_directions_if_none(sequence)
-    output_sequences = input_sequences
+    assert sum(len(s) for s in output_sequences) + len(output_errors) == sum(
+        len(s) for s in input_sequences
+    )
 
-    # Cut sequences by max number of images, max filesize, and max pixels
-    input_sequences = output_sequences
-    output_sequences = []
-    for sequence in input_sequences:
-        output_sequences.extend(
-            cut_sequence(
-                sequence,
-                constants.MAX_SEQUENCE_LENGTH,
-                max_sequence_filesize_in_bytes,
+    LOG.info(
+        "Found %s sequences and %s errors after duplication check",
+        len(output_sequences),
+        len(output_errors),
+    )
+
+    return output_sequences, output_errors
+
+
+def _split_sequences_by_limits(
+    input_sequences: T.List[PointSequence],
+    max_sequence_filesize_in_bytes: float,
+    max_sequence_pixels: float,
+) -> T.List[PointSequence]:
+    max_sequence_images = constants.MAX_SEQUENCE_LENGTH
+    max_sequence_filesize = max_sequence_filesize_in_bytes
+
+    def _should_split(image: types.ImageMetadata, sequence_state: T.Dict) -> bool:
+        last_sequence_images = sequence_state.get("last_sequence_images", 0)
+        last_sequence_file_size = sequence_state.get("last_sequence_file_size", 0)
+        last_sequence_pixels = sequence_state.get("last_sequence_pixels", 0)
+
+        # decent default values if width/height not available
+        width = 1024 if image.width is None else image.width
+        height = 1024 if image.height is None else image.height
+        pixels = width * height
+
+        if image.filesize is None:
+            filesize = os.path.getsize(image.filename)
+        else:
+            filesize = image.filesize
+
+        new_sequence_images = last_sequence_images + 1
+        new_sequence_file_size = last_sequence_file_size + filesize
+        new_sequence_pixels = last_sequence_pixels + pixels
+
+        if max_sequence_images < new_sequence_images:
+            LOG.debug(
+                "Split because the current sequence (%s) reaches the max number of images (%s)",
+                new_sequence_images,
+                max_sequence_images,
+            )
+            start_new_sequence = True
+        elif max_sequence_filesize < new_sequence_file_size:
+            LOG.debug(
+                "Split because the current sequence (%s) reaches the max filesize (%s)",
+                new_sequence_file_size,
+                max_sequence_filesize,
+            )
+            start_new_sequence = True
+        elif max_sequence_pixels < new_sequence_pixels:
+            LOG.debug(
+                "Split because the current sequence (%s) reaches the max pixels (%s)",
+                new_sequence_pixels,
                 max_sequence_pixels,
             )
-        )
-    LOG.info("Found %s sequences after cut by sequence limits", len(output_sequences))
+            start_new_sequence = True
+        else:
+            start_new_sequence = False
 
-    # Check limits for sequences
-    input_sequences = output_sequences
-    output_sequences, errors = _check_sequence_limits(
-        input_sequences, max_sequence_filesize_in_bytes, max_avg_speed
-    )
-    error_metadatas.extend(errors)
-    LOG.info("Found %s sequences after sequence limit checks", len(output_sequences))
+        if not start_new_sequence:
+            sequence_state["last_sequence_images"] = new_sequence_images
+            sequence_state["last_sequence_file_size"] = new_sequence_file_size
sequence_state["last_sequence_pixels"] = new_sequence_pixels + else: + sequence_state["last_sequence_images"] = 1 + sequence_state["last_sequence_file_size"] = filesize + sequence_state["last_sequence_pixels"] = pixels + + return start_new_sequence - # Cut sequences by cutoff distance - # NOTE: The speed limit check probably rejects most of anomalies - input_sequences = output_sequences output_sequences = [] for sequence in input_sequences: output_sequences.extend( - cut_sequence_by_time_or_distance(sequence, cutoff_distance=cutoff_distance) + split_sequence_by_agg( + sequence, should_split_with_sequence_state=_should_split + ) ) - LOG.info( - "Found %s sequences after cut by cutoff distance %d meters", - len(output_sequences), - cutoff_distance, + + assert sum(len(s) for s in output_sequences) == sum(len(s) for s in input_sequences) + + LOG.info("Found %s sequences after split by sequence limits", len(output_sequences)) + + return output_sequences + + +def process_sequence_properties( + metadatas: T.Sequence[types.MetadataOrError], + cutoff_distance: float = constants.CUTOFF_DISTANCE, + cutoff_time: float = constants.CUTOFF_TIME, + interpolate_directions: bool = False, + duplicate_distance: float = constants.DUPLICATE_DISTANCE, + duplicate_angle: float = constants.DUPLICATE_ANGLE, + max_avg_speed: float = constants.MAX_AVG_SPEED, +) -> T.List[types.MetadataOrError]: + max_sequence_filesize_in_bytes = _parse_filesize_in_bytes( + constants.MAX_SEQUENCE_FILESIZE ) + max_sequence_pixels = _parse_pixels(constants.MAX_SEQUENCE_PIXELS) - # Assign sequence UUIDs (in-place update) - sequence_idx = 0 - input_sequences = output_sequences - for sequence in input_sequences: - for image in sequence: - # using incremental id as shorter "uuid", so we can save some space for the desc file - image.MAPSequenceUUID = str(sequence_idx) - sequence_idx += 1 - output_sequences = input_sequences + error_metadatas: T.List[types.ErrorMetadata] = [] + image_metadatas: T.List[types.ImageMetadata] = [] + video_metadatas: T.List[types.VideoMetadata] = [] - image_metadatas = [] - for sequence in input_sequences: - image_metadatas.extend(sequence) + for metadata in metadatas: + if isinstance(metadata, types.ErrorMetadata): + error_metadatas.append(metadata) + elif isinstance(metadata, types.ImageMetadata): + image_metadatas.append(metadata) + elif isinstance(metadata, types.VideoMetadata): + video_metadatas.append(metadata) + else: + raise RuntimeError(f"invalid metadata type: {metadata}") + + if video_metadatas: + # Check limits for videos + video_metadatas, video_error_metadatas = _check_video_limits( + video_metadatas, + max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes, + max_avg_speed=max_avg_speed, + ) + error_metadatas.extend(video_error_metadatas) + + if image_metadatas: + sequences: T.List[PointSequence] + + # Group by folder and camera + sequences = _group_by_folder_and_camera(image_metadatas) + + # Make sure each sequence is sorted (in-place update) + for sequence in sequences: + sequence.sort( + key=lambda metadata: metadata.sort_key(), + ) + + # Interpolate subseconds for same timestamps (in-place update) + for sequence in sequences: + _interpolate_subsecs_for_sorting(sequence) + + # Split sequences by cutoff time + # NOTE: Do not split by distance here because it affects the speed limit check + sequences = _split_sequences_by_cutoff_time(sequences, cutoff_time=cutoff_time) + + # Duplication check + sequences, errors = _check_sequences_duplication( + sequences, + 
+            duplicate_distance=duplicate_distance,
+            duplicate_angle=duplicate_angle,
+        )
+        error_metadatas.extend(errors)
+
+        # Interpolate angles (in-place update)
+        for sequence in sequences:
+            if interpolate_directions:
+                for image in sequence:
+                    image.angle = None
+            geo.interpolate_directions_if_none(sequence)
+
+        # Split sequences by max number of images, max filesize, and max pixels
+        sequences = _split_sequences_by_limits(
+            sequences,
+            max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes,
+            max_sequence_pixels=max_sequence_pixels,
+        )
+
+        # Check limits for sequences
+        sequences, errors = _check_sequences_by_limits(
+            sequences,
+            max_sequence_filesize_in_bytes=max_sequence_filesize_in_bytes,
+            max_avg_speed=max_avg_speed,
+        )
+        error_metadatas.extend(errors)
+
+        # Split sequences by cutoff distance
+        # NOTE: The speed limit check probably rejects most of the anomalies
+        sequences = _split_sequences_by_cutoff_distance(
+            sequences, cutoff_distance=cutoff_distance
+        )
+
+        # Assign sequence UUIDs (in-place update)
+        sequence_idx = 0
+        for sequence in sequences:
+            for image in sequence:
+                # using incremental id as shorter "uuid", so we can save some space for the desc file
+                image.MAPSequenceUUID = str(sequence_idx)
+            sequence_idx += 1
+
+        image_metadatas = []
+        for sequence in sequences:
+            image_metadatas.extend(sequence)
+
+        assert sequence_idx == len(
+            set(metadata.MAPSequenceUUID for metadata in image_metadatas)
+        )
 
     results = error_metadatas + image_metadatas + video_metadatas
 
     assert len(metadatas) == len(results), (
         f"expected {len(metadatas)} results but got {len(results)}"
     )
-    assert sequence_idx == len(
-        set(metadata.MAPSequenceUUID for metadata in image_metadatas)
-    )
 
     return results
diff --git a/tests/unit/test_sequence_processing.py b/tests/unit/test_sequence_processing.py
index f1370ff5d..475002366 100644
--- a/tests/unit/test_sequence_processing.py
+++ b/tests/unit/test_sequence_processing.py
@@ -595,3 +595,123 @@
         duplicate_distance=100,
         duplicate_angle=5,
     )
+    metadata_by_filename = {m.filename.name: m for m in metadatas}
+    assert isinstance(metadata_by_filename["test_good.mp4"], types.VideoMetadata)
+    assert isinstance(
+        metadata_by_filename["test_video_null_island.mp4"], types.ErrorMetadata
+    ) and isinstance(
+        metadata_by_filename["test_video_null_island.mp4"].error,
+        exceptions.MapillaryNullIslandError,
+    )
+    assert isinstance(
+        metadata_by_filename["test_video_file_too_large.mp4"], types.ErrorMetadata
+    ) and isinstance(
+        metadata_by_filename["test_video_file_too_large.mp4"].error,
+        exceptions.MapillaryFileTooLargeError,
+    )
+    assert isinstance(
+        metadata_by_filename["test_video_too_fast.mp4"], types.ErrorMetadata
+    ) and isinstance(
+        metadata_by_filename["test_video_too_fast.mp4"].error,
+        exceptions.MapillaryCaptureSpeedTooFastError,
+    )
+
+
+def test_split_sequence_by():
+    """Test split_sequence_by function."""
+    # Create test points
+    p1 = geo.Point(1, 1.00000, 1.00000, 1, angle=0)
+    p2 = geo.Point(2, 1.00001, 1.00001, 2, angle=0)
+    p3 = geo.Point(3, 1.00002, 1.00002, 3, angle=0)
+    p4 = geo.Point(10, 1.00003, 1.00003, 4, angle=0)  # Large time gap
+    p5 = geo.Point(11, 1.00004, 1.00004, 5, angle=0)
+    p6 = geo.Point(12, 1.10000, 1.10000, 6, angle=0)  # Large distance gap
+    p7 = geo.Point(13, 1.10001, 1.10001, 7, angle=0)
+
+    # Create a sequence of points
+    sequence = [p1, p2, p3, p4, p5, p6, p7]
+
+    # Test split by time gaps (> 5 seconds)
+    split_by_time = lambda prev, cur: cur.time - prev.time > 5
+    sequences = psp.split_sequence_by(sequence, split_by_time)
+
+    # Should be split into two sequences [p1,p2,p3], [p4,p5,p6,p7]
+    assert len(sequences) == 2
+    assert sequences[0] == [p1, p2, p3]
+    assert sequences[1] == [p4, p5, p6, p7]
+
+    # Test split by large distance gaps
+    def split_by_distance(prev, cur):
+        distance = geo.gps_distance(
+            (prev.lat, prev.lon),
+            (cur.lat, cur.lon),
+        )
+        should = distance > 1000  # Split if distance > 1000 meters
+        return should
+
+    sequences = psp.split_sequence_by(sequence, split_by_distance)
+
+    # Should be split into two sequences [p1,p2,p3,p4,p5], [p6,p7]
+    assert len(sequences) == 2
+    assert sequences[0] == [p1, p2, p3, p4, p5]
+    assert sequences[1] == [p6, p7]
+
+    # Test empty sequence
+    empty_sequences = psp.split_sequence_by([], split_by_time)
+    assert len(empty_sequences) == 0
+
+    # Test single point sequence
+    single_point = [p1]
+    single_sequences = psp.split_sequence_by(single_point, split_by_time)
+    assert len(single_sequences) == 1
+    assert single_sequences[0] == [p1]
+
+    sequences = psp.split_sequence_by([], split_by_time)
+    assert len(sequences) == 0
+
+
+def test_split_sequence_by_agg(tmpdir):
+    curdir = tmpdir.mkdir("hello77").mkdir("world88")
+    sequence: T.List[types.Metadata] = [
+        # s1
+        _make_image_metadata(
+            Path(curdir) / Path("./a.jpg"),
+            2,
+            2,
+            1,
+            filesize=110 * 1024 * 1024 * 1024,
+        ),
+        # s2
+        _make_image_metadata(
+            Path(curdir) / Path("./b.jpg"),
+            2.00001,
+            2.00001,
+            2,
+            filesize=1,
+        ),
+        # s3
+        _make_image_metadata(
+            Path(curdir) / Path("./c.jpg"),
+            2.00002,
+            2.00002,
+            3,
+            filesize=110 * 1024 * 1024 * 1024 - 1,
+        ),
+        _make_image_metadata(
+            Path(curdir) / Path("./c.jpg"),
+            2.00003,
+            2.00003,
+            4,
+            filesize=1,
+        ),
+    ]
+
+    metadatas = psp.process_sequence_properties(
+        sequence,
+        cutoff_distance=1000000000,
+        cutoff_time=100,
+        interpolate_directions=True,
+        duplicate_distance=0.1,
+        duplicate_angle=0.1,
+    )
+    assert 3 == len({m.MAPSequenceUUID for m in metadatas})  # type: ignore
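
Addendum (illustration only, not part of the patch): a minimal sketch of how the two new splitting primitives compose. It assumes the patched module is importable as `psp` and that `geo.Point(time, lat, lon, alt, angle=...)` is constructed the same way the unit tests above construct it.

# Illustration only -- not part of the patch above.
import typing as T

from mapillary_tools import geo
from mapillary_tools import process_sequence_properties as psp

points = [geo.Point(t, 1.0, 1.0, 1, angle=0) for t in (1, 2, 3, 60, 61)]

# Pairwise rule: split_sequence_by() compares each item with its predecessor.
# Here: start a new sequence when the capture-time gap exceeds 30 seconds.
by_gap = psp.split_sequence_by(points, lambda prev, cur: cur.time - prev.time > 30)
assert [len(s) for s in by_gap] == [3, 2]


# Aggregating rule: split_sequence_by_agg() threads a mutable dict through
# every call, so the predicate can accumulate per-sequence totals (counts,
# bytes, pixels). Here: cap each sequence at two items.
def at_most_two(cur: geo.Point, state: T.Dict) -> bool:
    count = state.get("count", 0) + 1
    start_new = 2 < count
    # Reset the running count ourselves on a split; split_sequence_by_agg
    # deliberately leaves the state untouched, because it already reflects
    # the current item.
    state["count"] = 1 if start_new else count
    return start_new


by_size = psp.split_sequence_by_agg(points, at_most_two)
assert [len(s) for s in by_size] == [2, 2, 1]

The pairwise form covers stateless gap checks (time, distance), while the dict-threading form exists for limits that depend on a running total, which is why `_split_sequences_by_limits` is built on `split_sequence_by_agg`.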