mirror of
https://github.com/modelec/modelec-marcel-ROS.git
synced 2026-01-18 16:47:43 +01:00
test serial read
This commit is contained in:
87
simulated_pcb/odo_serial.test.py
Normal file
87
simulated_pcb/odo_serial.test.py
Normal file
@@ -0,0 +1,87 @@
|
||||
import serial
|
||||
import threading
|
||||
import time
|
||||
import curses
|
||||
|
||||
SERIAL_PORT = '/dev/USB_ODO'
|
||||
BAUDRATE = 115200
|
||||
stop_thread = False
|
||||
|
||||
def read_serial(ser, log_lines, log_lock):
|
||||
global stop_thread
|
||||
while not stop_thread:
|
||||
if ser.in_waiting:
|
||||
try:
|
||||
line = ser.readline().decode(errors='ignore').strip()
|
||||
if line:
|
||||
with log_lock:
|
||||
log_lines.append(line)
|
||||
if len(log_lines) > 1000:
|
||||
log_lines.pop(0)
|
||||
except Exception as e:
|
||||
with log_lock:
|
||||
log_lines.append(f"[Erreur série] {e}")
|
||||
time.sleep(0.05)
|
||||
|
||||
def curses_main(stdscr):
|
||||
global stop_thread
|
||||
curses.curs_set(1)
|
||||
stdscr.nodelay(True)
|
||||
stdscr.timeout(100)
|
||||
|
||||
log_lines = []
|
||||
log_lock = threading.Lock()
|
||||
|
||||
with serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) as ser:
|
||||
time.sleep(2)
|
||||
reader_thread = threading.Thread(target=read_serial, args=(ser, log_lines, log_lock), daemon=True)
|
||||
reader_thread.start()
|
||||
|
||||
user_input = ""
|
||||
|
||||
while True:
|
||||
stdscr.clear()
|
||||
|
||||
# Get terminal size
|
||||
h, w = stdscr.getmaxyx()
|
||||
log_height = h - 2
|
||||
|
||||
# Display log window
|
||||
with log_lock:
|
||||
visible_logs = log_lines[-log_height:]
|
||||
for i, line in enumerate(visible_logs):
|
||||
stdscr.addnstr(i, 0, line, w - 1)
|
||||
|
||||
# Draw input line
|
||||
stdscr.addstr(log_height, 0, "-" * (w - 1))
|
||||
stdscr.addstr(log_height + 1, 0, f"Commande >>> {user_input}")
|
||||
stdscr.refresh()
|
||||
|
||||
# Handle user input
|
||||
try:
|
||||
ch = stdscr.get_wch()
|
||||
except curses.error:
|
||||
ch = None
|
||||
|
||||
if ch is None:
|
||||
continue
|
||||
elif isinstance(ch, str):
|
||||
if ch == "\n":
|
||||
cmd = user_input.strip()
|
||||
if cmd.lower() in ("exit", "quit"):
|
||||
stop_thread = True
|
||||
break
|
||||
ser.write((cmd + "\n").encode())
|
||||
user_input = ""
|
||||
elif ch == "\x7f": # Backspace
|
||||
user_input = user_input[:-1]
|
||||
else:
|
||||
user_input += ch
|
||||
elif ch == 27: # ESC
|
||||
stop_thread = True
|
||||
break
|
||||
|
||||
stop_thread = True
|
||||
reader_thread.join(timeout=1)
|
||||
|
||||
curses.wrapper(curses_main)
|
||||
Reference in New Issue
Block a user