From 98c42c2b9dedf36f658b4b77d5c58e167cd59747 Mon Sep 17 00:00:00 2001 From: Colin Date: Mon, 10 Mar 2025 07:48:24 +0000 Subject: [PATCH] decode Saturn 4 status responses --- cassini.py | 19 +++++---- saturn_printer.py | 99 ++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 103 insertions(+), 15 deletions(-) diff --git a/cassini.py b/cassini.py index e1baffc..bf60d86 100755 --- a/cassini.py +++ b/cassini.py @@ -53,17 +53,16 @@ async def create_http_server(): def do_status(printers): for i, p in enumerate(printers): - attrs = p.desc['Data']['Attributes'] - status = p.desc['Data']['Status'] - print_info = status['PrintInfo'] - file_info = status['FileTransferInfo'] + name = p.desc.name + machine_name = p.desc.machine_name + current_status = p.desc.current_status + print_info = p.desc.print_info + file_transfer_info = p.desc.file_transfer_info print(f"{p.addr[0]}:") - print(f" {attrs['Name']} ({attrs['MachineName']})") - print(f" Machine Status: {CurrentStatus(status['CurrentStatus']).name}") - print(f" Print Status: {PrintInfoStatus(print_info['Status']).name}") - print(f" Layers: {print_info['CurrentLayer']}/{print_info['TotalLayer']}") - print(f" File: {print_info['Filename']}") - print(f" File Transfer Status: {FileStatus(file_info['Status']).name}") + print(f" {name} ({machine_name})") + print(f" Machine Status: {current_status}") + print(f" Print Info: {print_info}") + print(f" File Transfer Info: {file_transfer_info}") def do_status_full(printers): for i, p in enumerate(printers): diff --git a/saturn_printer.py b/saturn_printer.py index abdbce7..b4e188b 100644 --- a/saturn_printer.py +++ b/saturn_printer.py @@ -50,6 +50,94 @@ class Command(Enum): UPLOAD_FILE = 256 # "Check": 0, "CleanCache": 1, "Compress": 0, "FileSize": 3541068, "Filename": "_ResinXP2-ValidationMatrix_v2.goo", "MD5": "205abc8fab0762ad2b0ee1f6b63b1750", "URL": "http://${ipaddr}:58883/f60c0718c8144b0db48b7149d4d85390.goo" }, SET_MYSTERY_TIME_PERIOD = 512 # "TimePeriod": 5000 +class PrintInfo(dict): + def __repr__(self) -> str: + return f"PrintInfo(\ + status={self.status},\ + current_layer={self.current_layer}, total_layer={self.total_layer},\ + filename={self.filename},\ +)" + + @property + def current_layer(self) -> int: + return self['CurrentLayer'] + + @property + def total_layer(self) -> int: + return self['TotalLayer'] + + @property + def filename(self) -> str: + return self['Filename'] + + @property + def status(self) -> PrintInfoStatus: + return PrintInfoStatus(self['Status']) + +class FileTransferInfo(dict): + def __repr__(self) -> str: + return f"FileTransferInfo(status={self.status})" + + @property + def status(self) -> FileStatus: + return FileStatus(self['Status']) + +class Desc(dict): + @property + def current_status(self) -> int | None: + try: + # Saturn 3 + return self['Data']['Status']['CurrentStatus'] + except KeyError: + # Saturn 4 + return self['Data'].get('CurrentStatus') + + @property + def id_(self) -> dict: + try: + # Saturn 3 + return self['Data']['Attributes']['MainboardID'] + except KeyError: + # Saturn 4 + return self['Data']['MainboardID'] + + @property + def machine_name(self) -> str: + try: + # Saturn 3 + return self['Data']['Attributes']['MachineName'] + except KeyError: + # Saturn 4 + return self['Data']['MachineName'] + + @property + def name(self) -> str: + try: + # Saturn 3 + return self['Data']['Attributes']['Name'] + except KeyError: + # Saturn 4 + return self['Data']['Name'] + + @property + def print_info(self) -> PrintInfo | None: + try: + # Saturn 3 + return PrintInfo(self['Data']['Status']['PrintInfo']) + except KeyError: + # Saturn 4 + return None + + @property + def file_transfer_info(self) -> FileTransferInfo | None: + try: + # Saturn 3 + return FileTransferInfo(self['Data']['Status']['FileTransferInfo']) + except KeyError: + return None + + + def random_hexstr(): return '%032x' % random.getrandbits(128) @@ -114,12 +202,13 @@ class SaturnPrinter: logging.debug(f"received: {pdata}") self.set_desc(pdata) - def set_desc(self, desc): + def set_desc(self, desc: Desc | dict): + desc = Desc(desc) self.desc = desc - self.id = desc['Data']['Attributes']['MainboardID'] - self.name = desc['Data']['Attributes']['Name'] - self.machine_name = desc['Data']['Attributes']['MachineName'] - self.current_status = desc['Data']['Status']['CurrentStatus'] + self.id = desc.id_ + self.name = desc.name + self.machine_name = desc.machine_name + self.current_status = desc.current_status or CurrentStatus.READY.value self.busy = self.current_status > 0 # Tell this printer to connect to the specified mqtt and http