Files
modelec-marcel-ROS/simulated_pcb/odo.py
2025-12-20 22:01:40 +01:00

183 lines
5.4 KiB
Python

import serial
import time
import math
import threading
import argparse
class SimulatedPCB:
def __init__(self, port='/dev/pts/6', baud=115200):
self.ser = serial.Serial(port, baud, timeout=1)
self.running = True
self.start = False
self.x = 1225.0
self.y = 300.0
self.theta = math.pi / 2
self.vx = 0.0
self.vy = 0.0
self.vtheta = 0.0
self.last_update = time.time()
self.last_send = time.time()
self.pid = [0, 0, 0]
self.tof = {0: 1000, 1: 1000, 2: 1000}
# --- waypoint system ---
self.waypoints = {} # id -> waypoint
self.waypoint_queue = [] # FIFO execution
self.current_wp_id = None
self.thread = threading.Thread(target=self.run)
self.thread.start()
def normalize_angle(self, angle):
return (angle + math.pi) % (2 * math.pi) - math.pi
def update_position(self):
now = time.time()
dt = now - self.last_update
self.last_update = now
if self.start and self.waypoint_queue:
wp_id = self.waypoint_queue[0]
wp = self.waypoints[wp_id]
dx = wp['x'] - self.x
dy = wp['y'] - self.y
distance = math.hypot(dx, dy)
target_angle = math.atan2(dy, dx)
theta_error = self.normalize_angle(wp['theta'] - self.theta)
# --- motion ---
if wp['type'] == 1: # precise waypoint
speed = min(120.0, distance * 1)
self.vtheta = max(-6.0, min(6.0, theta_error * 2))
else: # transit waypoint
speed = min(180.0, distance * 2)
self.vtheta = 0.0
self.vx = speed * math.cos(target_angle)
self.vy = speed * math.sin(target_angle)
reached_pos = distance < 5.0
reached_theta = abs(theta_error) < 0.15 or wp['type'] == 0
if reached_pos and reached_theta:
self.vx = self.vy = self.vtheta = 0.0
self.current_wp_id = wp_id
self.ser.write(f"SET;WAYPOINT;REACH;{wp_id}\n".encode())
self.waypoint_queue.pop(0)
del self.waypoints[wp_id]
else:
self.vx = self.vy = self.vtheta = 0.0
# --- integrate ---
self.x += self.vx * dt
self.y += self.vy * dt
self.theta = self.normalize_angle(self.theta + self.vtheta * dt)
# --- periodic position ---
if self.start and now - self.last_send > 0.1:
self.ser.write(
f"SET;POS;{int(self.x)};{int(self.y)};{self.theta:.5f}\n".encode()
)
self.last_send = now
def run(self):
while self.running:
self.update_position()
if self.ser.in_waiting:
msg = self.ser.readline().decode('utf-8').strip()
print(f"[RX] {msg}")
self.handle_message(msg)
time.sleep(0.01)
def handle_message(self, msg):
if msg == "GET;POS":
self.ser.write(
f"SET;POS;{int(self.x)};{int(self.y)};{self.theta:.3f}\n".encode()
)
elif msg == "GET;SPEED":
self.ser.write(
f"SET;SPEED;{int(self.vx)};{int(self.vy)};{int(self.vtheta)}\n".encode()
)
elif msg.startswith("GET;DIST;"):
n = int(msg.split(';')[2])
dist = self.tof.get(n, 0)
self.ser.write(f"SET;DIST;{n};{dist}\n".encode())
elif msg == "GET;PID":
self.ser.write(f"SET;PID;{self.pid[0]};{self.pid[1]};{self.pid[2]}\n".encode())
elif msg.startswith("SET;PID"):
self.ser.write(b"OK;PID;1\n")
elif msg.startswith("SET;START;"):
self.start = msg.split(';')[2] == '1'
self.ser.write(b"OK;START;1\n")
# --- MULTI WAYPOINT (clears previous) ---
elif msg.startswith("SET;WAYPOINT;"):
parts = msg.split(';')[2:]
if len(parts) % 5 != 0:
self.ser.write(b"ERR;WAYPOINT;0\n")
return
# clear existing waypoints
self.waypoints.clear()
self.waypoint_queue.clear()
for i in range(0, len(parts), 5):
wp = {
'id': int(parts[i]),
'type': int(parts[i + 1]),
'x': float(parts[i + 2]),
'y': float(parts[i + 3]),
'theta': float(parts[i + 4]),
}
self.waypoints[wp['id']] = wp
self.waypoint_queue.append(wp['id'])
self.ser.write(f"OK;WAYPOINT;{wp['id']}\n".encode())
elif msg.startswith("SET;POS"):
parts = msg.split(';')
self.x = float(parts[2])
self.y = float(parts[3])
self.theta = float(parts[4])
self.ser.write(b"OK;POS;1\n")
def stop(self):
self.running = False
self.thread.join()
self.ser.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Simulated PCB")
parser.add_argument('--port', type=str, default='/dev/pts/6')
args = parser.parse_args()
sim = None
try:
sim = SimulatedPCB(port=args.port)
while True:
time.sleep(1)
except KeyboardInterrupt:
if sim:
sim.stop()
print("Simulation stopped")