# Source file: pynamics/diagnostics/os_diagnostics.py
# (listing metadata: 440 lines, 19 KiB, Python)

from pybricks.parameters import Port
from uerrno import EAGAIN, EBUSY, ECANCELED, EINVAL, EIO, ENODEV, EOPNOTSUPP, EPERM, ETIMEDOUT
import uio
import ujson
from pybricks.iodevices import UARTDevice as _UARTDevice
from pybricks.tools import wait, multitask, run_task
class FakeUART:
    """Software stand-in used when no physical UART can be opened.

    It never transfers data. ``read`` either raises an error injected with
    ``set_error`` or behaves like a read that never receives anything, and
    ``write`` discards its argument unless an injected error is pending.
    """

    def __init__(self, port, baudrate, timeout):
        # ``port`` and ``baudrate`` are accepted so the signature lines up with
        # the call made by the UARTDevice factory; only ``timeout`` is kept.
        self._force_error = None
        self.timeout = timeout
        print("Warning: No physical UART detected. Using simulator.")

    def set_error(self, errno):
        """Arm ``errno`` so the next ``read`` or ``write`` raises ``OSError(errno)``."""
        self._force_error = errno

    def read(self, length=1):
        """Simulate a read that never yields data.

        Raises the injected error (one-shot) if one is armed. Otherwise, with
        a timeout set, calls ``wait(timeout)`` and raises ``OSError(ETIMEDOUT)``;
        with ``timeout`` of ``None`` it loops on ``wait(1000)`` and never returns.
        ``length`` is ignored.
        """
        pending = self._force_error
        if pending is not None:
            # Injected errors fire once, then the simulator returns to normal.
            self._force_error = None
            raise OSError(pending)

        if self.timeout is None:
            # No timeout configured: block indefinitely.
            while True:
                wait(1000)

        wait(self.timeout)
        raise OSError(ETIMEDOUT)

    def write(self, data):
        """Discard ``data``; raise the injected error (one-shot) if one is armed."""
        pending, self._force_error = self._force_error, None
        if pending is not None:
            raise OSError(pending)
def UARTDevice(port, baudrate=9600, timeout=None):
    """Return a UART for ``port``, falling back to the simulator.

    The real device class (imported as ``_UARTDevice``) is tried first. If
    constructing it raises ``OSError``, a ``FakeUART`` built from the same
    arguments is returned instead.
    """
    try:
        device = _UARTDevice(port, baudrate, timeout)
    except OSError:
        device = FakeUART(port, baudrate, timeout)
    return device
class OSDiagnostics:
    """Runs the per-module diagnostic suites and aggregates their outcomes.

    ``successfultests`` is the running count of passed checks across every
    suite that has been run; ``failedtests`` merges each suite's
    name-to-reason mapping.
    """

    def __init__(self, hub, motorclass):
        self.hub, self.motorclass = hub, motorclass
        self.successfultests = 0
        self.failedtests = {}

    def _absorb(self, suite):
        """Print ``suite``'s summary, then fold its results into this runner."""
        suite.print_results()
        self.successfultests += suite.successfultests
        self.failedtests.update(suite.failedtests)

    def testUerrno(self):
        """Run the nine uerrno checks in order and collect their results."""
        suite = UerrnoTest(self.hub, self.motorclass)
        steps = (
            suite.testeagain,
            suite.testebusy,
            suite.testecanceled,
            suite.testeinval,
            suite.testeio,
            suite.testenodev,
            suite.testeopnotsupp,
            suite.testeperm,
            suite.testetimedout,
        )
        for step in steps:
            step()
        self._absorb(suite)

    def testUIO(self):
        """Run the uio suite (its print_results executes the checks)."""
        self._absorb(UIOTest(self.hub, self.motorclass))

    def testUJSON(self):
        """Run the ujson suite (its print_results executes the checks)."""
        self._absorb(UJSONTest(self.hub, self.motorclass))

    def testUMath(self):
        """Run the umath suite and collect its results."""
        self._absorb(UMathTest(self.hub, self.motorclass))
