Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 94 additions & 128 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,149 +2,115 @@
import psutil
import subprocess
import sys
from threading import Thread
from threading import Thread, Lock
import time
# from plant_classification import read_v
import Jetson.GPIO as GPIO
# except (RuntimeError, ModuleNotFoundError):
# class MockGPIO:
# BCM = BOARD = IN = OUT = HIGH = LOW = None
# def setmode(self, mode): pass
# def setup(self, pin, mode): pass
# def output(self, pin, state): pass
# def input(self, pin): return False
# def cleanup(self): pass
# def wait_for_edge(self, pin, edge_type):
# print(f"Simulating waiting for {edge_type} edge on pin {pin}")
# return True
# GPIO = MockGPIO()
import logging

def kill_python_scripts_by_name(target_names): # ex ['lighting_rainy.py', 'lighting_summer.py']
"""
Kill all running Python scripts whose command lines include one of the target names.
Does not kill the currently running script.
"""
current_pid = os.getpid()
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if proc.info['pid'] == current_pid:
continue
cmdline = proc.info['cmdline']
if cmdline and 'python' in cmdline[0].lower():
for name in target_names:
if any(name in part for part in cmdline[1:]): # check script path/args
print(f"Killing PID {proc.info['pid']}: {' '.join(cmdline)}")
proc.kill()
break
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# GPIO Pin Configuration
ONOFF_PIN = 16 # Physical 21
SEASONS = {
'rainy': {'pin': 19, 'script': 'rainy_sound.py'}, # Physical 35
'spring': {'pin': 8, 'script': 'spring_sound.py'}, # Physical 24
'winter': {'pin': 4, 'script': 'winter_sound.py'} # Physical 7
}

def run_script(script_name): # ex 'lighting_rainy.py'
script_path = os.path.join(os.path.dirname(__file__), script_name)
print(f"Running script: {script_path}")
subprocess.Popen([sys.executable, script_path])

GPIO.setwarnings(False)
GPIO.cleanup()
GPIO.setmode(GPIO.BCM)

winter_pin = 4 #physical 7
spring_pin = 8 #physical 24
rainy_pin = 19 #physical 35
running_processes = []
process_lock = Lock()

GPIO.setup(rainy_pin, GPIO.IN) # button pin set as input
GPIO.setup(spring_pin, GPIO.IN) # button pin set as input
GPIO.setup(winter_pin, GPIO.IN) # button pin set as input
def kill_python_scripts_by_name(target_names):
"""Kill running Python scripts matching target names."""
with process_lock:
for proc in running_processes[:]:
try:
if proc.poll() is None:
cmdline = proc.cmdline()
for name in target_names:
if any(name in part for part in cmdline[1:]):
logger.info(f"Terminating PID {proc.pid}: {' '.join(cmdline)}")
proc.terminate()
try:
proc.wait(timeout=2)
except subprocess.TimeoutExpired:
logger.warning(f"Force killing PID {proc.pid}")
proc.kill()
running_processes.remove(proc)
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
running_processes.remove(proc)

ss_old = ''
ss_new = 'spring'
def run_script(script_name):
"""Run a Python script in a subprocess and track its process."""
script_path = os.path.join(os.path.dirname(__file__), script_name)
if not os.path.exists(script_path):
logger.error(f"Script {script_path} not found.")
return None
try:
logger.info(f"Running script: {script_path}")
proc = subprocess.Popen([sys.executable, script_path])
with process_lock:
running_processes.append(proc)
return proc
except Exception as e:
logger.error(f"Failed to run {script_path}: {e}")
return None

class choose_season(Thread):
"""Thread to handle season selection based on GPIO button presses."""
def __init__(self):
Thread.__init__(self)
self.running = True
self.ss_old = ''
self.ss_new = 'spring'
for season, config in SEASONS.items():
GPIO.add_event_detect(
config['pin'], GPIO.RISING, callback=self.handle_button, bouncetime=200
)

