Files
2025-10-03 13:26:40 +02:00

139 lines
4.6 KiB
Python

import argparse
import serial
import time
import math
import threading
class SimulatedPCB:
def __init__(self, port='/dev/pts/6', baud=115200):
self.ser = serial.Serial(port, baud, timeout=1)
self.running = True
self.last_update = time.time()
# État simulé
self.servos = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, } # {id: pos_index}
self.servo_angles = {0: {0: 0, 1: 0, 3: 0}, 1: {0: 0, 1: 0, 3: 0}, 2: {0: 0, 1: 0, 3: 0}, 3: {0: 0, 1: 0, 3: 0}, 4: {0: 0, 1: 0, 3: 0}, 5: {0: 0, 1: 0, 3: 0}, } # {id: {pos_index: angle}}
self.relais = {0: 0, 1: 0, 2: 0}
self.ascenseur_pos = 0
self.fin_course = {} # à implémenter si besoin
self.tirette_arm = False
self.thread = threading.Thread(target=self.run)
self.thread.start()
def send_response(self, msg):
print(f"[TX] {msg}")
self.ser.write((msg + "\n").encode())
def process_command(self, line):
parts = line.strip().split(';')
if not parts:
return
cmd = parts[0]
if cmd == "GET":
self.handle_get(parts)
elif cmd == "SET":
self.handle_set(parts)
elif cmd == "MOV":
self.handle_mov(parts)
def handle_get(self, parts):
if len(parts) != 3:
return
category, data = parts[1], parts[2]
if category.startswith("SERVO") and data == "POS":
sid = int(category[5:])
pos = self.servos.get(sid, 0)
self.send_response(f"SET;{category};{data};{pos}")
elif category == "ASC" and data == "POS":
self.send_response(f"SET;ASC;POS;{self.ascenseur_pos}")
elif category.startswith("RELAY") and data == "STATE":
rid = int(category[5:])
state = self.relais.get(rid, 0)
self.send_response(f"SET;{category};{data};{state}")
def handle_set(self, parts):
if len(parts) != 4:
return
category, data, val = parts[1], parts[2], parts[3]
if category.startswith("SERVO") and data.startswith("POS"):
sid = int(category[5:])
pos_index = int(data[3:])
angle = float(val)
if sid not in self.servo_angles:
self.servo_angles[sid] = {}
self.servo_angles[sid][pos_index] = angle
self.send_response(f"OK;{category};{data};{val}")
elif category == "ASC":
self.send_response(f"OK;ASC;{data};{val}")
else:
self.send_response(f"KO;{category};{data};{val}")
def handle_mov(self, parts):
if len(parts) != 3:
return
category, data = parts[1], parts[2]
if category == "ASC":
self.ascenseur_pos = data
self.send_response(f"OK;{category};{data}")
time.sleep(.5)
elif category.startswith("SERVO") and data.startswith("POS"):
sid = int(category[5:])
pos_index = int(data[3:])
if sid in self.servo_angles and pos_index in self.servo_angles[sid]:
time.sleep(0.1)
self.servos[sid] = pos_index
self.send_response(f"OK;{category};{data}")
else:
self.send_response(f"KO;{category};{data}")
elif category.startswith("RELAY"):
rid = int(category[5:])
self.relais[rid] = 1 if data == "1" else 0
time.sleep(.1)
self.send_response(f"OK;{category};{data}")
else:
self.send_response(f"KO;{category};{data}")
def run(self):
while self.running:
if self.ser.in_waiting:
msg = self.ser.readline().decode().strip()
print(f"[RX] {msg}")
self.process_command(msg)
time.sleep(0.1)
def stop(self):
self.running = False
self.thread.join()
self.ser.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Simulated PCB")
parser.add_argument('--port', type=str, default='/tmp/USB_ACTION_SIM', help='Serial port to use')
args = parser.parse_args()
sim = None
try:
sim = SimulatedPCB(port=args.port)
except KeyboardInterrupt:
if sim:
sim.stop()
print("Simulation stopped")
# socat -d -d pty,raw,echo=0,link=/tmp/MARCEL_ACTION_SIM pty,raw,echo=0,link=/tmp/MARCEL_ACTION
# python3 simulated_pcb/action.py --port /tmp/MARCEL_ACTION_SIM
# socat -d -d pty,raw,echo=0,link=/tmp/ENEMY_ACTION_SIM pty,raw,echo=0,link=/tmp/ENEMY_ACTION
# python3 simulated_pcb/action.py --port /tmp/ENEMY_ACTION_SIM