commit 58be4707fda89cb27bc1ffe2384cb878934e1d36 Author: Vladimir Vukicevic Date: Sat Sep 2 19:31:03 2023 -0700 WIP diff --git a/cassini.py b/cassini.py new file mode 100755 index 0000000..f42e6e3 --- /dev/null +++ b/cassini.py @@ -0,0 +1,158 @@ +#!env python3 +# -*- coding: utf-8 -*- +import sys +import socket +import struct +import time +import json +import asyncio +import logging +import random +from simple_mqtt_server import SimpleMQTTServer + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s", + datefmt="%H:%M:%S", +) + +SATURN_BROADCAST_PORT = 3000 + +SATURN_STATUS_COMPLETE = 16 # ?? + +SATURN_CMD_0 = 0 # null data +SATURN_CMD_1 = 1 # null data +SATURN_CMD_SET_REPORT_TIME_PERIOD = 512 +SATURN_CMD_START_PRINTING = 128 # "Filename": "X", "StartLayer": 0 +SATURN_CMD_UPLOAD_FILE = 256 # "Check": 0, "CleanCache": 1, "Compress": 0, "FileSize": 3541068, "Filename": "_ResinXP2-ValidationMatrix_v2.goo", "MD5": "205abc8fab0762ad2b0ee1f6b63b1750", "URL": "http://${ipaddr}:58883/f60c0718c8144b0db48b7149d4d85390.goo" }, +SATURN_CMD_DISCONNECT = 64 # Maybe disconnect? + +PRINTERS = [] + +PRINTER_SEARCH_TIMEOUT = 1 + +def handle_exception(loop, context): + msg = context.get("exception", context["message"]) + name = context.get("future").get_coro().__name__ + logging.error(f"Caught exception from {name}: {msg}") + +def find_printers_on(iface): + #netaddr = netifaces.ifaddresses(iface) + #if not netaddr.has_key(netifaces.AF_INET): + # print('No IPv4 address found for interface {}'.format(iface)) + # sys.exit(1) + #broadcast = netaddr[netifaces.AF_INET][0]['broadcast'] + + # create UDP socket and send broadcast + printers = [] + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + with sock: + sock.settimeout(PRINTER_SEARCH_TIMEOUT) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.sendto(b'M99999', ('', SATURN_BROADCAST_PORT)) + + now = time.time() + while True: + if time.time() - now > PRINTER_SEARCH_TIMEOUT: + break + try: + data, addr = sock.recvfrom(1024) + except socket.timeout: + continue + else: + print(f'Found printer at {addr}') + pdata = json.loads(data.decode('utf-8')) + pdata['addr'] = addr + printers.append(pdata) + return printers + +def connect_printer(printer, srvport): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + with sock: + sock.sendto(b'M66666 ' + str(srvport).encode('utf-8'), printer['addr']) + +async def create_server(): + mqtt = SimpleMQTTServer('0.0.0.0', 0) + await mqtt.start() + logging.info(f"Server created, port {mqtt.port}") + return mqtt + +async def serve_forever(mqtt): + loop = asyncio.get_running_loop() + loop.set_exception_handler(handle_exception) + await mqtt.serve_forever() + +async def find_printers(): + printers = find_printers_on('en0') + if len(printers) == 0: + print('No printers found') + sys.exit(1) + for i, p in enumerate(printers): + attrs = p['Data']['Attributes'] + #print('{}: {} ({})'.format(i, attrs['Name'], attrs['MachineName'])) + return printers + +async def printer_setup(mqtt_port): + PRINTERS = await find_printers() + printer = PRINTERS[0] + connect_printer(printer, mqtt_port) + return printer + +def print_printer_status(printers): + for i, p in enumerate(printers): + attrs = p['Data']['Attributes'] + status = p['Data']['Status'] + printInfo = status['PrintInfo'] + print(f"{i}: {attrs['Name']} ({attrs['MachineName']})") + print(f" Status: {printInfo['Status']} Layers: {printInfo['CurrentLayer']}/{printInfo['TotalLayer']}") + +def send_printer_command(mqtt, printer, cmdid, data=None): + # generate 16-byte random identifier as a hex string + hexstr = '%032x' % random.getrandbits(128) + timestamp = int(time.time() * 1000) + mainboard = printer['Data']['Attributes']['MainboardID'] + cmd_data = { + "Data": { + "Cmd": cmdid, + "Data": json.dumps(data) if data is not None else "null", + "From": 0, + "MainboardID": mainboard, + "RequestID": hexstr, + "TimeStamp": timestamp + }, + "Id": printer['Id'] + } + print(json.dumps(cmd_data)) + mqtt.outgoing_messages.put_nowait({'topic': '/sdcp/request/' + mainboard, 'payload': json.dumps(cmd_data)}) + +async def main(): + cmd = None + if len(sys.argv) > 1: + cmd = sys.argv[1] + if cmd == 'status': + printers = await find_printers() + print_printer_status(printers) + else: + mqtt = await create_server() + server_task = asyncio.create_task(serve_forever(mqtt)) + printer = await printer_setup(mqtt.port) + await asyncio.sleep(3) + send_printer_command(mqtt, printer, 0) + await asyncio.sleep(1) + send_printer_command(mqtt, printer, 1) + await asyncio.sleep(1) + send_printer_command(mqtt, printer, 512, { "TimePeriod": 5000 }) + + #printer_task = asyncio.create_task(printer_setup(mqtt.port)) + await asyncio.sleep(1000) + #while True: + # if server_task is not None and server_task.done(): + # print("Server task done") + # print(server_task.exception()) + # server_task = None + # if printer_task is not None and printer_task.done(): + # print("Printer task done") + # print(printer_task.exception()) + # printer_task = None + +asyncio.run(main()) \ No newline at end of file diff --git a/simple_mqtt_server.py b/simple_mqtt_server.py new file mode 100644 index 0000000..0041cf7 --- /dev/null +++ b/simple_mqtt_server.py @@ -0,0 +1,186 @@ +import asyncio +import struct + +MQTT_CONNECT = 1 +MQTT_CONNACK = 2 +MQTT_PUBLISH = 3 +MQTT_PUBACK = 4 +MQTT_SUBSCRIBE = 8 +MQTT_SUBACK = 9 +MQTT_DISCONNECT = 14 + +class SimpleMQTTServer: + def __init__(self, host, port): + self.host = host + self.port = port + self.server = None + self.incoming_messages = asyncio.Queue() + self.outgoing_messages = asyncio.Queue() + self.next_pack_id_value = 1 + + async def start(self): + self.server = await asyncio.start_server(self.handle_client, self.host, self.port) + self.port = self.server.sockets[0].getsockname()[1] + print(f'Listening on {self.server.sockets[0].getsockname()}') + + async def serve_forever(self): + await self.server.serve_forever() + + async def handle_client(self, reader, writer): + addr = writer.get_extra_info('peername') + print(f'Socket connected from {addr}') + data = b'' + + read_future = asyncio.ensure_future(reader.read(1024)) + outgoing_messages_future = asyncio.ensure_future(self.outgoing_messages.get()) + + subscribed_topics = dict() + + while True: + # get Future representing a reader.read(1024) + # get Future representing a self.incoming_messages.get() + completed, pending = await asyncio.wait([read_future, outgoing_messages_future], return_when=asyncio.FIRST_COMPLETED) + #print(completed) + #print(pending) + + if outgoing_messages_future in completed: + #print("Got outgoing message") + outmsg = outgoing_messages_future.result() + topic = outmsg['topic'] + payload = outmsg['payload'] + + if topic in subscribed_topics: + qos = subscribed_topics[topic] + await self.send_msg(writer, MQTT_PUBLISH, payload=self.encode_publish(topic, payload, self.next_pack_id())) + else: + print(f'SEND: NOT SUBSCRIBED {topic}: {payload}') + #msg = (MQTT_PUBLISH, 0, topic.encode('utf-8') + payload.encode('utf-8')) + #await self.send_msg(writer, *msg) + outgoing_messages_future = asyncio.ensure_future(self.outgoing_messages.get()) + + if read_future in completed: + d = read_future.result() + data += d + read_future = asyncio.ensure_future(reader.read(1024)) + else: + continue + + # Process any messages + while True: + # must have at least 2 bytes + if len(data) < 2: + break + #print(f"Remaining bytes: {len(data)}") + #print(f"Data: {data}") + + msg_type = data[0] >> 4 + msg_flags = data[0] & 0xf + #print(f" msg_type: {msg_type} msg_flags: {msg_flags}") + # TODO -- we could maybe not have enough bytes to decode the length, but assume + # that won't happen + msg_length, len_bytes_consumed = self.decode_length(data[1:]) + print(f" in msg_type: {msg_type} flags: {msg_flags} msg_length {msg_length} bytes_consumed for msg_length {len_bytes_consumed}") + + # is there enough to process the message? + head_len = len_bytes_consumed + 1 + if msg_length + head_len > len(data): + print("Not enough") + break + + # pull the message payload out, and move data to next packet + message = data[head_len :head_len+msg_length] + data = data[head_len+msg_length:] + + if msg_type == MQTT_CONNECT: + # ignore the contents of the message, should maybe check for 'MQTT' identifier at least + print(f"Client {addr} connected") + await self.send_msg(writer, MQTT_CONNACK, payload=b'\x00\x00') + elif msg_type == MQTT_PUBLISH: + qos = (msg_flags >> 1) & 0x3 + topic, packid, content = self.parse_publish(message) + print(f"{topic}: {content}") + if qos > 0: + await self.send_msg(writer, MQTT_PUBACK, packet_ident=packid) + elif msg_type == MQTT_SUBSCRIBE: + qos = (msg_flags >> 1) & 0x3 + packid = message[0] << 8 | message[1] + message = message[2:] + topic = self.parse_subscribe(message) + print(f"Client {addr} subscribed to topic '{topic}', QoS {qos}") + subscribed_topics[topic] = qos + await self.send_msg(writer, MQTT_SUBACK, packet_ident=packid, payload=bytes([qos])) + elif msg_type == MQTT_DISCONNECT: + print(f"Client {addr} disconnected") + writer.close() + await writer.wait_closed() + return + + async def send_msg(self, writer, msg_type, flags=0, packet_ident=0, payload=b''): + head = bytes([msg_type << 4 | flags]) + payload_length = len(payload) + if packet_ident > 0: + payload_length += 2 + head += self.encode_length(payload_length) + if packet_ident > 0: + head += bytes([packet_ident >> 8, packet_ident & 0xff]) + data = head + payload + print(f" writing {len(data)} bytes: {data}") + writer.write(data) + await writer.drain() + + def encode_length(self, length): + encoded = bytearray() + while True: + digit = length % 128 + length //= 128 + if length > 0: + digit |= 0x80 + encoded.append(digit) + if length == 0: + break + return encoded + + def decode_length(self, data): + multiplier = 1 + value = 0 + bytes_read = 0 + + for byte in data: + bytes_read += 1 + value += (byte & 0x7f) * multiplier + if byte & 0x80 == 0: + break + multiplier *= 128 + if multiplier > 2097152: + raise ValueError("Malformed Remaining Length") + + return value, bytes_read + + def parse_publish(self, data): + topic_len = struct.unpack("!H", data[0:2])[0] + topic = data[2:2 + topic_len].decode("utf-8") + packid = struct.unpack("!H", data[2 + topic_len:4 + topic_len])[0] + message_start = 4 + topic_len + message = data[message_start:].decode("utf-8") + return topic, packid, message + + def parse_subscribe(self, data): + topic_len = struct.unpack("!H", data[0:2])[0] + topic = data[2:2 + topic_len].decode("utf-8") + return topic + + def encode_publish(self, topic, message, packid=0): + topic_len = len(topic) + topic = topic.encode("utf-8") + packid = struct.pack("!H", packid) + message = message.encode("utf-8") + return struct.pack("!H", topic_len) + topic + packid + message + + def next_pack_id(self): + pack_id = self.next_pack_id_value + self.next_pack_id_value += 1 + return pack_id + +if __name__ == "__main__": + server = SimpleMQTTServer() + asyncio.run(server.run())