Skip to content

Commit 099e6c2

Browse files
authored
Merge pull request #15 from rightup/dev
2 parents e0ccf6a + 988830c commit 099e6c2

File tree

5 files changed

+431
-303
lines changed

5 files changed

+431
-303
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "pymc_core"
7-
version = "1.0.4"
7+
version = "1.0.5"
88
authors = [
99
{name = "Lloyd Newton", email = "lloyd@rightup.co.uk"},
1010
]

src/pymc_core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
Clean, simple API for building mesh network applications.
44
"""
55

6-
__version__ = "1.0.4"
6+
__version__ = "1.0.5"
77

88
# Core mesh functionality
99
from .node.node import MeshNode
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
"""
2+
GPIO Pin Manager for Raspberry Pi
3+
Manages GPIO pins abstraction using gpiozero
4+
"""
5+
6+
import asyncio
7+
import logging
8+
from typing import Callable, Optional
9+
10+
from gpiozero import Button, Device, OutputDevice
11+
12+
# Force gpiozero to use LGPIOFactory - no RPi.GPIO fallback
13+
from gpiozero.pins.lgpio import LGPIOFactory
14+
15+
Device.pin_factory = LGPIOFactory()
16+
17+
logger = logging.getLogger("GPIOPinManager")
18+
19+
20+
class GPIOPinManager:
21+
"""Manages GPIO pins abstraction"""
22+
23+
def __init__(self):
24+
self._pins = {}
25+
self._led_tasks = {} # Track active LED tasks
26+
27+
def setup_output_pin(self, pin_number: int, initial_value: bool = False) -> bool:
28+
"""Setup an output pin with initial value"""
29+
if pin_number == -1:
30+
return False
31+
32+
try:
33+
if pin_number in self._pins:
34+
self._pins[pin_number].close()
35+
36+
self._pins[pin_number] = OutputDevice(pin_number, initial_value=initial_value)
37+
return True
38+
except Exception as e:
39+
logger.warning(f"Failed to setup output pin {pin_number}: {e}")
40+
return False
41+
42+
def setup_input_pin(
43+
self,
44+
pin_number: int,
45+
pull_up: bool = False,
46+
callback: Optional[Callable] = None,
47+
) -> bool:
48+
"""Setup an input pin with optional interrupt callback"""
49+
if pin_number == -1:
50+
return False
51+
52+
try:
53+
if pin_number in self._pins:
54+
self._pins[pin_number].close()
55+
56+
self._pins[pin_number] = Button(pin_number, pull_up=pull_up)
57+
if callback:
58+
self._pins[pin_number].when_activated = callback
59+
60+
return True
61+
except Exception as e:
62+
logger.warning(f"Failed to setup input pin {pin_number}: {e}")
63+
return False
64+
65+
def setup_interrupt_pin(
66+
self,
67+
pin_number: int,
68+
pull_up: bool = False,
69+
callback: Optional[Callable] = None,
70+
) -> Optional[Button]:
71+
"""Setup an interrupt pin and return the Button object for direct access"""
72+
if pin_number == -1:
73+
return None
74+
75+
try:
76+
if pin_number in self._pins:
77+
self._pins[pin_number].close()
78+
79+
button = Button(pin_number, pull_up=pull_up)
80+
if callback:
81+
button.when_activated = callback
82+
83+
self._pins[pin_number] = button
84+
return button
85+
except Exception as e:
86+
logger.warning(f"Failed to setup interrupt pin {pin_number}: {e}")
87+
return None
88+
89+
def set_pin_high(self, pin_number: int) -> bool:
90+
"""Set output pin to HIGH"""
91+
if pin_number in self._pins and hasattr(self._pins[pin_number], "on"):
92+
try:
93+
self._pins[pin_number].on()
94+
return True
95+
except Exception as e:
96+
logger.warning(f"Failed to set pin {pin_number} HIGH: {e}")
97+
return False
98+
99+
def set_pin_low(self, pin_number: int) -> bool:
100+
"""Set output pin to LOW"""
101+
if pin_number in self._pins and hasattr(self._pins[pin_number], "off"):
102+
try:
103+
self._pins[pin_number].off()
104+
return True
105+
except Exception as e:
106+
logger.warning(f"Failed to set pin {pin_number} LOW: {e}")
107+
return False
108+
109+
def cleanup_pin(self, pin_number: int) -> None:
110+
"""Clean up a specific pin"""
111+
if pin_number in self._pins:
112+
try:
113+
self._pins[pin_number].close()
114+
del self._pins[pin_number]
115+
except Exception as e:
116+
logger.warning(f"Failed to cleanup pin {pin_number}: {e}")
117+
118+
def cleanup_all(self) -> None:
119+
"""Clean up all managed pins"""
120+
# Cancel any running LED tasks
121+
for task in self._led_tasks.values():
122+
if not task.done():
123+
task.cancel()
124+
self._led_tasks.clear()
125+
126+
# Clean up pins
127+
for pin_number in list(self._pins.keys()):
128+
self.cleanup_pin(pin_number)
129+
130+
async def _led_blink_task(self, pin_number: int, duration: float = 3.0) -> None:
131+
"""Internal task to blink LED for specified duration"""
132+
try:
133+
# Turn LED on
134+
self.set_pin_high(pin_number)
135+
logger.debug(f"LED {pin_number} turned ON for {duration}s")
136+
137+
# Wait for duration
138+
await asyncio.sleep(duration)
139+
140+
# Turn LED off
141+
self.set_pin_low(pin_number)
142+
logger.debug(f"LED {pin_number} turned OFF")
143+
144+
except asyncio.CancelledError:
145+
# Turn off LED if task was cancelled
146+
self.set_pin_low(pin_number)
147+
logger.debug(f"LED {pin_number} task cancelled, LED turned OFF")
148+
except Exception as e:
149+
logger.warning(f"LED {pin_number} task error: {e}")
150+
finally:
151+
# Remove from active tasks
152+
if pin_number in self._led_tasks:
153+
del self._led_tasks[pin_number]
154+
155+
def blink_led(self, pin_number: int, duration: float = 3.0) -> None:
156+
"""
157+
Blink LED for specified duration (non-blocking)
158+
159+
Args:
160+
pin_number: GPIO pin number for LED
161+
duration: How long to keep LED on (seconds, default: 3.0)
162+
"""
163+
if pin_number == -1:
164+
return # LED disabled
165+
166+
if pin_number not in self._pins:
167+
logger.debug(f"LED pin {pin_number} not configured, skipping")
168+
return
169+
170+
try:
171+
# Cancel any existing LED task for this pin
172+
if pin_number in self._led_tasks and not self._led_tasks[pin_number].done():
173+
self._led_tasks[pin_number].cancel()
174+
175+
# Start new LED task
176+
loop = asyncio.get_running_loop()
177+
self._led_tasks[pin_number] = loop.create_task(
178+
self._led_blink_task(pin_number, duration)
179+
)
180+
181+
except RuntimeError:
182+
# No event loop running - just turn on LED (won't auto-turn off)
183+
logger.warning(f"No event loop, LED pin {pin_number} turned on (manual off required)")
184+
self.set_pin_high(pin_number)
185+
except Exception as e:
186+
logger.warning(f"Failed to start LED task for pin {pin_number}: {e}")

0 commit comments

Comments
 (0)