338 lines
15 KiB
Python
338 lines
15 KiB
Python
from pybricks.parameters import Port
|
|
from uerrno import EAGAIN, EBUSY, ECANCELED, EINVAL, EIO, ENODEV, EOPNOTSUPP, EPERM, ETIMEDOUT
|
|
import uio
|
|
from pybricks.iodevices import UARTDevice as _UARTDevice
|
|
from pybricks.tools import wait, multitask, run_task
|
|
|
|
|
|
class FakeUART:
|
|
def __init__(self, port, baudrate, timeout):
|
|
self.timeout = timeout
|
|
self._force_error = None
|
|
print("Warning: No physical UART detected. Using simulator.")
|
|
|
|
def set_error(self, errno):
|
|
self._force_error = errno
|
|
|
|
def read(self, length=1):
|
|
if self._force_error is not None:
|
|
err = self._force_error
|
|
self._force_error = None
|
|
raise OSError(err)
|
|
if self.timeout is not None:
|
|
wait(self.timeout)
|
|
raise OSError(ETIMEDOUT)
|
|
else:
|
|
while True:
|
|
wait(1000)
|
|
|
|
def write(self, data):
|
|
if self._force_error is not None:
|
|
err = self._force_error
|
|
self._force_error = None
|
|
raise OSError(err)
|
|
|
|
|
|
def UARTDevice(port, baudrate=9600, timeout=None):
|
|
try:
|
|
return _UARTDevice(port, baudrate, timeout)
|
|
except OSError:
|
|
return FakeUART(port, baudrate, timeout)
|
|
|
|
|
|
class OSDiagnostics:
|
|
def __init__(self, hub, motorclass):
|
|
self.hub = hub
|
|
self.motorclass = motorclass
|
|
self.successfultests = 0
|
|
self.failedtests = {}
|
|
def testUerrno(self):
|
|
uerrnotestobject = UerrnoTest(self.hub, self.motorclass)
|
|
uerrnotestobject.testeagain()
|
|
uerrnotestobject.testebusy()
|
|
uerrnotestobject.testecanceled()
|
|
uerrnotestobject.testeinval()
|
|
uerrnotestobject.testeio()
|
|
uerrnotestobject.testenodev()
|
|
uerrnotestobject.testeopnotsupp()
|
|
uerrnotestobject.testeperm()
|
|
uerrnotestobject.testetimedout()
|
|
uerrnotestobject.print_results()
|
|
self.successfultests += uerrnotestobject.successfultests
|
|
self.failedtests.update(uerrnotestobject.failedtests)
|
|
def testUIO(self):
|
|
uiotestobject = UIOTest(self.hub, self.motorclass)
|
|
uiotestobject.print_results()
|
|
self.successfultests += uiotestobject.successfultests
|
|
self.failedtests.update(uiotestobject.failedtests)
|
|
|
|
|
|
|
|
class UerrnoTest:
|
|
def __init__(self, hub, motorclass):
|
|
self.hub = hub
|
|
self.motorclass = motorclass
|
|
self.successfultests = 0
|
|
self.failedtests = {}
|
|
def testeagain(self):
|
|
# Triggered by calling multitask() nested inside another multitask()
|
|
print("Starting Test 1/9: EAGAIN - Try Again Error")
|
|
try:
|
|
async def inner():
|
|
await multitask(wait(100)) # nested multitask raises EAGAIN
|
|
|
|
async def outer():
|
|
await multitask(inner())
|
|
|
|
run_task(outer())
|
|
print("No error raised.\nCompleted Test 1/9: EAGAIN - FAILED")
|
|
self.failedtests["EAGAIN"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == EAGAIN:
|
|
print("EAGAIN can be thrown and caught.\nCompleted Test 1/9: EAGAIN - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
elif ex.errno == EIO:
|
|
print("An unspecified error occurred (EIO).\nCompleted Test 1/9: EAGAIN - FAILED")
|
|
self.failedtests["EAGAIN"] = "EIO - Unspecified Error"
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 1/9: EAGAIN - FAILED")
|
|
self.failedtests["EAGAIN"] = ex.errno
|
|
|
|
def testebusy(self):
|
|
# No reliable hardware trigger; use FakeUART
|
|
print("Starting Test 2/9: EBUSY - Device Busy Error")
|
|
uart = UARTDevice(Port.A, baudrate=9600, timeout=1000)
|
|
uart.set_error(EBUSY)
|
|
try:
|
|
uart.read(1)
|
|
print("No error raised.\nCompleted Test 2/9: EBUSY - FAILED")
|
|
self.failedtests["EBUSY"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == EBUSY:
|
|
print("EBUSY can be thrown and caught.\nCompleted Test 2/9: EBUSY - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
elif ex.errno == EIO:
|
|
print("An unspecified error occurred (EIO).\nCompleted Test 2/9: EBUSY - FAILED")
|
|
self.failedtests["EBUSY"] = "EIO - Unspecified Error"
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 2/9: EBUSY - FAILED")
|
|
self.failedtests["EBUSY"] = ex.errno
|
|
|
|
def testecanceled(self):
|
|
# No reliable hardware trigger; use FakeUART
|
|
print("Starting Test 3/9: ECANCELED - Operation Canceled Error")
|
|
uart = UARTDevice(Port.A, baudrate=9600, timeout=1000)
|
|
uart.set_error(ECANCELED)
|
|
try:
|
|
uart.read(1)
|
|
print("No error raised.\nCompleted Test 3/9: ECANCELED - FAILED")
|
|
self.failedtests["ECANCELED"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == ECANCELED:
|
|
print("ECANCELED can be thrown and caught.\nCompleted Test 3/9: ECANCELED - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
elif ex.errno == EIO:
|
|
print("An unspecified error occurred (EIO).\nCompleted Test 3/9: ECANCELED - FAILED")
|
|
self.failedtests["ECANCELED"] = "EIO - Unspecified Error"
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 3/9: ECANCELED - FAILED")
|
|
self.failedtests["ECANCELED"] = ex.errno
|
|
|
|
def testeinval(self):
|
|
# Triggered by passing an out-of-range value to motor.control.limits()
|
|
print("Starting Test 4/9: EINVAL - Invalid Argument Error")
|
|
input("Plug a motor into Port A, then press Enter.")
|
|
try:
|
|
motor = self.motorclass(Port.A)
|
|
motor.control.limits(speed=9999999, acceleration=9999999)
|
|
print("No error raised.\nCompleted Test 4/9: EINVAL - FAILED")
|
|
self.failedtests["EINVAL"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == EINVAL:
|
|
print("EINVAL can be thrown and caught.\nCompleted Test 4/9: EINVAL - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
elif ex.errno == EIO:
|
|
print("An unspecified error occurred (EIO).\nCompleted Test 4/9: EINVAL - FAILED")
|
|
self.failedtests["EINVAL"] = "EIO - Unspecified Error"
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 4/9: EINVAL - FAILED")
|
|
self.failedtests["EINVAL"] = ex.errno
|
|
|
|
def testeio(self):
|
|
# No reliable scriptable trigger (requires physical unplug); use FakeUART
|
|
print("Starting Test 5/9: EIO - I/O Error")
|
|
uart = UARTDevice(Port.A, baudrate=9600, timeout=1000)
|
|
uart.set_error(EIO)
|
|
try:
|
|
uart.read(1)
|
|
print("No error raised.\nCompleted Test 5/9: EIO - FAILED")
|
|
self.failedtests["EIO"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == EIO:
|
|
print("EIO can be thrown and caught.\nCompleted Test 5/9: EIO - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 5/9: EIO - FAILED")
|
|
self.failedtests["EIO"] = ex.errno
|
|
|
|
def testenodev(self):
|
|
# Triggered by initializing a motor on an empty port
|
|
print("Starting Test 6/9: ENODEV - Device Not Found Error")
|
|
input("Make sure Port A doesn't have anything plugged in, then press Enter.")
|
|
try:
|
|
my_motor = self.motorclass(Port.A)
|
|
print("OS detected a motor when there was nothing. You may have allowed missing motors. This is useful for debugging, but not recommended for production as it can cause issues with device control.")
|
|
self.failedtests["ENODEV"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == ENODEV:
|
|
print("There is no motor on this port. ENODEV can be thrown and caught.\nCompleted Test 6/9: ENODEV - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
elif ex.errno == EIO:
|
|
print("An unspecified error occurred (EIO).\nCompleted Test 6/9: ENODEV - FAILED")
|
|
self.failedtests["ENODEV"] = "EIO - Unspecified Error"
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 6/9: ENODEV - FAILED")
|
|
self.failedtests["ENODEV"] = ex.errno
|
|
|
|
def testeopnotsupp(self):
|
|
# No reliable scriptable trigger without specific hardware; use FakeUART
|
|
print("Starting Test 7/9: EOPNOTSUPP - Operation Not Supported Error")
|
|
uart = UARTDevice(Port.A, baudrate=9600, timeout=1000)
|
|
uart.set_error(EOPNOTSUPP)
|
|
try:
|
|
uart.read(1)
|
|
print("No error raised.\nCompleted Test 7/9: EOPNOTSUPP - FAILED")
|
|
self.failedtests["EOPNOTSUPP"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == EOPNOTSUPP:
|
|
print("EOPNOTSUPP can be thrown and caught.\nCompleted Test 7/9: EOPNOTSUPP - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
elif ex.errno == EIO:
|
|
print("An unspecified error occurred (EIO).\nCompleted Test 7/9: EOPNOTSUPP - FAILED")
|
|
self.failedtests["EOPNOTSUPP"] = "EIO - Unspecified Error"
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 7/9: EOPNOTSUPP - FAILED")
|
|
self.failedtests["EOPNOTSUPP"] = ex.errno
|
|
|
|
def testeperm(self):
|
|
# Triggered by calling motor.control.limits() while a motor is actively running
|
|
print("Starting Test 8/9: EPERM - Operation Not Permitted Error")
|
|
input("Plug a motor into Port A, then press Enter.")
|
|
try:
|
|
motor = self.motorclass(Port.A)
|
|
motor.run(500)
|
|
wait(50)
|
|
motor.control.limits(speed=500, acceleration=1000)
|
|
print("No error raised.\nCompleted Test 8/9: EPERM - FAILED")
|
|
self.failedtests["EPERM"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == EPERM:
|
|
print("EPERM can be thrown and caught.\nCompleted Test 8/9: EPERM - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
elif ex.errno == EIO:
|
|
print("An unspecified error occurred (EIO).\nCompleted Test 8/9: EPERM - FAILED")
|
|
self.failedtests["EPERM"] = "EIO - Unspecified Error"
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 8/9: EPERM - FAILED")
|
|
self.failedtests["EPERM"] = ex.errno
|
|
finally:
|
|
try:
|
|
motor.stop()
|
|
except Exception:
|
|
pass
|
|
|
|
def testetimedout(self):
|
|
# Triggered by FakeUART (or real UART) timing out on read
|
|
print("Starting Test 9/9: ETIMEDOUT - Timed Out Error")
|
|
uart = UARTDevice(Port.A, baudrate=9600, timeout=1000)
|
|
try:
|
|
data = uart.read(10)
|
|
print("No error raised.\nCompleted Test 9/9: ETIMEDOUT - FAILED")
|
|
self.failedtests["ETIMEDOUT"] = "No error raised"
|
|
except OSError as ex:
|
|
if ex.errno == ETIMEDOUT:
|
|
print("Timed out with synthetic UART device. ETIMEDOUT can be thrown and caught.\nCompleted Test 9/9: ETIMEDOUT - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
elif ex.errno == EIO:
|
|
print("An unspecified error occurred (EIO).\nCompleted Test 9/9: ETIMEDOUT - FAILED")
|
|
self.failedtests["ETIMEDOUT"] = "EIO - Unspecified Error"
|
|
else:
|
|
print(f"Another error occurred with code: {ex.errno}.\nCompleted Test 9/9: ETIMEDOUT - FAILED")
|
|
self.failedtests["ETIMEDOUT"] = ex.errno
|
|
|
|
def print_results(self):
|
|
print(f"\n=== Results: {self.successfultests}/9 tests passed ===")
|
|
if self.failedtests:
|
|
print("Failed tests:")
|
|
for key, value in self.failedtests.items():
|
|
print(f" {key}: {value}")
|
|
class UIOTest(OSDiagnostics):
|
|
def __init__(self, hub, motorclass):
|
|
self.hub = hub
|
|
self.motorclass = motorclass
|
|
self.successfultests = 0
|
|
self.failedtests = {}
|
|
# uio contains BytesIO, StringIO, and FileIO, but due to SPIKE Prime's lack of a filesystem, the test will omit FileIO and only include BytesIO and StringIO
|
|
def testbytesio(self):
|
|
try:
|
|
buffer = io.BytesIO()
|
|
|
|
buffer.write(b'Hello, ')
|
|
buffer.write(b'Pybricks byte stream!')
|
|
|
|
current_content = buffer.getvalue()
|
|
print(f"Buffer content (via getvalue()): {current_content}")
|
|
|
|
print(f"Current cursor position: {buffer.tell()}")
|
|
# TODO: After testing that BytesIO actually works on this system, add checks to make sure that the outputs match what they should.
|
|
buffer.seek(0)
|
|
|
|
read_data = buffer.read()
|
|
print(f"Read data (via read()): {read_data}")
|
|
|
|
print(f"Current cursor position after reading: {buffer.tell()}")
|
|
|
|
buffer.close()
|
|
print("Buffer was closed successfully.")
|
|
print("Completed Test 1/2: BytesIO - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
except Exception as ex:
|
|
print("An unexpected error occured.")
|
|
print("Completed Test 1/2: BytesIO - FAILED")
|
|
self.failedtests["BytesIO"] = ex.errno
|
|
def teststringio(self):
|
|
try:
|
|
buffer = io.StringIO()
|
|
|
|
buffer.write('Hello, ')
|
|
buffer.write('Pybricks string stream!')
|
|
|
|
current_content = buffer.getvalue()
|
|
print(f"Buffer content (via getvalue()): {current_content}")
|
|
|
|
print(f"Current cursor position: {buffer.tell()}")
|
|
# TODO: After testing that StringIO actually works on this system, add checks to make sure that the outputs match what they should.
|
|
buffer.seek(0)
|
|
|
|
read_data = buffer.read()
|
|
print(f"Read data (via read()): {read_data}")
|
|
|
|
print(f"Current cursor position after reading: {buffer.tell()}")
|
|
|
|
buffer.close()
|
|
print("Buffer was closed successfully.")
|
|
print("Completed Test 2/2: StringIO - SUCCESSFUL")
|
|
self.successfultests += 1
|
|
except Exception as ex:
|
|
print("An unexpected error occured.")
|
|
print("Completed Test 2/2: StringIO - FAILED")
|
|
self.failedtests["StringIO"] = ex.errno
|
|
|
|
def print_results(self):
|
|
self.testbytesio()
|
|
self.teststringio()
|
|
print(f"\n=== Results: {self.successfultests}/2 tests passed ===")
|
|
if self.failedtests:
|
|
print("Failed tests:")
|
|
for key, value in self.failedtests.items():
|
|
print(f" {key}: {value}") |