Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions OTAnalytics/plugin_ui/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ async def _export_events(

for event_format in self._run_config.event_formats:
event_list_exporter = self._provide_eventlist_exporter(event_format)
actual_save_path = save_path.with_suffix(
f".events{event_list_exporter.get_extension()}"
actual_save_path = save_path.parent / (
save_path.name + f".events{event_list_exporter.get_extension()}"
)

event_export_specification = EventExportSpecification(
Expand All @@ -284,8 +284,8 @@ async def _export_events(
logger().info(f"Event list saved at '{actual_save_path}'")
await self._after_event_file_export(actual_save_path)

assignment_path = save_path.with_suffix(
f".{CONTEXT_FILE_TYPE_ROAD_USER_ASSIGNMENTS}.csv"
assignment_path = save_path.parent / (
save_path.name + f".{CONTEXT_FILE_TYPE_ROAD_USER_ASSIGNMENTS}.csv"
)
specification = ExportSpecification(
save_path=assignment_path, format=CSV_FORMAT.name, mode=export_mode
Expand Down Expand Up @@ -322,10 +322,9 @@ async def _do_export_counts(self, save_path: Path, export_mode: ExportMode) -> N
if modes is None:
raise ValueError("modes is None but has to be defined for exporting counts")
for count_interval in self._run_config.count_intervals:
output_file = save_path.with_suffix(
f".{CONTEXT_FILE_TYPE_COUNTS}_{count_interval}"
f"{DEFAULT_COUNT_INTERVAL_TIME_UNIT}."
f"{DEFAULT_COUNTS_FILE_TYPE}"
output_file = save_path.parent / (
save_path.name + f".{CONTEXT_FILE_TYPE_COUNTS}_{count_interval}"
f"{DEFAULT_COUNT_INTERVAL_TIME_UNIT}.{DEFAULT_COUNTS_FILE_TYPE}"
)
counting_specification = CountingSpecificationDto(
start=start,
Expand Down Expand Up @@ -363,8 +362,8 @@ async def _do_export_track_statistics(
self, save_path: Path, export_mode: ExportMode
) -> None:
logger().info("Create track statistics ...")
track_statistics_path = save_path.with_suffix(
f".{CONTEXT_FILE_TYPE_TRACK_STATISTICS}.csv"
track_statistics_path = save_path.parent / (
save_path.name + f".{CONTEXT_FILE_TYPE_TRACK_STATISTICS}.csv"
)
specification = TrackStatisticsExportSpecification(
save_path=track_statistics_path,
Expand Down
105 changes: 105 additions & 0 deletions tests/unit/OTAnalytics/plugin_ui/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from OTAnalytics.application.config import (
CONTEXT_FILE_TYPE_COUNTS,
CONTEXT_FILE_TYPE_TRACK_STATISTICS,
DEFAULT_COUNT_INTERVAL_TIME_UNIT,
DEFAULT_COUNTS_FILE_TYPE,
DEFAULT_EVENTLIST_FILE_TYPE,
Expand Down Expand Up @@ -98,6 +99,7 @@
from OTAnalytics.application.use_cases.track_statistics import CalculateTrackStatistics
from OTAnalytics.application.use_cases.track_statistics_export import (
ExportTrackStatistics,
TrackStatisticsExportSpecification,
)
from OTAnalytics.domain.event import EventRepository
from OTAnalytics.domain.progress import NoProgressbarBuilder
Expand Down Expand Up @@ -1022,6 +1024,109 @@ async def test_use_video_start_and_end_for_counting(
)
export_counts.export.assert_called_with(expected_specification)

@pytest.mark.asyncio
@pytest.mark.parametrize(
"mode",
[CliMode.STREAM, CliMode.BULK],
)
async def test_do_export_counts_preserves_filename_with_multiple_dots(
self,
mode: CliMode,
test_data_tmp_dir: Path,
mock_cli_stream_dependencies: dict[str, Mock],
mock_cli_bulk_dependencies: dict[str, Mock],
) -> None:
filename_with_dots = (
"first5min_FOOBAR1234_1998_04_26-1500.00000_1998-04-26_15-00-00"
)
save_path = test_data_tmp_dir / filename_with_dots
start_date = datetime(1998, 8, 28, 15, 0)
end_date = datetime(1998, 4, 28, 15, 15)
classifications = frozenset(["car", "bike"])
interval = 15
expected_output_file = (
test_data_tmp_dir
/ f"{filename_with_dots}.{CONTEXT_FILE_TYPE_COUNTS}_{interval}"
f"{DEFAULT_COUNT_INTERVAL_TIME_UNIT}.{DEFAULT_COUNTS_FILE_TYPE}"
)

if mode == CliMode.STREAM:
dependencies = mock_cli_stream_dependencies
else:
dependencies = mock_cli_bulk_dependencies

dependencies[self.GET_ALL_TRACK_IDS].return_value = [TrackId("1")]
dependencies[self.VIDEOS_METADATA].first_video_start = start_date
dependencies[self.VIDEOS_METADATA].last_video_end = end_date
dependencies[self.TRACKS_METADATA].filtered_detection_classifications = (
classifications
)

run_config = Mock()
run_config.count_intervals = {interval}
run_config.counting_event = CountingEvent.START

cli: OTAnalyticsCli = self.init_cli_with(
mode, dependencies, dependencies, run_config
)

await cli._do_export_counts(save_path, OVERWRITE)

expected_specification = CountingSpecificationDto(
start=start_date,
end=end_date,
interval_in_minutes=interval,
modes=list(classifications),
output_format="CSV",
output_file=str(expected_output_file),
export_mode=OVERWRITE,
)
dependencies[self.EXPORT_COUNTS].export.assert_called_with(
expected_specification
)

@pytest.mark.asyncio
@pytest.mark.parametrize(
"mode",
[CliMode.STREAM, CliMode.BULK],
)
async def test_do_export_track_statistics_preserves_filename_with_multiple_dots(
self,
mode: CliMode,
test_data_tmp_dir: Path,
mock_cli_stream_dependencies: dict[str, Mock],
mock_cli_bulk_dependencies: dict[str, Mock],
) -> None:
filename_with_dots = (
"first5min_FOOBAR1234_1998_04_26-1500.00000_1998-04-26_15-00-00"
)
save_path = test_data_tmp_dir / filename_with_dots
expected_track_statistics_path = (
test_data_tmp_dir
/ f"{filename_with_dots}.{CONTEXT_FILE_TYPE_TRACK_STATISTICS}.csv"
)

if mode == CliMode.STREAM:
dependencies = mock_cli_stream_dependencies
else:
dependencies = mock_cli_bulk_dependencies

run_config = Mock()
cli: OTAnalyticsCli = self.init_cli_with(
mode, dependencies, dependencies, run_config
)

await cli._do_export_track_statistics(save_path, OVERWRITE)

expected_specification = TrackStatisticsExportSpecification(
save_path=expected_track_statistics_path,
format="CSV",
export_mode=OVERWRITE,
)
dependencies[self.EXPORT_TRACK_STATISTICS].export.assert_called_with(
expected_specification
)

@pytest.mark.parametrize(
"mode",
[CliMode.STREAM, CliMode.BULK],
Expand Down
Loading