Files
modelec-marcel-ROS/simulated_pcb/serial.modelec.py
Ackimixs d9b1080516 Develop strat 7 (#32)
Co-authored-by: modelec <modelec-isen@gmail.com>
2026-03-11 16:38:23 +01:00

166 lines
5.7 KiB
Python

import serial
import threading
import time
import curses
import argparse
import serial.tools.list_ports
# Shared shutdown flag: curses_main sets it to True when the UI exits, and
# read_serial polls it to decide when to leave its read loop.
stop_thread = False
def list_serial_ports():
    """Print every serial port reported by pyserial, then exit the program.

    Each port is printed as ``<device> - <description>``; a short notice is
    printed instead when no port is found.

    Raises:
        SystemExit: Always, with status 0, once the listing has been printed.
            The function therefore never returns to its caller.
    """
    ports = serial.tools.list_ports.comports()
    if not ports:
        print("No serial ports found.")
    else:
        print("Available serial ports:")
        for port in ports:
            print(f" {port.device} - {port.description}")
    # Raise SystemExit directly instead of calling the bare ``exit`` helper:
    # that name is injected by the ``site`` module for interactive sessions
    # and is missing under ``python -S`` or in frozen executables.
    raise SystemExit(0)
def read_serial(ser, log_lines, log_lock):
    """Background worker: copy text lines from the serial port into a log.

    Runs until the module-level ``stop_thread`` flag becomes true. On every
    pass it drains all lines currently pending on the port, then sleeps for
    50 ms so an idle port does not spin the CPU.

    Args:
        ser: Open serial port object; only ``in_waiting`` and ``readline()``
            are used.
        log_lines: List shared with the UI. Non-empty decoded lines are
            appended and the list is trimmed to its newest 1000 entries.
        log_lock: Lock guarding every access to ``log_lines``.

    Any exception raised by the port (including by ``in_waiting`` itself,
    e.g. when the device is unplugged) is recorded in the log as
    ``[Serial Error] ...`` rather than silently killing the thread.
    """
    max_lines = 1000

    def _record(entry):
        # Append and trim under the lock so the UI never observes an
        # oversized list; error entries are capped exactly like data lines.
        with log_lock:
            log_lines.append(entry)
            overflow = len(log_lines) - max_lines
            if overflow > 0:
                del log_lines[:overflow]

    while not stop_thread:
        try:
            # Drain everything that is pending before sleeping; reading a
            # single line per 50 ms pass would cap throughput at ~20 lines/s.
            while ser.in_waiting and not stop_thread:
                text = ser.readline().decode(errors='ignore').strip()
                if text:
                    _record(text)
        except Exception as e:
            # Deliberately broad: this thread must survive any port failure
            # and make it visible in the UI instead of dying unnoticed.
            _record(f"[Serial Error] {e}")
        time.sleep(0.05)
def _draw_clipped(stdscr, row, col, text, width):
    """Write ``text`` at (row, col), truncated to at most ``width`` cells.

    curses raises ``curses.error`` when a write falls outside the window
    (tiny or freshly resized terminal) or reaches the bottom-right cell.
    The screen is repainted on every loop pass, so a missed paint is
    harmless and the error is deliberately ignored.
    """
    if width <= 0:
        return
    try:
        stdscr.addnstr(row, col, text, width)
    except curses.error:
        pass


def _draw_column(stdscr, col, width, title, rows):
    """Draw a column: ``title`` on row 0, then ``rows`` from row 1 downward."""
    _draw_clipped(stdscr, 0, col, title, width)
    for offset, text in enumerate(rows, start=1):
        _draw_clipped(stdscr, offset, col, text, width)


def curses_main(stdscr, port, baudrate, filter_start=None):
    """Run the interactive three-column serial monitor.

    The screen shows, left to right: serial lines starting with one of the
    ``filter_start`` prefixes, all other serial lines, and the commands sent
    so far. The last two rows hold a separator and the command prompt.

    Keys and commands:
        * Enter sends the typed command followed by a newline.
        * ``exit`` / ``quit`` (case-insensitive) or ESC leave the monitor.
        * ``clear`` empties the received-lines log.
        * Up / Down walk through the command history.

    Args:
        stdscr: curses window supplied by ``curses.wrapper``.
        port: Serial device name passed to ``serial.Serial``.
        baudrate: Baud rate passed to ``serial.Serial``.
        filter_start: Iterable of line prefixes routed to the first column;
            defaults to ``["SET;POS"]``.
    """
    global stop_thread
    # Reset the flag so the monitor can be started again after an earlier
    # session left it set to True.
    stop_thread = False
    curses.curs_set(1)
    # get_wch() waits at most 100 ms, so the screen refreshes ~10 times/s
    # even when no key is pressed.
    stdscr.timeout(100)

    if filter_start is None:
        filter_start = ["SET;POS"]
    prefixes = tuple(filter_start)  # str.startswith accepts a tuple
    prompt = "Command >>> "

    log_lines = []
    log_lock = threading.Lock()
    cmd_history = []
    history_index = -1  # -1 means "not browsing the history"
    user_input = ""

    with serial.Serial(port, baudrate, timeout=1) as ser:
        # NOTE(review): presumably gives the board time to restart after the
        # port is opened -- confirm before shortening.
        time.sleep(2)
        reader_thread = threading.Thread(
            target=read_serial, args=(ser, log_lines, log_lock), daemon=True
        )
        reader_thread.start()
        try:
            while True:
                # erase() instead of clear(): clear() forces a full repaint
                # on every refresh, which flickers at this refresh rate.
                stdscr.erase()
                h, w = stdscr.getmaxyx()

                # --- Split into 3 vertical columns ---
                col_width = w // 3
                text_width = col_width - 1
                log_height = h - 2  # row index of the separator line
                # Row 0 is the header and row ``log_height`` is the
                # separator, so only ``log_height - 1`` rows can show logs.
                visible_rows = max(log_height - 1, 0)

                # --- Separate logs into filtered and others ---
                filtered_logs, other_logs = [], []
                with log_lock:
                    for line in log_lines:
                        if line.startswith(prefixes):
                            filtered_logs.append(line)
                        else:
                            other_logs.append(line)

                def tail(items):
                    # ``items[-0:]`` would return everything, hence the guard.
                    return items[-visible_rows:] if visible_rows else []

                _draw_column(
                    stdscr, 0, text_width,
                    f"Filtered ({', '.join(filter_start)}):", tail(filtered_logs),
                )
                _draw_column(
                    stdscr, col_width, text_width,
                    "Other Logs:", tail(other_logs),
                )
                _draw_column(
                    stdscr, 2 * col_width, text_width,
                    "Command History:", tail(cmd_history),
                )

                # --- Input line (drawn last so the cursor rests after it) ---
                _draw_clipped(stdscr, log_height, 0, "-" * (w - 1), w - 1)
                room = w - 1 - len(prompt)
                # Show the tail of an over-long command so typing stays visible.
                shown = user_input[-room:] if room > 0 else ""
                _draw_clipped(stdscr, log_height + 1, 0, prompt + shown, w - 1)
                stdscr.refresh()

                try:
                    ch = stdscr.get_wch()
                except curses.error:
                    continue  # timeout expired without a key press

                # get_wch() reports ESC as the string "\x1b"; the integer 27
                # is kept only as a defensive fallback.
                if ch in ("\x1b", 27):
                    break

                if isinstance(ch, str):
                    if ch in ("\n", "\r"):  # Enter key
                        cmd = user_input.strip()
                        user_input = ""
                        lowered = cmd.lower()
                        if lowered in ("exit", "quit"):
                            break
                        if lowered == "clear":
                            with log_lock:
                                log_lines.clear()
                        elif cmd:
                            try:
                                ser.write((cmd + "\n").encode())
                            except OSError as exc:
                                # pyserial's SerialException derives from
                                # OSError; report the failure in the log
                                # instead of tearing the UI down.
                                with log_lock:
                                    log_lines.append(f"[Serial Error] {exc}")
                            cmd_history.append(cmd)
                            history_index = -1
                    elif ch in ("\b", "\x7f"):
                        user_input = user_input[:-1]
                    elif ch.isprintable():
                        user_input += ch
                elif ch == curses.KEY_BACKSPACE:
                    user_input = user_input[:-1]
                elif ch == curses.KEY_UP:
                    if cmd_history:
                        if history_index == -1:
                            history_index = len(cmd_history) - 1
                        elif history_index > 0:
                            history_index -= 1
                        user_input = cmd_history[history_index]
                elif ch == curses.KEY_DOWN:
                    if cmd_history and history_index != -1:
                        history_index += 1
                        if history_index >= len(cmd_history):
                            # Walked past the newest entry: back to a blank prompt.
                            history_index = -1
                            user_input = ""
                        else:
                            user_input = cmd_history[history_index]
        finally:
            # Always stop the reader before the ``with`` block closes the
            # port, even when the loop exits through an exception.
            stop_thread = True
            # Slightly longer than the 1 s serial read timeout so a blocked
            # readline() can return first.
            reader_thread.join(timeout=1.5)
def main():
    """Parse the command line, then list ports or start the monitor.

    Options:
        --port      Serial device to open (default ``/dev/USB_ODO``).
        --baudrate  Integer baud rate (default ``115200``).
        --filter    Line prefix shown in the "Filtered" column; may be given
                    several times. When omitted, ``curses_main`` applies its
                    own default.
        --list      Print the available serial ports and exit.

    Raises:
        SystemExit: On ``--help`` or invalid arguments (raised by argparse),
            after ``--list``, or with an error message when the serial port
            cannot be opened or fails.
    """
    parser = argparse.ArgumentParser(description="Serial monitor with command history and curses UI.")
    parser.add_argument("--port", default="/dev/USB_ODO", help="Serial port to use (default: /dev/USB_ODO)")
    parser.add_argument("--baudrate", type=int, default=115200, help="Baud rate (default: 115200)")
    parser.add_argument(
        "--filter",
        action="append",
        default=None,
        metavar="PREFIX",
        help="Line prefix routed to the filtered column; repeatable (default: SET;POS)",
    )
    parser.add_argument("--list", action="store_true", help="List available serial ports and exit")
    args = parser.parse_args()

    if args.list:
        list_serial_ports()
        # list_serial_ports() exits on its own; the explicit return keeps the
        # UI from starting should that ever change.
        return

    try:
        curses.wrapper(curses_main, args.port, args.baudrate, args.filter)
    except serial.SerialException as exc:
        # curses.wrapper has already restored the terminal at this point, so
        # a one-line message is readable; a raw traceback is not helpful here.
        raise SystemExit(f"Serial port error on {args.port}: {exc}") from exc


if __name__ == "__main__":
    main()