Skip to content
11 changes: 8 additions & 3 deletions src/flockwave/server/ext/crazyflie/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from flockwave.server.model.uav import BatteryInfo, UAVBase, UAVDriver, VersionInfo
from flockwave.server.registries.errors import RegistryFull
from flockwave.server.show import (
ShowSpecification,
TrajectorySpecification,
get_group_index_from_show_specification,
get_home_position_from_show_specification,
Expand Down Expand Up @@ -390,14 +391,16 @@ async def handle_command_trick(self, uav: "CrazyflieUAV", *params: str) -> str:
f"hover time = {hover_time} s"
)

async def handle_command___show_upload(self, uav: "CrazyflieUAV", *, show):
async def handle_command___show_upload(
self, uav: "CrazyflieUAV", *, show: ShowSpecification
):
"""Handles a drone show upload request for the given UAV.

This is a temporary solution until we figure out something that is
more sustainable in the long run.

Parameters:
show: the show data
show: the show data for a single UAV
"""
await uav.upload_show(show, remember=True)
if self.preferred_controller is not None:
Expand Down Expand Up @@ -1019,7 +1022,9 @@ async def test_component(self, component: str) -> None:
else:
raise NotSupportedError

async def upload_show(self, show, *, remember: bool = True) -> None:
async def upload_show(
self, show: ShowSpecification, *, remember: bool = True
) -> None:
home = get_home_position_from_show_specification(show)
trajectory = get_trajectory_from_show_specification(show)
group_index = get_group_index_from_show_specification(show)
Expand Down
9 changes: 6 additions & 3 deletions src/flockwave/server/ext/mavlink/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
VersionInfo,
)
from flockwave.server.show import (
ShowSpecification,
get_altitude_reference_from_show_specification,
get_coordinate_system_from_show_specification,
get_geofence_configuration_from_show_specification,
Expand Down Expand Up @@ -383,14 +384,16 @@ async def handle_command_show(self, uav: "MAVLinkUAV", command: str | None = Non
else:
raise RuntimeError(f"Unknown subcommand: {command!r}")

async def handle_command___show_upload(self, uav: "MAVLinkUAV", *, show):
async def handle_command___show_upload(
self, uav: "MAVLinkUAV", *, show: ShowSpecification
):
"""Handles a drone show upload request for the given UAV.

This is a temporary solution until we figure out something that is
more sustainable in the long run.

Parameters:
show: the show data
show: the show data for a single UAV
"""
try:
await uav.upload_show(show)
Expand Down Expand Up @@ -2359,7 +2362,7 @@ async def trigger_camera_shutter(self) -> None:
if not success:
raise RuntimeError("Failed to trigger camera shutter")

async def upload_show(self, show) -> None:
async def upload_show(self, show: ShowSpecification) -> None:
coordinate_system = get_coordinate_system_from_show_specification(show)
if coordinate_system.type != "nwu":
raise RuntimeError("Only NWU coordinate systems are supported")
Expand Down
17 changes: 9 additions & 8 deletions src/flockwave/server/ext/show/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

from logging import Logger
from time import monotonic
from typing import Any
from typing import Any, cast

from flockwave.gps.vectors import FlatEarthToGPSCoordinateTransformation

from flockwave.server.model import Client, FlockwaveMessage
from flockwave.server.show import ShowSpecification

from .metadata import ShowMetadata

Expand Down Expand Up @@ -55,7 +56,6 @@ def __init__(self, log: Logger):
Parameter:
log: logger that the middleware will write to
"""
self._last_show_metadata = None
self._last_show_upload_command_at = monotonic() - 1000
self._log = log

Expand Down Expand Up @@ -84,7 +84,7 @@ def __call__(self, message: FlockwaveMessage, sender: Client) -> FlockwaveMessag

return message

def _extract_show(self, message: FlockwaveMessage) -> dict[str, Any] | None:
def _extract_show(self, message: FlockwaveMessage) -> ShowSpecification | None:
"""Checks whether the given message is a show upload and extracts the
show specification out of the message if it is.
"""
Expand All @@ -104,20 +104,21 @@ def last_show_metadata(self) -> ShowMetadata | None:
return self._last_show_metadata

@staticmethod
def _get_show_fingerprint(show: dict[str, Any]) -> ShowFingerprint:
def _get_show_fingerprint(show: ShowSpecification) -> ShowFingerprint:
"""Extracts the basic show parameters like the origin and the orientation
from the upload. These are used to decide whether an upload attempt is
probably a continuation of an ongoing sequence of requests from the
client or a completely new one.
"""
show_dict = cast(dict[str, Any], show)
return [
get(show, "mission", "id"),
get(show, "coordinateSystem"),
get(show, "amslReference"),
get(show_dict, "mission", "id"),
get(show_dict, "coordinateSystem"),
get(show_dict, "amslReference"),
]

@staticmethod
def _get_metadata_from_upload_request(show: dict[str, Any]) -> ShowMetadata:
def _get_metadata_from_upload_request(show: ShowSpecification) -> ShowMetadata:
"""Extracts the metadata of the current show being uploaded. This is
returned to consumers of the API of the show extension when the caller
requests the metadata of the last uploaded show.
Expand Down
9 changes: 6 additions & 3 deletions src/flockwave/server/ext/virtual_uavs/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from flockwave.server.model.transport import TransportOptions
from flockwave.server.model.uav import UAVBase, UAVDriver, VersionInfo
from flockwave.server.show import (
ShowSpecification,
TrajectoryPlayer,
TrajectorySpecification,
get_coordinate_system_from_show_specification,
Expand Down Expand Up @@ -563,7 +564,7 @@ def handle_mission_upload(self, bundle: MissionItemBundle, format: str) -> None:

self.update_status(mode="mission" if self.has_mission_items else "stab")

def handle_show_upload(self, show) -> None:
def handle_show_upload(self, show: ShowSpecification) -> None:
"""Handles the upload of a full drone show (trajectory + light program).

Parameters:
Expand Down Expand Up @@ -1164,14 +1165,16 @@ async def handle_command___mission_upload(
uav.handle_mission_upload(bundle, format)
await sleep(0.25 + random() * 0.5)

async def handle_command___show_upload(self, uav: VirtualUAV, *, show) -> None:
async def handle_command___show_upload(
self, uav: VirtualUAV, *, show: ShowSpecification
) -> None:
"""Handles a drone show upload request for the given UAV.

This is a temporary solution until we figure out something that is
more sustainable in the long run.

Parameters:
show: the show data
show: the show data for a single UAV
"""
uav.handle_show_upload(show)
await sleep(0.25 + random() * 0.5)
Expand Down
5 changes: 4 additions & 1 deletion src/flockwave/server/model/mission.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
from dataclasses import dataclass, field
from enum import Enum
from itertools import chain
from typing import Any, TypedDict
from typing import Any, TypedDict, cast

from flockwave.server.show import (
ShowSpecification,
get_flight_area_configuration_from_show_specification,
get_geofence_configuration_from_show_specification,
get_safety_configuration_from_show_specification,
Expand Down Expand Up @@ -1222,6 +1223,7 @@ def from_json(cls, obj: MissionItem):
# we need a "flightArea" and a "coordinateSystem" entry, where the latter
# can be "geodetic" or a complete JSON representation of a
# FlatEarthToGPSCoordinateTransformation
params = cast(ShowSpecification, params)
flight_area = get_flight_area_configuration_from_show_specification(params)

return cls(id=id, participants=participants, flight_area=flight_area)
Expand Down Expand Up @@ -1261,6 +1263,7 @@ def from_json(cls, obj: MissionItem):
# we need a "geofence" and a "coordinateSystem" entry, where the latter
# can be "geodetic" or a complete JSON representation of a
# FlatEarthToGPSCoordinateTransformation
params = cast(ShowSpecification, params)
geofence = get_geofence_configuration_from_show_specification(params)

return cls(id=id, participants=participants, geofence=geofence)
Expand Down
3 changes: 2 additions & 1 deletion src/flockwave/server/show/flight_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from flockwave.server.utils import optional_float

from .specification import (
ShowSpecification,
get_coordinate_system_from_show_specification,
is_coordinate_system_in_show_specification_geodetic,
)
Expand All @@ -25,7 +26,7 @@


def get_flight_area_configuration_from_show_specification(
show: dict,
show: ShowSpecification,
) -> FlightAreaConfigurationRequest:
result = FlightAreaConfigurationRequest()

Expand Down
3 changes: 2 additions & 1 deletion src/flockwave/server/show/geofence.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from flockwave.server.utils import optional_float

from .specification import (
ShowSpecification,
get_coordinate_system_from_show_specification,
is_coordinate_system_in_show_specification_geodetic,
)
Expand All @@ -26,7 +27,7 @@


def get_geofence_configuration_from_show_specification(
show: dict,
show: ShowSpecification,
) -> GeofenceConfigurationRequest:
result = GeofenceConfigurationRequest()

Expand Down
4 changes: 3 additions & 1 deletion src/flockwave/server/show/lights.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

from base64 import b64decode

from .specification import ShowSpecification

__all__ = ("get_light_program_from_show_specification",)


def get_light_program_from_show_specification(show: dict) -> bytes:
def get_light_program_from_show_specification(show: ShowSpecification) -> bytes:
"""Returns the raw Skybrush light program as bytecode from the given
show specification object.
"""
Expand Down
22 changes: 20 additions & 2 deletions src/flockwave/server/show/specification.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, TypedDict

from flockwave.gps.vectors import FlatEarthToGPSCoordinateTransformation

from .trajectory import TrajectorySpecification
Expand All @@ -14,8 +16,24 @@
)


ShowSpecification = dict
"""Type alias for show specification objects."""
class ShowSpecification(TypedDict, total=False):
"""Specification of a drone show for a single UAV as it appears
in show upload commands coming from Skybrush Live."""

cues: dict[str, Any]
validation: dict[str, Any]
trajectory: dict[str, Any]
lights: dict[str, Any]
rthPlan: dict[str, Any]
home: list[float]
landAt: list[float]
name: str
amslReference: float
coordinateSystem: dict[str, Any]
geofence: dict[str, Any]
mission: dict[str, Any]
group: int
flightArea: dict[str, Any]


def get_trajectory_from_show_specification(
Expand Down