mirror of
https://github.com/modelec/modelec-marcel-ROS.git
synced 2026-01-18 16:47:43 +01:00
64 lines
2.0 KiB
Python
64 lines
2.0 KiB
Python
import serial
|
|
import threading
|
|
import time
|
|
|
|
# Configuration du port série
|
|
SERIAL_PORT = '/dev/USB_ODO'
|
|
BAUDRATE = 115200
|
|
|
|
stop_thread = False
|
|
|
|
hola = "SET;START;1\nSET;PID;THETA;8;1;1"
|
|
|
|
def read_serial(ser):
|
|
"""Thread de lecture asynchrone du port série"""
|
|
global stop_thread
|
|
while not stop_thread:
|
|
if ser.in_waiting:
|
|
try:
|
|
line = ser.readline().decode(errors='ignore').strip()
|
|
if line:
|
|
print(f"\n<<< {line}")
|
|
print("Commande >>> ", end="", flush=True)
|
|
except Exception as e:
|
|
print(f"\n[Erreur de lecture série] {e}")
|
|
time.sleep(0.05) # Évite d'occuper 100% du CPU
|
|
|
|
def send_command(ser, cmd):
|
|
print(f">>> {cmd.strip()}")
|
|
ser.write((cmd + '\n').encode())
|
|
|
|
def main():
|
|
global stop_thread
|
|
try:
|
|
with serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) as ser:
|
|
time.sleep(2) # Délai pour l'initialisation de l'appareil
|
|
|
|
print("=== Interface de commande manuelle (asynchrone) ===")
|
|
print("Tape une commande à envoyer, ou 'exit' pour quitter.\n")
|
|
|
|
# Démarrer le thread de lecture série
|
|
reader_thread = threading.Thread(target=read_serial, args=(ser,), daemon=True)
|
|
reader_thread.start()
|
|
|
|
# Boucle principale de saisie
|
|
while True:
|
|
user_input = input("Commande >>> ").strip()
|
|
|
|
if user_input.lower() in ('exit', 'quit'):
|
|
stop_thread = True
|
|
reader_thread.join(timeout=1)
|
|
print("Fermeture du programme.")
|
|
break
|
|
elif user_input == "":
|
|
send_command(ser, hola)
|
|
continue
|
|
else:
|
|
send_command(ser, user_input)
|
|
|
|
except serial.SerialException as e:
|
|
print(f"Erreur de port série : {e}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|