test serial read/write

This commit is contained in:
acki
2025-11-12 11:23:07 +01:00
parent 1848cb72cd
commit 96c72f7ba4
2 changed files with 191 additions and 56 deletions

View File

@@ -1,63 +1,138 @@
import serial
import threading
import time
# Configuration du port série
SERIAL_PORT = '/dev/USB_ODO'
BAUDRATE = 115200
import curses
import argparse
import sys
stop_thread = False
hola = "SET;START;1\nSET;PID;THETA;8;1;1"
def read_serial(ser):
"""Thread de lecture asynchrone du port série"""
def read_serial(ser, log_lines, log_lock):
"""Thread de lecture du port série"""
global stop_thread
while not stop_thread:
if ser.in_waiting:
try:
line = ser.readline().decode(errors='ignore').strip()
if line:
print(f"\n<<< {line}")
print("Commande >>> ", end="", flush=True)
with log_lock:
log_lines.append(line)
if len(log_lines) > 1000:
log_lines.pop(0)
except Exception as e:
print(f"\n[Erreur de lecture série] {e}")
time.sleep(0.05) # Évite d'occuper 100% du CPU
with log_lock:
log_lines.append(f"[Erreur série] {e}")
time.sleep(0.05)
def curses_main(stdscr, serial_port, baudrate):
global stop_thread
curses.curs_set(1)
stdscr.nodelay(True)
stdscr.timeout(100)
log_lines = []
log_lock = threading.Lock()
cmd_history = []
# --- Try to open the serial port ---
try:
ser = serial.Serial(serial_port, baudrate, timeout=1)
except serial.SerialException as e:
stdscr.clear()
stdscr.addstr(0, 0, f"[Erreur] Impossible d'ouvrir le port {serial_port}: {e}")
stdscr.addstr(2, 0, "Appuyez sur une touche pour quitter.")
stdscr.refresh()
stdscr.getch()
return
with ser:
time.sleep(2)
reader_thread = threading.Thread(target=read_serial, args=(ser, log_lines, log_lock), daemon=True)
reader_thread.start()
user_input = ""
while True:
stdscr.clear()
h, w = stdscr.getmaxyx()
# Split screen horizontally (logs left, history right)
split_x = int(w * 0.7)
log_height = h - 2
# --- Left panel: serial logs ---
with log_lock:
visible_logs = log_lines[-log_height:]
for i, line in enumerate(visible_logs):
stdscr.addnstr(i, 0, line, split_x - 1)
# --- Right panel: command history ---
stdscr.vline(0, split_x, "|", log_height)
history_start_x = split_x + 2
stdscr.addstr(0, history_start_x, "Historique des commandes:")
for i, cmd in enumerate(reversed(cmd_history[-(log_height - 2):])):
stdscr.addnstr(i + 1, history_start_x, cmd, w - history_start_x - 1)
# --- Bottom input line ---
stdscr.addstr(log_height, 0, "-" * (w - 1))
stdscr.addstr(log_height + 1, 0, f"Commande >>> {user_input}")
stdscr.refresh()
try:
ch = stdscr.get_wch()
except curses.error:
ch = None
if ch is None:
continue
# Handle key input
if isinstance(ch, str):
if ch in ("\n", "\r"): # Enter
cmd = user_input.strip()
if cmd.lower() in ("exit", "quit"):
stop_thread = True
break
if cmd:
ser.write((cmd + "\n").encode())
cmd_history.append(cmd)
user_input = ""
elif ch in ("\b", "\x7f"): # Backspace
user_input = user_input[:-1]
elif ch.isprintable():
user_input += ch
elif ch == curses.KEY_BACKSPACE:
user_input = user_input[:-1]
elif ch == 27: # ESC
stop_thread = True
break
stop_thread = True
reader_thread.join(timeout=1)
def send_command(ser, cmd):
print(f">>> {cmd.strip()}")
ser.write((cmd + '\n').encode())
def main():
global stop_thread
try:
with serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) as ser:
time.sleep(2) # Délai pour l'initialisation de l'appareil
parser = argparse.ArgumentParser(
description="Terminal série interactif avec logs et historique des commandes."
)
parser.add_argument(
"--port",
type=str,
default="/dev/USB_ODO",
help="Port série à utiliser (ex: /dev/ttyUSB0, COM3, etc.)",
)
parser.add_argument(
"--baudrate",
type=int,
default=115200,
help="Vitesse de communication (baudrate, ex: 9600, 115200, etc.)",
)
args = parser.parse_args()
print("=== Interface de commande manuelle (asynchrone) ===")
print("Tape une commande à envoyer, ou 'exit' pour quitter.\n")
curses.wrapper(curses_main, args.port, args.baudrate)
# Démarrer le thread de lecture série
reader_thread = threading.Thread(target=read_serial, args=(ser,), daemon=True)
reader_thread.start()
# Boucle principale de saisie
while True:
user_input = input("Commande >>> ").strip()
if user_input.lower() in ('exit', 'quit'):
stop_thread = True
reader_thread.join(timeout=1)
print("Fermeture du programme.")
break
elif user_input == "":
send_command(ser, hola)
continue
else:
send_command(ser, user_input)
except serial.SerialException as e:
print(f"Erreur de port série : {e}")
if __name__ == "__main__":
main()

