Update diagnostics/os_diagnostics.py
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
from pybricks.parameters import Port
|
from pybricks.parameters import Port
|
||||||
from uerrno import EAGAIN, EBUSY, ECANCELED, EINVAL, EIO, ENODEV, EOPNOTSUPP, EPERM, ETIMEDOUT
|
from uerrno import EAGAIN, EBUSY, ECANCELED, EINVAL, EIO, ENODEV, EOPNOTSUPP, EPERM, ETIMEDOUT
|
||||||
import uio
|
import uio
|
||||||
|
import ujson
|
||||||
from pybricks.iodevices import UARTDevice as _UARTDevice
|
from pybricks.iodevices import UARTDevice as _UARTDevice
|
||||||
from pybricks.tools import wait, multitask, run_task
|
from pybricks.tools import wait, multitask, run_task
|
||||||
|
|
||||||
@@ -65,6 +66,11 @@ class OSDiagnostics:
|
|||||||
uiotestobject.print_results()
|
uiotestobject.print_results()
|
||||||
self.successfultests += uiotestobject.successfultests
|
self.successfultests += uiotestobject.successfultests
|
||||||
self.failedtests.update(uiotestobject.failedtests)
|
self.failedtests.update(uiotestobject.failedtests)
|
||||||
|
def testUJSON(self):
|
||||||
|
ujsontestobject = UJSONTest(self.hub, self.motorclass)
|
||||||
|
ujsontestobject.print_results()
|
||||||
|
self.successfultests += ujsontestobject.successfultests
|
||||||
|
self.failedtests.update(ujsontestobject.failedtests)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -336,3 +342,80 @@ class UIOTest(OSDiagnostics):
|
|||||||
print("Failed tests:")
|
print("Failed tests:")
|
||||||
for key, value in self.failedtests.items():
|
for key, value in self.failedtests.items():
|
||||||
print(f" {key}: {value}")
|
print(f" {key}: {value}")
|
||||||
|
class UJSONTest:
|
||||||
|
def __init__(self, hub, motorclass):
|
||||||
|
self.hub = hub
|
||||||
|
self.motorclass = motorclass
|
||||||
|
self.successfultests = 0
|
||||||
|
self.failedtests = {}
|
||||||
|
|
||||||
|
# Tests ujson.dumps() and ujson.loads()
|
||||||
|
def testdumpsloads(self):
|
||||||
|
try:
|
||||||
|
original = {"robot": "Pybricks", "speed": 500, "active": True}
|
||||||
|
|
||||||
|
json_str = ujson.dumps(original)
|
||||||
|
print(f"Serialized (via dumps()): {json_str}")
|
||||||
|
|
||||||
|
restored = ujson.loads(json_str)
|
||||||
|
print(f"Deserialized (via loads()): {restored}")
|
||||||
|
|
||||||
|
# TODO: After confirming dumps/loads works on this system, add equality tests
|
||||||
|
|
||||||
|
print("Completed Test 1/3: dumps/loads - SUCCESSFUL")
|
||||||
|
self.successfultests += 1
|
||||||
|
except Exception as ex:
|
||||||
|
print("An unexpected error occurred.")
|
||||||
|
print("Completed Test 1/3: dumps/loads - FAILED")
|
||||||
|
self.failedtests["dumps/loads"] = getattr(ex, "errno", str(ex))
|
||||||
|
|
||||||
|
# Tests ujson.loads() raising ValueError on malformed input
|
||||||
|
def testloadsinvalid(self):
|
||||||
|
try:
|
||||||
|
ujson.loads("{not valid json}")
|
||||||
|
print("No error raised.")
|
||||||
|
print("Completed Test 2/3: loads invalid - FAILED")
|
||||||
|
self.failedtests["loads_invalid"] = "No error raised"
|
||||||
|
except ValueError:
|
||||||
|
print("ValueError raised on malformed JSON, as expected.")
|
||||||
|
print("Completed Test 2/3: loads invalid - SUCCESSFUL")
|
||||||
|
self.successfultests += 1
|
||||||
|
except Exception as ex:
|
||||||
|
print("An unexpected error occurred.")
|
||||||
|
print("Completed Test 2/3: loads invalid - FAILED")
|
||||||
|
self.failedtests["loads_invalid"] = getattr(ex, "errno", str(ex))
|
||||||
|
|
||||||
|
# Tests ujson.dump() and ujson.load() using a uio.StringIO stream
|
||||||
|
def testdumpload(self):
|
||||||
|
try:
|
||||||
|
original = {"hub": "SPIKE Prime", "port": "A", "value": 42}
|
||||||
|
|
||||||
|
stream = uio.StringIO()
|
||||||
|
ujson.dump(original, stream)
|
||||||
|
print(f"Serialized to stream (via dump()): {stream.getvalue()}")
|
||||||
|
|
||||||
|
stream.seek(0)
|
||||||
|
restored = ujson.load(stream)
|
||||||
|
print(f"Deserialized from stream (via load()): {restored}")
|
||||||
|
|
||||||
|
stream.close()
|
||||||
|
|
||||||
|
# TODO: After confirming dump/load works on this system, add
|
||||||
|
# equality assertions to verify round-trip fidelity.
|
||||||
|
|
||||||
|
print("Completed Test 3/3: dump/load (stream) - SUCCESSFUL")
|
||||||
|
self.successfultests += 1
|
||||||
|
except Exception as ex:
|
||||||
|
print("An unexpected error occurred.")
|
||||||
|
print("Completed Test 3/3: dump/load (stream) - FAILED")
|
||||||
|
self.failedtests["dump/load"] = getattr(ex, "errno", str(ex))
|
||||||
|
|
||||||
|
def print_results(self):
|
||||||
|
self.testdumpsloads()
|
||||||
|
self.testloadsinvalid()
|
||||||
|
self.testdumpload()
|
||||||
|
print(f"\n=== Results: {self.successfultests}/3 tests passed ===")
|
||||||
|
if self.failedtests:
|
||||||
|
print("Failed tests:")
|
||||||
|
for key, value in self.failedtests.items():
|
||||||
|
print(f" {key}: {value}")
|
||||||
Reference in New Issue
Block a user