def handle_button(self, pin):
for season, config in SEASONS.items():
if pin == config['pin']:
self.ss_new = season
logger.info(f"{season.capitalize()} season selected.")
if self.ss_new != self.ss_old:
self.ss_old = self.ss_new
kill_python_scripts_by_name([config['script'] for config in SEASONS.values()])
run_script(SEASONS[self.ss_new]['script'])
self.ss_new = ''
break

def stop(self):
self.running = False
def run(self):
global ss_old, ss_new
while self.running:
while len(ss_new) == 0:
if GPIO.input(rainy_pin) == GPIO.HIGH:
ss_new = 'rainy'
print("Rainy season selected.")
break
elif GPIO.input(spring_pin) == GPIO.HIGH:
ss_new = 'spring'
print("spring season selected.")
break
elif GPIO.input(winter_pin) == GPIO.HIGH:
ss_new = 'winter'
print("Winter season selected.")
break
time.sleep(0.1)

if ss_new == ss_old:
pass
else:
ss_old = ss_new[:]
kill_python_scripts_by_name(['spring_sound.py', 'rainy_sound.py','winter_sound.py'])
if ss_new == 'rainy':
run_script('rainy_sound.py')
elif ss_new == 'spring':
run_script('spring_sound.py')
elif ss_new == 'winter':
run_script('winter_sound.py')
else:
pass
ss_new = ''
for season, config in SEASONS.items():
GPIO.remove_event_detect(config['pin'])

if __name__ == '__main__':
onoff_pin = 166 #physical 21
GPIO.setmode(GPIO.BCM)
GPIO.setup(onoff_pin, GPIO.IN)
target_scripts = ['playsound.py','plant_classification.py','spring_sound.py','rainy_sound.py','winter_sound.py']
choose_season_thread = choose_season()
while True:
GPIO.wait_for_edge(onoff_pin, GPIO.RISING)
print("ON button pressed. Starting system...")
time.sleep(0.3)
choose_season_thread.start()
run_script('playsound.py')
GPIO.wait_for_edge(onoff_pin, GPIO.RISING)
print("OFF button pressed. Stopping system...")
time.sleep(0.3)
choose_season_thread.stop()
ss_old = ''
ss_new = 'spring'
kill_python_scripts_by_name(target_scripts)

# if __name__ == '__main__':
# onoff_pin = 7 # physical 26
# GPIO.setmode(GPIO.BCM)
# GPIO.setup(onoff_pin, GPIO.IN)
# target_scripts = ['playsound.py', 'plant_classification', 'spring_sound.py', 'rainy_sound.py', 'winter_sound.py']
# run_script('playsound.py')

# while True:
# GPIO.wait_for_edge(onoff_pin, GPIO.RISING)
# print("ON button pressed. Starting system...")
# time.sleep(0.3)

# choose_season_thread = choose_season()
# choose_season_thread.start()
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(ONOFF_PIN, GPIO.IN)
for season, config in SEASONS.items():
GPIO.setup(config['pin'], GPIO.IN)

# try:
# while GPIO.input(onoff_pin) == GPIO.LOW:
# voltages = read_v()
# voltage_str = ' | '.join([f"CH{i}: {v:.2f} V" for i, v in enumerate(voltages)])
# print("Voltages ->", voltage_str)
# time.sleep(0.5)
# except KeyboardInterrupt:
# break
target_scripts = ['playsound.py', 'plant_classification.py', 'spring_sound.py', 'rainy_sound.py', 'winter_sound.py']

# print("OFF button pressed. Stopping system...")
# choose_season_thread.stop()
# ss_old = ''
# ss_new = 'spring'
# kill_python_scripts_by_name(target_scripts)
try:
while True:
GPIO.wait_for_edge(ONOFF_PIN, GPIO.RISING)
logger.info("ON button pressed. Starting system...")
time.sleep(0.3)
choose_season_thread = choose_season()
choose_season_thread.start()
run_script('playsound.py')
GPIO.wait_for_edge(ONOFF_PIN, GPIO.RISING)
logger.info("OFF button pressed. Stopping system...")
time.sleep(0.3)
choose_season_thread.stop()
choose_season_thread.join()
kill_python_scripts_by_name(target_scripts)
finally:
GPIO.cleanup()
logger.info("GPIO resources cleaned up.")