Compare commits
5 Commits
cfdb10c392
...
arcmyx-dev
| Author | SHA1 | Date | |
|---|---|---|---|
| a1fe733115 | |||
| 6ff04f3cae | |||
| b9e5bc23e2 | |||
| a1f7c39c60 | |||
| 4683b87a32 |
159
experiments/trajectory_drive.py
Normal file
159
experiments/trajectory_drive.py
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
from pybricks.hubs import PrimeHub
|
||||||
|
from pybricks.pupdevices import Motor, ColorSensor, UltrasonicSensor, ForceSensor
|
||||||
|
from pybricks.parameters import Button, Color, Direction, Port, Side, Stop, Axis
|
||||||
|
from pybricks.robotics import DriveBase
|
||||||
|
from pybricks.tools import wait, StopWatch, run_task, multitask
|
||||||
|
import umath
|
||||||
|
|
||||||
|
hub = PrimeHub()
|
||||||
|
|
||||||
|
left_motor = Motor(Port.A, Direction.COUNTERCLOCKWISE)
|
||||||
|
right_motor = Motor(Port.B, Direction.CLOCKWISE)
|
||||||
|
left_arm = Motor(Port.C, Direction.CLOCKWISE, [[12,36]], [[12,20,24]])
|
||||||
|
right_arm = Motor(Port.D, Direction.CLOCKWISE, [[12,36],[12,20,24]])
|
||||||
|
lazer_ranger = UltrasonicSensor(Port.E)
|
||||||
|
color_sensor = ColorSensor(Port.F)
|
||||||
|
|
||||||
|
WHEEL_DIAMETER = 68.8
|
||||||
|
AXLE_TRACK = 180
|
||||||
|
drive_base = DriveBase(left_motor, right_motor, WHEEL_DIAMETER, AXLE_TRACK)
|
||||||
|
drive_base.settings(600, 500, 300, 200)
|
||||||
|
drive_base.use_gyro(False)
|
||||||
|
|
||||||
|
current = {"x": 0, "y": 0, "a": 0}
|
||||||
|
target = {"x": 0, "y": 200, "a": 0}
|
||||||
|
done = False
|
||||||
|
|
||||||
|
def addtoposition(a, r):
|
||||||
|
current["x"] += r * umath.sin(umath.radians(a))
|
||||||
|
current["y"] += r * umath.cos(umath.radians(a))
|
||||||
|
current["a"] = a
|
||||||
|
|
||||||
|
async def track_heading():
|
||||||
|
while True:
|
||||||
|
current["a"] = hub.imu.heading()
|
||||||
|
await wait(20)
|
||||||
|
|
||||||
|
async def printStats():
|
||||||
|
last_dist = 0
|
||||||
|
while True:
|
||||||
|
if done:
|
||||||
|
break
|
||||||
|
current_dist = drive_base.distance()
|
||||||
|
instant_dist = current_dist - last_dist
|
||||||
|
last_dist = current_dist
|
||||||
|
|
||||||
|
current["x"] += instant_dist * umath.sin(umath.radians(current["a"]))
|
||||||
|
current["y"] += instant_dist * umath.cos(umath.radians(current["a"]))
|
||||||
|
|
||||||
|
dx = target["x"] - current["x"]
|
||||||
|
dy = target["y"] - current["y"]
|
||||||
|
remaining = umath.sqrt(dx*dx + dy*dy)
|
||||||
|
desired_a = 90 - umath.degrees(umath.atan2(dy, dx))
|
||||||
|
print("x:", current["x"], "y:", current["y"], "desired_a:", desired_a, "current_a:", current["a"], "remaining:", remaining)
|
||||||
|
|
||||||
|
await wait(50)
|
||||||
|
|
||||||
|
async def pid_turn(target_angle):
|
||||||
|
KP = 3
|
||||||
|
KI = 0.01
|
||||||
|
KD = 1.5
|
||||||
|
SETTLE_THRESHOLD = 0.5
|
||||||
|
SETTLE_TIME = 500
|
||||||
|
|
||||||
|
integral = 0
|
||||||
|
last_error = 0
|
||||||
|
settled_timer = StopWatch()
|
||||||
|
settled = False
|
||||||
|
|
||||||
|
while True:
|
||||||
|
error = target_angle - current["a"]
|
||||||
|
while error > 180: error -= 360
|
||||||
|
while error < -180: error += 360
|
||||||
|
|
||||||
|
integral += error
|
||||||
|
integral = max(-100, min(100, integral))
|
||||||
|
derivative = error - last_error
|
||||||
|
last_error = error
|
||||||
|
|
||||||
|
turn_rate = KP * error + KI * integral + KD * derivative
|
||||||
|
turn_rate = max(-200, min(200, turn_rate))
|
||||||
|
|
||||||
|
drive_base.drive(0, turn_rate)
|
||||||
|
|
||||||
|
if abs(error) < SETTLE_THRESHOLD:
|
||||||
|
if not settled:
|
||||||
|
settled = True
|
||||||
|
settled_timer.reset()
|
||||||
|
elif settled_timer.time() > SETTLE_TIME:
|
||||||
|
drive_base.stop()
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
settled = False
|
||||||
|
|
||||||
|
await wait(20)
|
||||||
|
|
||||||
|
async def driveTrajectory():
|
||||||
|
global done
|
||||||
|
ARRIVE_THRESHOLD = 30 # give stage 2 more room to work
|
||||||
|
FINAL_THRESHOLD = 7
|
||||||
|
SPEED = 400
|
||||||
|
SLOW_SPEED = 30
|
||||||
|
TURN_GAIN = 3.0
|
||||||
|
|
||||||
|
# stage 1
|
||||||
|
while True:
|
||||||
|
dx = target["x"] - current["x"]
|
||||||
|
dy = target["y"] - current["y"]
|
||||||
|
remaining = umath.sqrt(dx*dx + dy*dy)
|
||||||
|
if remaining < ARRIVE_THRESHOLD:
|
||||||
|
drive_base.stop()
|
||||||
|
break
|
||||||
|
desired_a = 90 - umath.degrees(umath.atan2(dy, dx))
|
||||||
|
error = desired_a - current["a"]
|
||||||
|
while error > 180: error -= 360
|
||||||
|
while error < -180: error += 360
|
||||||
|
turn_rate = max(-200, min(200, error * TURN_GAIN))
|
||||||
|
# in stage 1, scale speed down as it gets closer
|
||||||
|
speed = max(150, SPEED * min(1, remaining / 300))
|
||||||
|
drive_base.drive(speed, turn_rate)
|
||||||
|
await wait(20)
|
||||||
|
|
||||||
|
print("Stage 1 done")
|
||||||
|
|
||||||
|
# stage 2
|
||||||
|
while True:
|
||||||
|
dx = target["x"] - current["x"]
|
||||||
|
dy = target["y"] - current["y"]
|
||||||
|
remaining = umath.sqrt(dx*dx + dy*dy)
|
||||||
|
if remaining < FINAL_THRESHOLD:
|
||||||
|
drive_base.stop()
|
||||||
|
break
|
||||||
|
a_rad = umath.radians(current["a"])
|
||||||
|
forward_x = umath.sin(a_rad)
|
||||||
|
forward_y = umath.cos(a_rad)
|
||||||
|
signed_dist = dx * forward_x + dy * forward_y
|
||||||
|
speed = SLOW_SPEED if signed_dist > 0 else -SLOW_SPEED
|
||||||
|
drive_base.drive(speed, 0)
|
||||||
|
await wait(20)
|
||||||
|
|
||||||
|
drive_base.stop()
|
||||||
|
print("Stage 2 done")
|
||||||
|
|
||||||
|
# stage 3 — PID angle correction
|
||||||
|
await pid_turn(target["a"])
|
||||||
|
|
||||||
|
done = True
|
||||||
|
print("Done! x:", current["x"], "y:", current["y"], "a:", current["a"])
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
drive_base.reset()
|
||||||
|
await multitask(driveTrajectory(), track_heading(), printStats())
|
||||||
|
|
||||||
|
hub.imu.reset_heading(0)
|
||||||
|
while True:
|
||||||
|
if hub.imu.ready():
|
||||||
|
break
|
||||||
|
|
||||||
|
run_task(main())
|
||||||
|
print("done")
|
||||||
@@ -1,14 +1,18 @@
|
|||||||
# Event codes:
|
# Event codes:
|
||||||
#0: notify
|
#0: notify
|
||||||
#1: prgm_start
|
#1: hfnotify
|
||||||
#2: prgm_end
|
#2: prgm_start
|
||||||
#3: prgm_crash
|
#3: prgm_end
|
||||||
#4: snsr_data
|
#4: prgm_crash
|
||||||
#5: mtr_data
|
#5: snsr_data
|
||||||
#6: perf_smpl
|
#6: mtr_data
|
||||||
#7: get_time
|
#7: perf_smpl
|
||||||
#8: breakpoint
|
#8: get_time
|
||||||
lvldict = {
|
#9: breakpoint
|
||||||
|
class PynamicsLogger:
|
||||||
|
def __init__(self):
|
||||||
|
self.verboseness = 7
|
||||||
|
self.lvldict = {
|
||||||
0: "FATAL",
|
0: "FATAL",
|
||||||
1: "ALERT",
|
1: "ALERT",
|
||||||
2: "CRIT",
|
2: "CRIT",
|
||||||
@@ -18,8 +22,8 @@ lvldict = {
|
|||||||
6: "INFO",
|
6: "INFO",
|
||||||
7: "DEBUG"
|
7: "DEBUG"
|
||||||
}
|
}
|
||||||
stpwtchtime = StopWatch()
|
self.time = StopWatch()
|
||||||
stpwtchtime.pause()
|
self.time.pause()
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
self.time.reset()
|
self.time.reset()
|
||||||
@@ -33,32 +37,38 @@ def log(self, message, level, origin):
|
|||||||
(ms // 1000) % 60,
|
(ms // 1000) % 60,
|
||||||
ms % 1000
|
ms % 1000
|
||||||
)
|
)
|
||||||
label = self.lvldict.get(level, "UNKNOWN")
|
label = self.lvldict[level]
|
||||||
padding = " " * (7 - len(label))
|
padding = " " * (7 - len(label))
|
||||||
print("[{}] {}{} [{}] {}".format(timestamp, label, padding, origin, message))
|
print(f"[{timestamp}] [{label}]{padding} [{origin}] {message}")
|
||||||
|
|
||||||
def sendCommand(eventnum, level, msg, origin):
|
def sendCommand(self, eventnum, level, msg, origin):
|
||||||
print(f"\x1b[?PYN;{str(eventnum)};{lvldict[level]};{msg}~")
|
print(f"\x1b[?PYN;{str(eventnum)};{lvldict[level]};{msg}~")
|
||||||
def notify(level, msg, origin):
|
def notify(level, msg, origin):
|
||||||
print(f"\x1b[?PYN;0;{lvldict[level]};{msg}~")
|
self.sendCommand(self, 0, level, msg, origin)
|
||||||
def fatal(self, message, origin): notify(message, 0, origin)
|
def notifyfatal(self, message, origin): notify(message, 0, origin)
|
||||||
def alert(self, message, origin): notify(message, 1, origin)
|
def notifyalert(self, message, origin): notify(message, 1, origin)
|
||||||
def crit(self, message, origin): notify(message, 2, origin)
|
def notifycrit(self, message, origin): notify(message, 2, origin)
|
||||||
def err(self, message, origin): notify(message, 3, origin)
|
def notifyerr(self, message, origin): notify(message, 3, origin)
|
||||||
def warning(self, message, origin): notify(message, 4, origin)
|
def notifywarning(self, message, origin): notify(message, 4, origin)
|
||||||
def notice(self, message, origin): notify(message, 5, origin)
|
def notifynotice(self, message, origin): notify(message, 5, origin)
|
||||||
def info(self, message, origin): notify(message, 6, origin)
|
def notifyinfo(self, message, origin): notify(message, 6, origin)
|
||||||
def debug(self, message, origin): notify(message, 7, origin)
|
def notifydebug(self, message, origin): notify(message, 7, origin)
|
||||||
def crash(self, message, origin):
|
def notifycrash(self, message, origin):
|
||||||
sendCommand(3, 0, "program kinda crashed bc you suck at coding", origin)
|
sendCommand(4, 0, "uhhhh so the program kinda crashed sorry", origin)
|
||||||
raise FatalLoggerError("[FATAL] [{}] {}".format(origin, message))
|
raise FatalLoggerError("[FATAL] [{}] {}".format(origin, message))
|
||||||
sendCommand(0, 0, "hey you should probably know that your RAM is corrupted and you cant buy more because of the shortage. byeeeee", "test prgm")
|
|
||||||
sendCommand(0, 1, "UNRECOVERABLE PYTHON ERROR!!!!! Exiting gracefully... JUST KIDDING WHAT DID YOU THINK IT WAS GONNA DO", "test prgm")
|
def breakpoint(self, message, origin):
|
||||||
sendCommand(0, 2, "so the program is running but it just started looping on the motor frying bit of your program.", "test prgm")
|
sendCommand(3, 8, "currently everything about the robot")
|
||||||
sendCommand(0, 3, "syntax error, you failure!!!! use an error checker", "test prgm")
|
def testAll(self):
|
||||||
sendCommand(0, 4, "wanted to just say that your motor is kinda being slow", "test prgm")
|
self.start()
|
||||||
sendCommand(0, 5, "so your motor is like 1% slower than it should be but really no one cares", "test prgm")
|
notifyfatal("hey you should probably know that your RAM is corrupted and you cant replace it because of the shortage. byeeeee", "test prgm")
|
||||||
sendCommand(0, 6, "hey everything in the program is going well just in case nothing in your life is", "test prgm")
|
notifyalert(0, 1, "UNRECOVERABLE PYTHON ERROR!!!!! Exiting gracefully... JUST KIDDING WHAT DID YOU THINK IT WAS GONNA DO", "test prgm")
|
||||||
sendCommand(0, 7, "nobody cares about me but you should know that", "test prgm")
|
notifycrit(0, 2, "so the program is running but it just started looping on the motor frying bit of your program.", "test prgm")
|
||||||
def breakpoint():
|
notifyerr(0, 3, "syntax error, you failure!!!! use an error checker", "test prgm")
|
||||||
#send a command 8 and also get all important info about the code here and print
|
notifywarning(0, 4, "wanted to just say that your motor is kinda being slow", "test prgm")
|
||||||
|
notifynotice(0, 5, "so your motor is like 1% slower than it should be but really no one cares", "test prgm")
|
||||||
|
notifyinfo(0, 6, "hey everything in the program is going well just in case nothing in your life is", "test prgm")
|
||||||
|
notifydebug(0, 7, "nobody cares about me but you should know that", "test prgm")
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pynlogger = PynamicsLogger()
|
||||||
|
pynlogger.testAll()
|
||||||
|
|||||||
Reference in New Issue
Block a user