View File

@@ -2,13 +2,26 @@ import serial
import threading
import time
import curses
import argparse
import sys
import serial.tools.list_ports
SERIAL_PORT = '/dev/USB_ODO'
BAUDRATE = 115200
stop_thread = False
def list_serial_ports():
"""Return a list of available serial ports."""
ports = serial.tools.list_ports.comports()
if not ports:
print("No serial ports detected.")
return
print("Available serial ports:\n")
for p in ports:
print(f" - {p.device}\t({p.description})")
def read_serial(ser, log_lines, log_lock):
"""Thread de lecture du port série"""
"""Asynchronous serial reading thread."""
global stop_thread
while not stop_thread:
if ser.in_waiting:
@@ -21,10 +34,12 @@ def read_serial(ser, log_lines, log_lock):
log_lines.pop(0)
except Exception as e:
with log_lock:
log_lines.append(f"[Erreur série] {e}")
log_lines.append(f"[Serial read error] {e}")
time.sleep(0.05)
def curses_main(stdscr):
def curses_main(stdscr, serial_port, baudrate):
"""Main curses-based interface."""
global stop_thread
curses.curs_set(1)
stdscr.nodelay(True)
@@ -34,9 +49,22 @@ def curses_main(stdscr):
log_lock = threading.Lock()
cmd_history = []
with serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) as ser:
# Try opening the serial port
try:
ser = serial.Serial(serial_port, baudrate, timeout=1)
except serial.SerialException as e:
stdscr.clear()
stdscr.addstr(0, 0, f"[Error] Unable to open port {serial_port}: {e}")
stdscr.addstr(2, 0, "Press any key to exit.")
stdscr.refresh()
stdscr.getch()
return
with ser:
time.sleep(2)
reader_thread = threading.Thread(target=read_serial, args=(ser, log_lines, log_lock), daemon=True)
reader_thread = threading.Thread(
target=read_serial, args=(ser, log_lines, log_lock), daemon=True
)
reader_thread.start()
user_input = ""
@@ -45,11 +73,11 @@ def curses_main(stdscr):
stdscr.clear()
h, w = stdscr.getmaxyx()
# Split screen horizontally (80% logs, 20% history)
# Split screen (70% logs, 30% history)
split_x = int(w * 0.7)
log_height = h - 2
# --- Left panel: logs ---
# --- Left panel: serial logs ---
with log_lock:
visible_logs = log_lines[-log_height:]
for i, line in enumerate(visible_logs):
@@ -58,13 +86,13 @@ def curses_main(stdscr):
# --- Right panel: command history ---
stdscr.vline(0, split_x, "|", log_height)
history_start_x = split_x + 2
stdscr.addstr(0, history_start_x, "Historique des commandes:")
stdscr.addstr(0, history_start_x, "Command history:")
for i, cmd in enumerate(reversed(cmd_history[-(log_height - 2):])):
stdscr.addnstr(i + 1, history_start_x, cmd, w - history_start_x - 1)
# --- Input line ---
# --- Bottom input line ---
stdscr.addstr(log_height, 0, "-" * (w - 1))
stdscr.addstr(log_height + 1, 0, f"Commande >>> {user_input}")
stdscr.addstr(log_height + 1, 0, f"Command >>> {user_input}")
stdscr.refresh()
try:
@@ -77,7 +105,7 @@ def curses_main(stdscr):
# Handle key input
if isinstance(ch, str):
if ch in ("\n", "\r"):
if ch in ("\n", "\r"): # Enter
cmd = user_input.strip()
if cmd.lower() in ("exit", "quit"):
stop_thread = True
@@ -86,7 +114,7 @@ def curses_main(stdscr):
ser.write((cmd + "\n").encode())
cmd_history.append(cmd)
user_input = ""
elif ch in ("\b", "\x7f"):
elif ch in ("\b", "\x7f"): # Backspace
user_input = user_input[:-1]
elif ch.isprintable():
user_input += ch
@@ -99,4 +127,36 @@ def curses_main(stdscr):
stop_thread = True
reader_thread.join(timeout=1)
curses.wrapper(curses_main)
def main():
parser = argparse.ArgumentParser(
description="Interactive serial terminal with live logs and command history."
)
parser.add_argument(
"--port",
type=str,
default="/dev/USB_ODO",
help="Serial port to use (e.g. /dev/ttyUSB0, COM3, etc.)",
)
parser.add_argument(
"--baudrate",
type=int,
default=115200,
help="Baudrate (e.g. 9600, 115200, etc.)",
)
parser.add_argument(
"--list",
action="store_true",
help="List all available serial ports and exit.",
)
args = parser.parse_args()
if args.list:
list_serial_ports()
return
curses.wrapper(curses_main, args.port, args.baudrate)
if __name__ == "__main__":
main()