class UerrnoTest:
    """Checks that each ``uerrno`` code can be raised as ``OSError`` and caught.

    Each ``teste*`` method runs one numbered check (1-9), prints its progress
    and records the outcome: ``successfultests`` counts passes, and
    ``failedtests`` maps the error-code name to the reason the check failed.
    A check never propagates an ordinary exception; anything unexpected is
    recorded as a failure so the remaining checks still run.
    """

    def __init__(self, hub, motorclass):
        self.hub = hub
        self.motorclass = motorclass
        self.successfultests = 0
        self.failedtests = {}

    # ------------------------------------------------------------------
    # Shared reporting helpers
    # ------------------------------------------------------------------
    def _record_no_error(self, number, name):
        """Record a failure because the action finished without raising."""
        print(f"No error raised.\nCompleted Test {number}/9: {name} - FAILED")
        self.failedtests[name] = "No error raised"

    def _record_oserror(self, number, name, expected, ex, success_prefix=""):
        """Classify a caught ``OSError`` against the ``expected`` errno.

        A match counts as a pass. ``EIO`` is reported separately as an
        unspecified error; any other code is stored as-is in ``failedtests``.
        ``success_prefix`` is extra text printed before the pass message.
        """
        if ex.errno == expected:
            print(f"{success_prefix}{name} can be thrown and caught.\nCompleted Test {number}/9: {name} - SUCCESSFUL")
            self.successfultests += 1
        elif ex.errno == EIO:
            print(f"An unspecified error occurred (EIO).\nCompleted Test {number}/9: {name} - FAILED")
            self.failedtests[name] = "EIO - Unspecified Error"
        else:
            print(f"Another error occurred with code: {ex.errno}.\nCompleted Test {number}/9: {name} - FAILED")
            self.failedtests[name] = ex.errno

    def _record_unexpected(self, number, name, ex):
        """Record a failure caused by an exception that is not an ``OSError``."""
        reason = f"{type(ex).__name__}: {ex}"
        print(f"An unexpected exception occurred ({reason}).\nCompleted Test {number}/9: {name} - FAILED")
        self.failedtests[name] = reason

    def _forced_uart_error(self, number, name, title, code):
        """Inject ``code`` into a simulated UART and check that a read raises it.

        ``FakeUART`` is constructed directly because only the simulator has
        ``set_error``; going through the ``UARTDevice`` factory could hand
        back a real device without that method.
        """
        print(f"Starting Test {number}/9: {name} - {title}")
        uart = FakeUART(Port.A, baudrate=9600, timeout=1000)
        uart.set_error(code)
        try:
            uart.read(1)
        except OSError as ex:
            self._record_oserror(number, name, code, ex)
        except Exception as ex:
            self._record_unexpected(number, name, ex)
        else:
            self._record_no_error(number, name)

    # ------------------------------------------------------------------
    # Individual checks
    # ------------------------------------------------------------------
    def testeagain(self):
        """Test 1/9: EAGAIN, attempted via a multitask() nested in a multitask()."""
        print("Starting Test 1/9: EAGAIN - Try Again Error")
        try:
            async def inner():
                await multitask(wait(100))  # nested multitask is expected to raise EAGAIN

            async def outer():
                await multitask(inner())

            run_task(outer())
        except OSError as ex:
            self._record_oserror(1, "EAGAIN", EAGAIN, ex)
        except Exception as ex:
            self._record_unexpected(1, "EAGAIN", ex)
        else:
            self._record_no_error(1, "EAGAIN")

    def testebusy(self):
        """Test 2/9: EBUSY. No reliable hardware trigger, so the simulator is used."""
        self._forced_uart_error(2, "EBUSY", "Device Busy Error", EBUSY)

    def testecanceled(self):
        """Test 3/9: ECANCELED. No reliable hardware trigger, so the simulator is used."""
        self._forced_uart_error(3, "ECANCELED", "Operation Canceled Error", ECANCELED)

    def testeinval(self):
        """Test 4/9: EINVAL, attempted by passing huge values to motor.control.limits()."""
        print("Starting Test 4/9: EINVAL - Invalid Argument Error")
        input("Plug a motor into Port A, then press Enter.")
        try:
            motor = self.motorclass(Port.A)
            motor.control.limits(speed=9999999, acceleration=9999999)
        except OSError as ex:
            self._record_oserror(4, "EINVAL", EINVAL, ex)
        except Exception as ex:
            # e.g. the firmware may reject the argument with a non-OSError type.
            self._record_unexpected(4, "EINVAL", ex)
        else:
            self._record_no_error(4, "EINVAL")

    def testeio(self):
        """Test 5/9: EIO. A real trigger needs a physical unplug, so the simulator is used."""
        self._forced_uart_error(5, "EIO", "I/O Error", EIO)

    def testenodev(self):
        """Test 6/9: ENODEV, triggered by initializing a motor on an empty port."""
        print("Starting Test 6/9: ENODEV - Device Not Found Error")
        input("Make sure Port A doesn't have anything plugged in, then press Enter.")
        try:
            self.motorclass(Port.A)
        except OSError as ex:
            self._record_oserror(6, "ENODEV", ENODEV, ex, "There is no motor on this port. ")
        except Exception as ex:
            self._record_unexpected(6, "ENODEV", ex)
        else:
            print("OS detected a motor when there was nothing. You may have allowed missing motors. This is useful for debugging, but not recommended for production as it can cause issues with device control.\nCompleted Test 6/9: ENODEV - FAILED")
            self.failedtests["ENODEV"] = "No error raised"

    def testeopnotsupp(self):
        """Test 7/9: EOPNOTSUPP. Needs specific hardware otherwise, so the simulator is used."""
        self._forced_uart_error(7, "EOPNOTSUPP", "Operation Not Supported Error", EOPNOTSUPP)

    def testeperm(self):
        """Test 8/9: EPERM, attempted by changing control limits while the motor runs."""
        print("Starting Test 8/9: EPERM - Operation Not Permitted Error")
        input("Plug a motor into Port A, then press Enter.")
        motor = None
        try:
            motor = self.motorclass(Port.A)
            motor.run(500)
            wait(50)
            motor.control.limits(speed=500, acceleration=1000)
        except OSError as ex:
            self._record_oserror(8, "EPERM", EPERM, ex)
        except Exception as ex:
            self._record_unexpected(8, "EPERM", ex)
        else:
            self._record_no_error(8, "EPERM")
        finally:
            # Only stop a motor that was actually created.
            if motor is not None:
                try:
                    motor.stop()
                except Exception:
                    # Best-effort cleanup: the outcome is already recorded.
                    pass

    def testetimedout(self):
        """Test 9/9: ETIMEDOUT, triggered by a UART (real or simulated) read timing out."""
        print("Starting Test 9/9: ETIMEDOUT - Timed Out Error")
        uart = UARTDevice(Port.A, baudrate=9600, timeout=1000)
        try:
            uart.read(10)
        except OSError as ex:
            self._record_oserror(9, "ETIMEDOUT", ETIMEDOUT, ex, "Timed out with synthetic UART device. ")
        except Exception as ex:
            self._record_unexpected(9, "ETIMEDOUT", ex)
        else:
            self._record_no_error(9, "ETIMEDOUT")

    def print_results(self):
        """Print the pass count out of nine and list every recorded failure."""
        print(f"\n=== Results: {self.successfultests}/9 tests passed ===")
        if self.failedtests:
            print("Failed tests:")
            for key, value in self.failedtests.items():
                print(f" {key}: {value}")
