obstacles + simulated pcb code in python

This commit is contained in:
acki
2025-05-02 17:37:02 -04:00
parent 7c82db862d
commit 7ffe157982
4 changed files with 177 additions and 9 deletions

29
simulated_pcb/alim.py Normal file
View File

@@ -0,0 +1,29 @@
import random
import time
import serial
# Set the parameters for the serial connection
serial_port = '/dev/pts/9' # Modify this to your serial port (e.g., 'COM3' on Windows)
baud_rate = 115200 # Modify this to your baud rate
# Open the serial connection
ser = serial.Serial(serial_port, baud_rate, timeout=1)
try:
while True:
if ser.in_waiting > 0:
data = ser.readline().decode('utf-8').strip()
if data.startswith("BAU;STATE"):
print("Received:", data)
ser.write(b'SET;BAU;STATE;1\n')
time.sleep(1)
except KeyboardInterrupt:
print("Program interrupted by user.")
finally:
# Close the serial connection
ser.close()
# socat -d -d pty,raw,echo=0 pty,raw,echo=0

147
simulated_pcb/odo.py Normal file
View File

@@ -0,0 +1,147 @@
import serial
import time
import math
import threading
class SimulatedPCB:
def __init__(self, port='/dev/pts/6', baud=115200):
self.ser = serial.Serial(port, baud, timeout=1)
self.running = True
self.start = False
self.x = 1225.0
self.y = 200.0
self.theta = math.pi / 2
self.vx = 0.0
self.vy = 0.0
self.vtheta = 0.0
self.last_update = time.time()
self.last_send = time.time()
self.pid = [0, 0, 0]
self.tof = {0: 1000, 1: 1000, 2: 1000}
self.waypoints = {} # waypoints by id
self.waypoint_order = [] # ordered list of ids
self.current_wp_id = None
self.thread = threading.Thread(target=self.run)
self.thread.start()
def update_position(self):
now = time.time()
dt = now - self.last_update
self.last_update = now
if self.start:
if self.waypoint_order:
wp = self.waypoints[self.waypoint_order[0]]
dx = wp['x'] - self.x
dy = wp['y'] - self.y
distance = math.hypot(dx, dy)
angle = math.atan2(dy, dx)
# Behavior differs by type
if wp['type'] == 1:
# Final waypoint: stop only when 100% accurate
if distance < 5.0:
self.vx = 0.0
self.vy = 0.0
self.ser.write(f'SET;WAYPOINT;{wp["id"]}\n'.encode('utf-8'))
self.current_wp_id = wp['id']
self.waypoint_order.pop(0)
del self.waypoints[wp['id']]
else:
speed = min(200.0, distance * 1.5)
self.vx = speed * math.cos(angle)
self.vy = speed * math.sin(angle)
else:
# Normal pass-through waypoint
speed = min(300.0, distance * 2)
self.vx = speed * math.cos(angle)
self.vy = speed * math.sin(angle)
if distance < 100:
self.ser.write(f'SET;WAYPOINT;{wp["id"]}\n'.encode('utf-8'))
self.current_wp_id = wp['id']
self.waypoint_order.pop(0)
del self.waypoints[wp['id']]
else:
self.vx = 0.0
self.vy = 0.0
self.x += self.vx * dt
self.y += self.vy * dt
self.theta += self.vtheta * dt
self.theta = (self.theta + math.pi) % (2 * math.pi) - math.pi
if now - self.last_send > 0.1:
self.ser.write(f'SET;POS;{int(self.x)};{int(self.y)};{self.theta:.3f}\n'.encode())
self.last_send = now
def run(self):
while self.running:
self.update_position()
if self.ser.in_waiting:
msg = self.ser.readline().decode('utf-8').strip()
print(f"[RX] {msg}")
self.handle_message(msg)
time.sleep(0.01)
def handle_message(self, msg):
if msg == "GET;POS":
self.ser.write(f'SET;POS;{int(self.x)};{int(self.y)};{self.theta:.3f}\n'.encode())
elif msg == "GET;SPEED":
self.ser.write(f'SET;SPEED;{int(self.vx)};{int(self.vy)};{int(self.vtheta)}\n'.encode())
elif msg.startswith("GET;DIST;"):
n = int(msg.split(';')[2])
dist = self.tof.get(n, 0)
self.ser.write(f'SET;DIST;{n};{dist}\n'.encode())
elif msg == "GET;PID":
self.ser.write(f'SET;PID;{self.pid[0]};{self.pid[1]};{self.pid[2]}\n'.encode())
elif msg.startswith("SET;PID"):
self.pid = list(map(int, msg.split(';')[2:5]))
self.ser.write(f'OK;PID;1\n'.encode())
elif msg.startswith("SET;START"):
self.start = msg.split(';')[2] == '1'
self.ser.write(f'OK;START;1\n'.encode())
elif msg.startswith("SET;WAYPOINT"):
parts = msg.split(';')
if len(parts) >= 7:
wp = {
'id': int(parts[2]),
'type': int(parts[3]),
'x': float(parts[4]),
'y': float(parts[5]),
'theta': float(parts[6])
}
self.waypoints[wp['id']] = wp
if wp['id'] not in self.waypoint_order:
self.waypoint_order.append(wp['id'])
self.ser.write(f'OK;WAYPOINT;{wp["id"]}\n'.encode())
elif msg.startswith("SET;POS"):
parts = msg.split(';')
self.x = float(parts[2])
self.y = float(parts[3])
self.theta = float(parts[4])
self.ser.write(b'OK;POS;1\n')
def stop(self):
self.running = False
self.thread.join()
self.ser.close()
if __name__ == '__main__':
try:
sim = SimulatedPCB()
except KeyboardInterrupt:
sim.stop()
print("Simulation stopped")