Files
modelec-marcel-ROS/simulated_pcb/action.py
2025-12-15 18:41:59 +01:00

217 lines
6.5 KiB
Python

import argparse
import serial
import threading
import time
class SimulatedPCB:
def __init__(self, port='/tmp/ACTION_SIM', baud=115200):
self.ser = serial.Serial(port, baud, timeout=0.1)
self.running = True
# State identical to STM32 getters
self.servo_angles = {i: 0.0 for i in range(16)} # angle as float
self.relay_state = {i: 0 for i in range(16)} # 0/1
self.tirette_armed = False
self.thread = threading.Thread(target=self.run)
self.thread.start()
def tx(self, msg):
print(f"[TX] {msg.strip()}")
self.ser.write((msg + "\n").encode())
# ----------------------------------------------------------------------
# STM32-LIKE FUNCTIONS
# ----------------------------------------------------------------------
def getServoAngle(self, sid):
return float(self.servo_angles.get(sid, 0.0))
def getRelayState(self, rid):
return int(self.relay_state.get(rid, 0))
def moveServo(self, sid, angle):
self.servo_angles[sid] = float(angle)
def moveRelay(self, rid, state):
self.relay_state[rid] = 1 if int(state) else 0
# ----------------------------------------------------------------------
# COMMAND PARSER (as close as possible to STM32 version)
# ----------------------------------------------------------------------
def process(self, line):
print(f"[RX] {line}")
parts = line.split(";")
if len(parts) < 2:
return
cmd = parts[0]
if cmd == "GET":
self.handle_get(parts)
elif cmd == "SET":
self.handle_set(parts)
elif cmd == "MOV":
self.handle_mov(parts)
elif cmd == "ACK":
# simulate ACK reception ignore for now
pass
# ----------------------------------------------------------------------
# GET handler (SERVO POS / RELAY STATE)
# ----------------------------------------------------------------------
def handle_get(self, p):
# GET;SERVO;POS;n;id...
if p[1] == "SERVO" and p[2] == "POS":
if len(p) < 4:
self.tx("KO;SET;SERVO;POS")
return
n = int(p[3])
if n <= 0:
self.tx("SET;SERVO;POS;0")
return
resp = f"SET;SERVO;POS;{n}"
idx = 4
for _ in range(n):
if idx >= len(p):
self.tx("KO;SET;SERVO;POS")
return
sid = int(p[idx])
idx += 1
angle = self.getServoAngle(sid)
resp += f";{sid};{angle:.4f}"
self.tx(resp)
return
# GET;RELAY;STATE;n;id...
if p[1] == "RELAY" and p[2] == "STATE":
if len(p) < 4:
self.tx("KO;SET;RELAY;STATE")
return
n = int(p[3])
if n <= 0:
self.tx("SET;RELAY;STATE;0")
return
resp = f"SET;RELAY;STATE;{n}"
idx = 4
for _ in range(n):
if idx >= len(p):
self.tx("KO;SET;RELAY;STATE")
return
rid = int(p[idx])
idx += 1
state = self.getRelayState(rid)
resp += f";{rid};{state}"
self.tx(resp)
return
# ----------------------------------------------------------------------
# SET handler (only TIR;ARM)
# ----------------------------------------------------------------------
def handle_set(self, p):
if p[1] == "TIR" and p[2] == "ARM":
if len(p) < 4:
self.tx("KO;TIR;ARM")
return
val = int(p[3])
self.tirette_armed = bool(val)
self.tx(f"OK;TIR;ARM;{val}")
return
self.tx(f"KO;{p[1]};{p[2]}")
# ----------------------------------------------------------------------
# MOV handler (SERVO multi / RELAY multi)
# ----------------------------------------------------------------------
def handle_mov(self, p):
# MOV;SERVO;n;id;angle;id;angle...
if p[1] == "SERVO":
n = int(p[2])
idx = 3
resp = f"OK;SERVO;{n}"
for _ in range(n):
if idx + 1 >= len(p):
self.tx(f"KO;MOV;SERVO;{p[2]}")
return
sid = int(p[idx])
ang = float(p[idx+1])
idx += 2
self.moveServo(sid, ang)
resp += f";{sid};{ang}"
self.tx(resp)
return
# MOV;RELAY;n;id;state...
if p[1] == "RELAY":
n = int(p[2])
idx = 3
resp = f"OK;RELAY;{n}"
for _ in range(n):
if idx + 1 >= len(p):
self.tx(f"KO;MOV;RELAY;{p[2]}")
return
rid = int(p[idx])
st = int(p[idx+1])
idx += 2
self.moveRelay(rid, st)
resp += f";{rid};{st}"
self.tx(resp)
return
self.tx(f"KO;MOV;{p[1]}")
# ----------------------------------------------------------------------
# Loop
# ----------------------------------------------------------------------
def run(self):
buf = b""
while self.running:
try:
data = self.ser.read(64)
if data:
buf += data
while b"\n" in buf:
line, buf = buf.split(b"\n", 1)
self.process(line.decode().strip())
except Exception as e:
print(f"Serial error: {e}")
time.sleep(0.2)
def stop(self):
self.running = False
self.thread.join()
self.ser.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--port", type=str, default="/tmp/ACTION_SIM")
args = parser.parse_args()
sim = None
try:
sim = SimulatedPCB(args.port)
while True:
time.sleep(1)
except KeyboardInterrupt:
if sim:
sim.stop()
print("Stopped.")
# socat -d -d pty,raw,echo=0,link=/tmp/ACTION_SIM pty,raw,echo=0,link=/tmp/ACTION_USB
# python3 simulated_pcb/action.py --port /tmp/ACTION_SIM
# socat -d -d pty,raw,echo=0,link=/tmp/ODO_SIM pty,raw,echo=0,link=/tmp/ODO_USB
# python3 simulated_pcb/action.py --port /tmp/ODO_SIM