class UIOTest:
    """Diagnostics for the ``uio`` in-memory streams (BytesIO and StringIO).

    ``successfultests`` counts passed checks; ``failedtests`` maps the stream
    name to the errno of the exception that failed it, or to the exception
    text when it carries no errno.

    This is a plain class like the other suites. It previously subclassed
    OSDiagnostics while overriding ``__init__`` entirely, which only served to
    expose the runner's methods on the suite.
    """

    def __init__(self, hub, motorclass):
        self.hub = hub
        self.motorclass = motorclass
        self.successfultests = 0
        self.failedtests = {}

    # uio contains BytesIO, StringIO, and FileIO, but due to SPIKE Prime's lack
    # of a filesystem, the test omits FileIO and only covers BytesIO and StringIO.
    def testbytesio(self):
        """Test 1/2: write, inspect, rewind, read back and close a BytesIO."""
        try:
            # The file imports ``uio`` (there is no ``io`` name in scope).
            buffer = uio.BytesIO()
            buffer.write(b'Hello, ')
            buffer.write(b'Pybricks byte stream!')
            current_content = buffer.getvalue()
            print(f"Buffer content (via getvalue()): {current_content}")
            print(f"Current cursor position: {buffer.tell()}")
            # TODO: After testing that BytesIO actually works on this system, add checks to make sure that the outputs match what they should.
            buffer.seek(0)
            read_data = buffer.read()
            print(f"Read data (via read()): {read_data}")
            print(f"Current cursor position after reading: {buffer.tell()}")
            buffer.close()
            print("Buffer was closed successfully.")
            print("Completed Test 1/2: BytesIO - SUCCESSFUL")
            self.successfultests += 1
        except Exception as ex:
            print("An unexpected error occured.")
            print("Completed Test 1/2: BytesIO - FAILED")
            # Not every exception has ``errno``; fall back to its text.
            self.failedtests["BytesIO"] = getattr(ex, "errno", str(ex))

    def teststringio(self):
        """Test 2/2: write, inspect, rewind, read back and close a StringIO."""
        try:
            buffer = uio.StringIO()
            buffer.write('Hello, ')
            buffer.write('Pybricks string stream!')
            current_content = buffer.getvalue()
            print(f"Buffer content (via getvalue()): {current_content}")
            print(f"Current cursor position: {buffer.tell()}")
            # TODO: After testing that StringIO actually works on this system, add checks to make sure that the outputs match what they should.
            buffer.seek(0)
            read_data = buffer.read()
            print(f"Read data (via read()): {read_data}")
            print(f"Current cursor position after reading: {buffer.tell()}")
            buffer.close()
            print("Buffer was closed successfully.")
            print("Completed Test 2/2: StringIO - SUCCESSFUL")
            self.successfultests += 1
        except Exception as ex:
            print("An unexpected error occured.")
            print("Completed Test 2/2: StringIO - FAILED")
            # Not every exception has ``errno``; fall back to its text.
            self.failedtests["StringIO"] = getattr(ex, "errno", str(ex))

    def print_results(self):
        """Run both stream checks, then print the pass count and any failures."""
        self.testbytesio()
        self.teststringio()
        print(f"\n=== Results: {self.successfultests}/2 tests passed ===")
        if self.failedtests:
            print("Failed tests:")
            for key, value in self.failedtests.items():
                print(f" {key}: {value}")
