test serial read/write

This commit is contained in:
acki
2025-11-12 17:11:03 +01:00
parent 96c72f7ba4
commit 2378ca130b

View File

@@ -3,25 +3,23 @@ import threading
import time import time
import curses import curses
import argparse import argparse
import sys
import serial.tools.list_ports import serial.tools.list_ports
stop_thread = False stop_thread = False
def list_serial_ports(): def list_serial_ports():
"""Return a list of available serial ports.""" """List all available serial ports"""
ports = serial.tools.list_ports.comports() ports = serial.tools.list_ports.comports()
if not ports: if not ports:
print("No serial ports detected.") print("No serial ports found.")
return else:
print("Available serial ports:\n") print("Available serial ports:")
for p in ports: for port in ports:
print(f" - {p.device}\t({p.description})") print(f" {port.device} - {port.description}")
exit(0)
def read_serial(ser, log_lines, log_lock): def read_serial(ser, log_lines, log_lock):
"""Asynchronous serial reading thread.""" """Thread to read serial output"""
global stop_thread global stop_thread
while not stop_thread: while not stop_thread:
if ser.in_waiting: if ser.in_waiting:
@@ -34,12 +32,10 @@ def read_serial(ser, log_lines, log_lock):
log_lines.pop(0) log_lines.pop(0)
except Exception as e: except Exception as e:
with log_lock: with log_lock:
log_lines.append(f"[Serial read error] {e}") log_lines.append(f"[Serial Error] {e}")
time.sleep(0.05) time.sleep(0.05)
def curses_main(stdscr, port, baudrate):
def curses_main(stdscr, serial_port, baudrate):
"""Main curses-based interface."""
global stop_thread global stop_thread
curses.curs_set(1) curses.curs_set(1)
stdscr.nodelay(True) stdscr.nodelay(True)
@@ -48,23 +44,11 @@ def curses_main(stdscr, serial_port, baudrate):
log_lines = [] log_lines = []
log_lock = threading.Lock() log_lock = threading.Lock()
cmd_history = [] cmd_history = []
history_index = -1 # For navigating command history
# Try opening the serial port with serial.Serial(port, baudrate, timeout=1) as ser:
try:
ser = serial.Serial(serial_port, baudrate, timeout=1)
except serial.SerialException as e:
stdscr.clear()
stdscr.addstr(0, 0, f"[Error] Unable to open port {serial_port}: {e}")
stdscr.addstr(2, 0, "Press any key to exit.")
stdscr.refresh()
stdscr.getch()
return
with ser:
time.sleep(2) time.sleep(2)
reader_thread = threading.Thread( reader_thread = threading.Thread(target=read_serial, args=(ser, log_lines, log_lock), daemon=True)
target=read_serial, args=(ser, log_lines, log_lock), daemon=True
)
reader_thread.start() reader_thread.start()
user_input = "" user_input = ""
@@ -73,11 +57,11 @@ def curses_main(stdscr, serial_port, baudrate):
stdscr.clear() stdscr.clear()
h, w = stdscr.getmaxyx() h, w = stdscr.getmaxyx()
# Split screen (70% logs, 30% history) # Split screen horizontally (70% logs, 30% command history)
split_x = int(w * 0.7) split_x = int(w * 0.7)
log_height = h - 2 log_height = h - 2
# --- Left panel: serial logs --- # --- Left panel: logs ---
with log_lock: with log_lock:
visible_logs = log_lines[-log_height:] visible_logs = log_lines[-log_height:]
for i, line in enumerate(visible_logs): for i, line in enumerate(visible_logs):
@@ -86,11 +70,11 @@ def curses_main(stdscr, serial_port, baudrate):
# --- Right panel: command history --- # --- Right panel: command history ---
stdscr.vline(0, split_x, "|", log_height) stdscr.vline(0, split_x, "|", log_height)
history_start_x = split_x + 2 history_start_x = split_x + 2
stdscr.addstr(0, history_start_x, "Command history:") stdscr.addstr(0, history_start_x, "Command History:")
for i, cmd in enumerate(reversed(cmd_history[-(log_height - 2):])): for i, cmd in enumerate(reversed(cmd_history[-(log_height - 2):])):
stdscr.addnstr(i + 1, history_start_x, cmd, w - history_start_x - 1) stdscr.addnstr(i + 1, history_start_x, cmd, w - history_start_x - 1)
# --- Bottom input line --- # --- Input line ---
stdscr.addstr(log_height, 0, "-" * (w - 1)) stdscr.addstr(log_height, 0, "-" * (w - 1))
stdscr.addstr(log_height + 1, 0, f"Command >>> {user_input}") stdscr.addstr(log_height + 1, 0, f"Command >>> {user_input}")
stdscr.refresh() stdscr.refresh()
@@ -103,9 +87,8 @@ def curses_main(stdscr, serial_port, baudrate):
if ch is None: if ch is None:
continue continue
# Handle key input
if isinstance(ch, str): if isinstance(ch, str):
if ch in ("\n", "\r"): # Enter if ch in ("\n", "\r"): # Enter key
cmd = user_input.strip() cmd = user_input.strip()
if cmd.lower() in ("exit", "quit"): if cmd.lower() in ("exit", "quit"):
stop_thread = True stop_thread = True
@@ -113,13 +96,30 @@ def curses_main(stdscr, serial_port, baudrate):
if cmd: if cmd:
ser.write((cmd + "\n").encode()) ser.write((cmd + "\n").encode())
cmd_history.append(cmd) cmd_history.append(cmd)
history_index = -1
user_input = "" user_input = ""
elif ch in ("\b", "\x7f"): # Backspace elif ch in ("\b", "\x7f"):
user_input = user_input[:-1] user_input = user_input[:-1]
elif ch.isprintable(): elif ch.isprintable():
user_input += ch user_input += ch
elif ch == curses.KEY_BACKSPACE: elif ch == curses.KEY_BACKSPACE:
user_input = user_input[:-1] user_input = user_input[:-1]
elif ch == curses.KEY_UP:
if cmd_history:
if history_index == -1:
history_index = len(cmd_history) - 1
elif history_index > 0:
history_index -= 1
user_input = cmd_history[history_index]
elif ch == curses.KEY_DOWN:
if cmd_history:
if history_index != -1:
history_index += 1
if history_index >= len(cmd_history):
history_index = -1
user_input = ""
else:
user_input = cmd_history[history_index]
elif ch == 27: # ESC elif ch == 27: # ESC
stop_thread = True stop_thread = True
break break
@@ -129,31 +129,14 @@ def curses_main(stdscr, serial_port, baudrate):
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(description="Serial monitor with command history and curses UI.")
description="Interactive serial terminal with live logs and command history." parser.add_argument("--port", default="/dev/USB_ODO", help="Serial port to use (default: /dev/USB_ODO)")
) parser.add_argument("--baudrate", type=int, default=115200, help="Baud rate (default: 115200)")
parser.add_argument( parser.add_argument("--list", action="store_true", help="List available serial ports and exit")
"--port",
type=str,
default="/dev/USB_ODO",
help="Serial port to use (e.g. /dev/ttyUSB0, COM3, etc.)",
)
parser.add_argument(
"--baudrate",
type=int,
default=115200,
help="Baudrate (e.g. 9600, 115200, etc.)",
)
parser.add_argument(
"--list",
action="store_true",
help="List all available serial ports and exit.",
)
args = parser.parse_args() args = parser.parse_args()
if args.list: if args.list:
list_serial_ports() list_serial_ports()
return
curses.wrapper(curses_main, args.port, args.baudrate) curses.wrapper(curses_main, args.port, args.baudrate)