-
Notifications
You must be signed in to change notification settings - Fork 24
Implement HMAC-Based Command Authentication with NVM-Backed Replay Attack Prevention #320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
abe8f9c
cd2a044
9d074fd
51b13f8
b4b7be8
3095769
3aae5f2
32e3f17
d0a665e
6dedc2f
26db833
91d72ed
0497842
6f5638b
7b22d8c
9547082
183f6b1
a69219d
99694a9
76e657d
afd00a0
c2ccbac
ba658ed
c15c753
828acc3
7904625
f0f398c
6f823df
7440fa1
a9fe3d1
49d44c9
46bf643
67fb3d2
9090797
ee24f3c
56d8872
9af4e74
3625263
5f16d89
28b4b35
d94f87b
d0991c5
48a6d46
1ad1770
5d70b55
7eed3e8
fc841bd
d32bb27
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
|
|
||
| from .config.config import Config | ||
| from .hardware.radio.packetizer.packet_manager import PacketManager | ||
| from .hmac_auth import HMACAuthenticator | ||
| from .logger import Logger | ||
|
|
||
|
|
||
|
|
@@ -55,6 +56,12 @@ def __init__( | |
| self._config: Config = config | ||
| self._packet_manager: PacketManager = packet_manager | ||
| self._send_delay: float = send_delay | ||
| self._hmac_authenticator: HMACAuthenticator = HMACAuthenticator( | ||
| config.hmac_secret | ||
| ) | ||
| self._last_valid_counter: int = ( | ||
| -1 | ||
| ) # Track last valid counter for replay prevention | ||
|
|
||
| def listen_for_commands(self, timeout: int) -> None: | ||
| """Listens for commands from the radio and handles them. | ||
|
|
@@ -73,7 +80,7 @@ def listen_for_commands(self, timeout: int) -> None: | |
|
|
||
| msg: dict[str, str] = json.loads(json_str) | ||
|
|
||
| # Check for OSCAR password first | ||
| # Check for OSCAR password first (legacy authentication) | ||
| if msg.get("password") == self.oscar_password: | ||
| self._log.debug("OSCAR command received", msg=msg) | ||
| cmd = msg.get("command") | ||
|
|
@@ -96,20 +103,70 @@ def listen_for_commands(self, timeout: int) -> None: | |
| self.oscar_command(cmd, args) | ||
| return | ||
|
|
||
| # If message has password field, check it | ||
| if msg.get("password") != self._config.super_secret_code: | ||
| self._log.debug( | ||
| "Invalid password in message", | ||
| msg=msg, | ||
| ) | ||
| return | ||
| # New HMAC-based authentication | ||
| hmac_value = msg.get("hmac") | ||
| counter_raw = msg.get("counter") | ||
|
|
||
| if msg.get("name") != self._config.cubesat_name: | ||
| self._log.debug( | ||
| "Satellite name mismatch in message", | ||
| msg=msg, | ||
| ) | ||
| return | ||
| if hmac_value is None or counter_raw is None: | ||
| # Fall back to password-based authentication for backward compatibility | ||
|
||
| if msg.get("password") != self._config.super_secret_code: | ||
| self._log.debug( | ||
| "Invalid password in message", | ||
| msg=msg, | ||
| ) | ||
| return | ||
|
|
||
| if msg.get("name") != self._config.cubesat_name: | ||
| self._log.debug( | ||
| "Satellite name mismatch in message", | ||
| msg=msg, | ||
| ) | ||
| return | ||
| else: | ||
| # Use HMAC authentication | ||
| # Convert counter to int | ||
| try: | ||
| counter: int = int(counter_raw) | ||
| except (ValueError, TypeError): | ||
| self._log.debug( | ||
| "Invalid counter in message", | ||
| counter=counter_raw, | ||
| ) | ||
| return | ||
|
|
||
| # Extract message without HMAC for verification | ||
| msg_without_hmac = {k: v for k, v in msg.items() if k != "hmac"} | ||
| message_str = json.dumps(msg_without_hmac, separators=(",", ":")) | ||
|
|
||
| # Verify HMAC | ||
| if not self._hmac_authenticator.verify_hmac( | ||
| message_str, counter, hmac_value | ||
| ): | ||
| self._log.debug( | ||
| "Invalid HMAC in message", | ||
| msg=msg, | ||
| ) | ||
| return | ||
|
|
||
| # Prevent replay attacks - counter must be greater than last valid counter | ||
| if counter <= self._last_valid_counter: | ||
|
||
| self._log.debug( | ||
| "Replay attack detected - counter not greater than last valid", | ||
| counter=counter, | ||
| last_valid=self._last_valid_counter, | ||
| ) | ||
| return | ||
|
|
||
| # Update last valid counter | ||
| self._last_valid_counter = counter | ||
|
|
||
| # Verify satellite name | ||
| if msg.get("name") != self._config.cubesat_name: | ||
| self._log.debug( | ||
| "Satellite name mismatch in message", | ||
| msg=msg, | ||
| ) | ||
| return | ||
|
|
||
| # If message has command field, execute the command | ||
| cmd = msg.get("command") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| """This module provides HMAC-based authentication for command messages. | ||
| This module implements HMAC (Hash-based Message Authentication Code) for | ||
| authenticating commands sent to the satellite. It provides protection against | ||
| unauthorized commands and replay attacks through the use of a shared secret | ||
| and packet counter. | ||
| **Usage:** | ||
| ```python | ||
| from pysquared.hmac_auth import HMACAuthenticator | ||
| # On the ground station | ||
| authenticator = HMACAuthenticator("shared_secret_key") | ||
| message = '{"command": "send_joke", "name": "MySat"}' | ||
| counter = 42 | ||
| hmac_value = authenticator.generate_hmac(message, counter) | ||
| # On the satellite | ||
| authenticator = HMACAuthenticator("shared_secret_key") | ||
| is_valid = authenticator.verify_hmac(message, counter, hmac_value) | ||
| ``` | ||
| """ | ||
|
|
||
| import hmac | ||
|
||
|
|
||
|
|
||
| class HMACAuthenticator: | ||
| """Provides HMAC authentication for command messages.""" | ||
|
|
||
| def __init__(self, secret_key: str) -> None: | ||
| """Initializes the HMACAuthenticator. | ||
| Args: | ||
| secret_key: The shared secret key for HMAC generation and verification. | ||
| """ | ||
| self._secret_key: bytes = secret_key.encode("utf-8") | ||
|
|
||
| def generate_hmac(self, message: str, counter: int) -> str: | ||
| """Generates an HMAC for a message with a counter. | ||
| Args: | ||
| message: The message to authenticate. | ||
| counter: The packet counter for replay attack prevention. | ||
| Returns: | ||
| The HMAC as a hexadecimal string. | ||
| """ | ||
| # Combine message and counter | ||
| data = f"{message}|{counter}".encode("utf-8") | ||
|
|
||
| # Generate HMAC using SHA-256 | ||
| # Use string "sha256" for CircuitPython compatibility | ||
| h = hmac.new(self._secret_key, data, "sha256") | ||
| return h.hexdigest() | ||
|
|
||
| def verify_hmac(self, message: str, counter: int, received_hmac: str) -> bool: | ||
| """Verifies an HMAC for a message with a counter. | ||
| Args: | ||
| message: The message to verify. | ||
| counter: The packet counter for replay attack prevention. | ||
| received_hmac: The HMAC to verify. | ||
| Returns: | ||
| True if the HMAC is valid, False otherwise. | ||
| """ | ||
| expected_hmac = self.generate_hmac(message, counter) | ||
| return hmac.compare_digest(expected_hmac, received_hmac) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| from pysquared.cdh import CommandDataHandler | ||
| from pysquared.config.config import Config | ||
| from pysquared.hardware.radio.packetizer.packet_manager import PacketManager | ||
| from pysquared.hmac_auth import HMACAuthenticator | ||
| from pysquared.logger import Logger | ||
|
|
||
|
|
||
|
|
@@ -27,6 +28,8 @@ def __init__( | |
| self._config = config | ||
| self._packet_manager = packet_manager | ||
| self._cdh = cdh | ||
| self._hmac_authenticator = HMACAuthenticator(config.hmac_secret) | ||
| self._command_counter = 0 # Counter for replay attack prevention | ||
|
||
|
|
||
| def listen(self): | ||
| """Listen for incoming packets from the satellite.""" | ||
|
|
@@ -86,7 +89,6 @@ def handle_input(self, cmd_selection): | |
|
|
||
| message: dict[str, object] = { | ||
| "name": self._config.cubesat_name, | ||
| "password": self._config.super_secret_code, | ||
| } | ||
|
|
||
| if cmd_selection == "1": | ||
|
|
@@ -98,6 +100,17 @@ def handle_input(self, cmd_selection): | |
| elif cmd_selection == "3": | ||
| message["command"] = self._cdh.command_send_joke | ||
|
|
||
| # Increment counter for replay attack prevention | ||
| self._command_counter += 1 | ||
| message["counter"] = self._command_counter | ||
|
|
||
| # Generate HMAC for the message | ||
| message_str = json.dumps(message, separators=(",", ":")) | ||
| hmac_value = self._hmac_authenticator.generate_hmac( | ||
| message_str, self._command_counter | ||
| ) | ||
| message["hmac"] = hmac_value | ||
|
|
||
| while True: | ||
| # Turn on the radio so that it captures any received packets to buffer | ||
| self._packet_manager.listen(1) | ||
|
|
@@ -107,6 +120,7 @@ def handle_input(self, cmd_selection): | |
| "Sending command", | ||
| cmd=message["command"], | ||
| args=message.get("args", []), | ||
| counter=self._command_counter, | ||
| ) | ||
| self._packet_manager.send(json.dumps(message).encode("utf-8")) | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.