class UJSONTest:
    """Diagnostics for ``ujson``: string round-trip, malformed input, stream round-trip.

    ``successfultests`` counts passed checks; ``failedtests`` maps a check key
    to the failing exception's errno, or to its text when it has no errno.
    """

    def __init__(self, hub, motorclass):
        self.hub, self.motorclass = hub, motorclass
        self.successfultests = 0
        self.failedtests = {}

    def _failed(self, key, banner, ex):
        """Print the failure ``banner`` and store the reason for ``ex`` under ``key``."""
        print("An unexpected error occurred.")
        print(banner)
        self.failedtests[key] = getattr(ex, "errno", str(ex))

    def testdumpsloads(self):
        """Test 1/3: serialize a dict with dumps() and parse it back with loads()."""
        try:
            payload = {"robot": "Pybricks", "speed": 500, "active": True}
            encoded = ujson.dumps(payload)
            print(f"Serialized (via dumps()): {encoded}")
            decoded = ujson.loads(encoded)
            print(f"Deserialized (via loads()): {decoded}")
            # TODO: After confirming dumps/loads works on this system, add equality tests
        except Exception as ex:
            self._failed("dumps/loads", "Completed Test 1/3: dumps/loads - FAILED", ex)
        else:
            print("Completed Test 1/3: dumps/loads - SUCCESSFUL")
            self.successfultests += 1

    def testloadsinvalid(self):
        """Test 2/3: loads() must raise ValueError on malformed input."""
        try:
            ujson.loads("{not valid json}")
        except ValueError:
            print("ValueError raised on malformed JSON, as expected.")
            print("Completed Test 2/3: loads invalid - SUCCESSFUL")
            self.successfultests += 1
        except Exception as ex:
            self._failed("loads_invalid", "Completed Test 2/3: loads invalid - FAILED", ex)
        else:
            # Parsing succeeded, which means the malformed text was accepted.
            print("No error raised.")
            print("Completed Test 2/3: loads invalid - FAILED")
            self.failedtests["loads_invalid"] = "No error raised"

    def testdumpload(self):
        """Test 3/3: dump() to a uio.StringIO stream, rewind, and load() it back."""
        try:
            payload = {"hub": "SPIKE Prime", "port": "A", "value": 42}
            sink = uio.StringIO()
            ujson.dump(payload, sink)
            print(f"Serialized to stream (via dump()): {sink.getvalue()}")
            sink.seek(0)
            decoded = ujson.load(sink)
            print(f"Deserialized from stream (via load()): {decoded}")
            sink.close()
            # TODO: After confirming dump/load works on this system, add
            # equality assertions to verify round-trip fidelity.
        except Exception as ex:
            self._failed("dump/load", "Completed Test 3/3: dump/load (stream) - FAILED", ex)
        else:
            print("Completed Test 3/3: dump/load (stream) - SUCCESSFUL")
            self.successfultests += 1

    def print_results(self):
        """Run the three checks in order, then print the pass count and any failures."""
        for check in (self.testdumpsloads, self.testloadsinvalid, self.testdumpload):
            check()
        print(f"\n=== Results: {self.successfultests}/3 tests passed ===")
        if not self.failedtests:
            return
        print("Failed tests:")
        for key, value in self.failedtests.items():
            print(f" {key}: {value}")
class UMathTest:
    """Placeholder suite for umath diagnostics; no checks are implemented yet.

    ``successfultests`` and ``failedtests`` follow the same convention as the
    other suites so the runner can merge them.
    """

    def __init__(self, hub, motorclass):
        self.hub = hub
        self.motorclass = motorclass
        self.successfultests = 0
        self.failedtests = {}

    def test_math(self):
        """Run the umath checks (none yet, so this does nothing and returns None)."""
        # TODO: implement the umath checks and update the "/0" total in
        # print_results to match the number of checks added.
        return None

    def print_results(self):
        """Run the checks like the sibling suites do, then print the summary."""
        self.test_math()
        print(f"\n=== Results: {self.successfultests}/0 tests passed ===")
        if self.failedtests:
            print("Failed tests:")
            for key, value in self.failedtests.items():
                print(f" {key}: {value}")