mirror of
https://github.com/modelec/modelec-marcel-ROS.git
synced 2026-01-19 00:57:37 +01:00
217 lines
6.5 KiB
Python
217 lines
6.5 KiB
Python
import argparse
|
|
import serial
|
|
import threading
|
|
import time
|
|
|
|
class SimulatedPCB:
|
|
def __init__(self, port='/tmp/ACTION_SIM', baud=115200):
|
|
self.ser = serial.Serial(port, baud, timeout=0.1)
|
|
self.running = True
|
|
|
|
# State identical to STM32 getters
|
|
self.servo_angles = {i: 0.0 for i in range(16)} # angle as float
|
|
self.relay_state = {i: 0 for i in range(16)} # 0/1
|
|
self.tirette_armed = False
|
|
|
|
self.thread = threading.Thread(target=self.run)
|
|
self.thread.start()
|
|
|
|
def tx(self, msg):
|
|
print(f"[TX] {msg.strip()}")
|
|
self.ser.write((msg + "\n").encode())
|
|
|
|
# ----------------------------------------------------------------------
|
|
# STM32-LIKE FUNCTIONS
|
|
# ----------------------------------------------------------------------
|
|
def getServoAngle(self, sid):
|
|
return float(self.servo_angles.get(sid, 0.0))
|
|
|
|
def getRelayState(self, rid):
|
|
return int(self.relay_state.get(rid, 0))
|
|
|
|
def moveServo(self, sid, angle):
|
|
self.servo_angles[sid] = float(angle)
|
|
|
|
def moveRelay(self, rid, state):
|
|
self.relay_state[rid] = 1 if int(state) else 0
|
|
|
|
# ----------------------------------------------------------------------
|
|
# COMMAND PARSER (as close as possible to STM32 version)
|
|
# ----------------------------------------------------------------------
|
|
def process(self, line):
|
|
print(f"[RX] {line}")
|
|
parts = line.split(";")
|
|
if len(parts) < 2:
|
|
return
|
|
|
|
cmd = parts[0]
|
|
|
|
if cmd == "GET":
|
|
self.handle_get(parts)
|
|
elif cmd == "SET":
|
|
self.handle_set(parts)
|
|
elif cmd == "MOV":
|
|
self.handle_mov(parts)
|
|
elif cmd == "ACK":
|
|
# simulate ACK reception ignore for now
|
|
pass
|
|
|
|
# ----------------------------------------------------------------------
|
|
# GET handler (SERVO POS / RELAY STATE)
|
|
# ----------------------------------------------------------------------
|
|
def handle_get(self, p):
|
|
# GET;SERVO;POS;n;id...
|
|
if p[1] == "SERVO" and p[2] == "POS":
|
|
if len(p) < 4:
|
|
self.tx("KO;SET;SERVO;POS")
|
|
return
|
|
|
|
n = int(p[3])
|
|
if n <= 0:
|
|
self.tx("SET;SERVO;POS;0")
|
|
return
|
|
|
|
resp = f"SET;SERVO;POS;{n}"
|
|
idx = 4
|
|
for _ in range(n):
|
|
if idx >= len(p):
|
|
self.tx("KO;SET;SERVO;POS")
|
|
return
|
|
sid = int(p[idx])
|
|
idx += 1
|
|
angle = self.getServoAngle(sid)
|
|
resp += f";{sid};{angle:.4f}"
|
|
|
|
self.tx(resp)
|
|
return
|
|
|
|
# GET;RELAY;STATE;n;id...
|
|
if p[1] == "RELAY" and p[2] == "STATE":
|
|
if len(p) < 4:
|
|
self.tx("KO;SET;RELAY;STATE")
|
|
return
|
|
n = int(p[3])
|
|
if n <= 0:
|
|
self.tx("SET;RELAY;STATE;0")
|
|
return
|
|
|
|
resp = f"SET;RELAY;STATE;{n}"
|
|
idx = 4
|
|
for _ in range(n):
|
|
if idx >= len(p):
|
|
self.tx("KO;SET;RELAY;STATE")
|
|
return
|
|
rid = int(p[idx])
|
|
idx += 1
|
|
state = self.getRelayState(rid)
|
|
resp += f";{rid};{state}"
|
|
|
|
self.tx(resp)
|
|
return
|
|
|
|
# ----------------------------------------------------------------------
|
|
# SET handler (only TIR;ARM)
|
|
# ----------------------------------------------------------------------
|
|
def handle_set(self, p):
|
|
if p[1] == "TIR" and p[2] == "ARM":
|
|
if len(p) < 4:
|
|
self.tx("KO;TIR;ARM")
|
|
return
|
|
|
|
val = int(p[3])
|
|
self.tirette_armed = bool(val)
|
|
self.tx(f"OK;TIR;ARM;{val}")
|
|
return
|
|
|
|
self.tx(f"KO;{p[1]};{p[2]}")
|
|
|
|
# ----------------------------------------------------------------------
|
|
# MOV handler (SERVO multi / RELAY multi)
|
|
# ----------------------------------------------------------------------
|
|
def handle_mov(self, p):
|
|
# MOV;SERVO;n;id;angle;id;angle...
|
|
if p[1] == "SERVO":
|
|
n = int(p[2])
|
|
idx = 3
|
|
|
|
resp = f"OK;SERVO;{n}"
|
|
for _ in range(n):
|
|
if idx + 1 >= len(p):
|
|
self.tx(f"KO;MOV;SERVO;{p[2]}")
|
|
return
|
|
|
|
sid = int(p[idx])
|
|
ang = float(p[idx+1])
|
|
idx += 2
|
|
|
|
self.moveServo(sid, ang)
|
|
resp += f";{sid};{ang}"
|
|
|
|
self.tx(resp)
|
|
return
|
|
|
|
# MOV;RELAY;n;id;state...
|
|
if p[1] == "RELAY":
|
|
n = int(p[2])
|
|
idx = 3
|
|
|
|
resp = f"OK;RELAY;{n}"
|
|
for _ in range(n):
|
|
if idx + 1 >= len(p):
|
|
self.tx(f"KO;MOV;RELAY;{p[2]}")
|
|
return
|
|
|
|
rid = int(p[idx])
|
|
st = int(p[idx+1])
|
|
idx += 2
|
|
|
|
self.moveRelay(rid, st)
|
|
resp += f";{rid};{st}"
|
|
|
|
self.tx(resp)
|
|
return
|
|
|
|
self.tx(f"KO;MOV;{p[1]}")
|
|
|
|
# ----------------------------------------------------------------------
|
|
# Loop
|
|
# ----------------------------------------------------------------------
|
|
def run(self):
|
|
buf = b""
|
|
while self.running:
|
|
try:
|
|
data = self.ser.read(64)
|
|
if data:
|
|
buf += data
|
|
while b"\n" in buf:
|
|
line, buf = buf.split(b"\n", 1)
|
|
self.process(line.decode().strip())
|
|
except Exception as e:
|
|
print(f"Serial error: {e}")
|
|
time.sleep(0.2)
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
self.thread.join()
|
|
self.ser.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--port", type=str, default="/tmp/ACTION_SIM")
|
|
args = parser.parse_args()
|
|
sim = None
|
|
try:
|
|
sim = SimulatedPCB(args.port)
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
if sim:
|
|
sim.stop()
|
|
print("Stopped.")
|
|
|
|
# socat -d -d pty,raw,echo=0,link=/tmp/ACTION_SIM pty,raw,echo=0,link=/tmp/ACTION_USB
|
|
# python3 simulated_pcb/action.py --port /tmp/ACTION_SIM
|
|
# socat -d -d pty,raw,echo=0,link=/tmp/ODO_SIM pty,raw,echo=0,link=/tmp/ODO_USB
|
|
# python3 simulated_pcb/action.py --port /tmp/ODO_SIM
|