This commit is contained in:
Vladimir Vukicevic
2023-09-02 19:31:03 -07:00
commit 58be4707fd
2 changed files with 344 additions and 0 deletions

158
cassini.py Executable file
View File

@@ -0,0 +1,158 @@
#!env python3
# -*- coding: utf-8 -*-
import sys
import socket
import struct
import time
import json
import asyncio
import logging
import random
from simple_mqtt_server import SimpleMQTTServer
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
)
SATURN_BROADCAST_PORT = 3000
SATURN_STATUS_COMPLETE = 16 # ??
SATURN_CMD_0 = 0 # null data
SATURN_CMD_1 = 1 # null data
SATURN_CMD_SET_REPORT_TIME_PERIOD = 512
SATURN_CMD_START_PRINTING = 128 # "Filename": "X", "StartLayer": 0
SATURN_CMD_UPLOAD_FILE = 256 # "Check": 0, "CleanCache": 1, "Compress": 0, "FileSize": 3541068, "Filename": "_ResinXP2-ValidationMatrix_v2.goo", "MD5": "205abc8fab0762ad2b0ee1f6b63b1750", "URL": "http://${ipaddr}:58883/f60c0718c8144b0db48b7149d4d85390.goo" },
SATURN_CMD_DISCONNECT = 64 # Maybe disconnect?
PRINTERS = []
PRINTER_SEARCH_TIMEOUT = 1
def handle_exception(loop, context):
msg = context.get("exception", context["message"])
name = context.get("future").get_coro().__name__
logging.error(f"Caught exception from {name}: {msg}")
def find_printers_on(iface):
#netaddr = netifaces.ifaddresses(iface)
#if not netaddr.has_key(netifaces.AF_INET):
# print('No IPv4 address found for interface {}'.format(iface))
# sys.exit(1)
#broadcast = netaddr[netifaces.AF_INET][0]['broadcast']
# create UDP socket and send broadcast
printers = []
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with sock:
sock.settimeout(PRINTER_SEARCH_TIMEOUT)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(b'M99999', ('<broadcast>', SATURN_BROADCAST_PORT))
now = time.time()
while True:
if time.time() - now > PRINTER_SEARCH_TIMEOUT:
break
try:
data, addr = sock.recvfrom(1024)
except socket.timeout:
continue
else:
print(f'Found printer at {addr}')
pdata = json.loads(data.decode('utf-8'))
pdata['addr'] = addr
printers.append(pdata)
return printers
def connect_printer(printer, srvport):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
with sock:
sock.sendto(b'M66666 ' + str(srvport).encode('utf-8'), printer['addr'])
async def create_server():
mqtt = SimpleMQTTServer('0.0.0.0', 0)
await mqtt.start()
logging.info(f"Server created, port {mqtt.port}")
return mqtt
async def serve_forever(mqtt):
loop = asyncio.get_running_loop()
loop.set_exception_handler(handle_exception)
await mqtt.serve_forever()
async def find_printers():
printers = find_printers_on('en0')
if len(printers) == 0:
print('No printers found')
sys.exit(1)
for i, p in enumerate(printers):
attrs = p['Data']['Attributes']
#print('{}: {} ({})'.format(i, attrs['Name'], attrs['MachineName']))
return printers
async def printer_setup(mqtt_port):
PRINTERS = await find_printers()
printer = PRINTERS[0]
connect_printer(printer, mqtt_port)
return printer
def print_printer_status(printers):
for i, p in enumerate(printers):
attrs = p['Data']['Attributes']
status = p['Data']['Status']
printInfo = status['PrintInfo']
print(f"{i}: {attrs['Name']} ({attrs['MachineName']})")
print(f" Status: {printInfo['Status']} Layers: {printInfo['CurrentLayer']}/{printInfo['TotalLayer']}")
def send_printer_command(mqtt, printer, cmdid, data=None):
# generate 16-byte random identifier as a hex string
hexstr = '%032x' % random.getrandbits(128)
timestamp = int(time.time() * 1000)
mainboard = printer['Data']['Attributes']['MainboardID']
cmd_data = {
"Data": {
"Cmd": cmdid,
"Data": json.dumps(data) if data is not None else "null",
"From": 0,
"MainboardID": mainboard,
"RequestID": hexstr,
"TimeStamp": timestamp
},
"Id": printer['Id']
}
print(json.dumps(cmd_data))
mqtt.outgoing_messages.put_nowait({'topic': '/sdcp/request/' + mainboard, 'payload': json.dumps(cmd_data)})
async def main():
cmd = None
if len(sys.argv) > 1:
cmd = sys.argv[1]
if cmd == 'status':
printers = await find_printers()
print_printer_status(printers)
else:
mqtt = await create_server()
server_task = asyncio.create_task(serve_forever(mqtt))
printer = await printer_setup(mqtt.port)
await asyncio.sleep(3)
send_printer_command(mqtt, printer, 0)
await asyncio.sleep(1)
send_printer_command(mqtt, printer, 1)
await asyncio.sleep(1)
send_printer_command(mqtt, printer, 512, { "TimePeriod": 5000 })
#printer_task = asyncio.create_task(printer_setup(mqtt.port))
await asyncio.sleep(1000)
#while True:
# if server_task is not None and server_task.done():
# print("Server task done")
# print(server_task.exception())
# server_task = None
# if printer_task is not None and printer_task.done():
# print("Printer task done")
# print(printer_task.exception())
# printer_task = None
asyncio.run(main())

186
simple_mqtt_server.py Normal file
View File

@@ -0,0 +1,186 @@
import asyncio
import struct
MQTT_CONNECT = 1
MQTT_CONNACK = 2
MQTT_PUBLISH = 3
MQTT_PUBACK = 4
MQTT_SUBSCRIBE = 8
MQTT_SUBACK = 9
MQTT_DISCONNECT = 14
class SimpleMQTTServer:
def __init__(self, host, port):
self.host = host
self.port = port
self.server = None
self.incoming_messages = asyncio.Queue()
self.outgoing_messages = asyncio.Queue()
self.next_pack_id_value = 1
async def start(self):
self.server = await asyncio.start_server(self.handle_client, self.host, self.port)
self.port = self.server.sockets[0].getsockname()[1]
print(f'Listening on {self.server.sockets[0].getsockname()}')
async def serve_forever(self):
await self.server.serve_forever()
async def handle_client(self, reader, writer):
addr = writer.get_extra_info('peername')
print(f'Socket connected from {addr}')
data = b''
read_future = asyncio.ensure_future(reader.read(1024))
outgoing_messages_future = asyncio.ensure_future(self.outgoing_messages.get())
subscribed_topics = dict()
while True:
# get Future representing a reader.read(1024)
# get Future representing a self.incoming_messages.get()
completed, pending = await asyncio.wait([read_future, outgoing_messages_future], return_when=asyncio.FIRST_COMPLETED)
#print(completed)
#print(pending)
if outgoing_messages_future in completed:
#print("Got outgoing message")
outmsg = outgoing_messages_future.result()
topic = outmsg['topic']
payload = outmsg['payload']
if topic in subscribed_topics:
qos = subscribed_topics[topic]
await self.send_msg(writer, MQTT_PUBLISH, payload=self.encode_publish(topic, payload, self.next_pack_id()))
else:
print(f'SEND: NOT SUBSCRIBED {topic}: {payload}')
#msg = (MQTT_PUBLISH, 0, topic.encode('utf-8') + payload.encode('utf-8'))
#await self.send_msg(writer, *msg)
outgoing_messages_future = asyncio.ensure_future(self.outgoing_messages.get())
if read_future in completed:
d = read_future.result()
data += d
read_future = asyncio.ensure_future(reader.read(1024))
else:
continue
# Process any messages
while True:
# must have at least 2 bytes
if len(data) < 2:
break
#print(f"Remaining bytes: {len(data)}")
#print(f"Data: {data}")
msg_type = data[0] >> 4
msg_flags = data[0] & 0xf
#print(f" msg_type: {msg_type} msg_flags: {msg_flags}")
# TODO -- we could maybe not have enough bytes to decode the length, but assume
# that won't happen
msg_length, len_bytes_consumed = self.decode_length(data[1:])
print(f" in msg_type: {msg_type} flags: {msg_flags} msg_length {msg_length} bytes_consumed for msg_length {len_bytes_consumed}")
# is there enough to process the message?
head_len = len_bytes_consumed + 1
if msg_length + head_len > len(data):
print("Not enough")
break
# pull the message payload out, and move data to next packet
message = data[head_len :head_len+msg_length]
data = data[head_len+msg_length:]
if msg_type == MQTT_CONNECT:
# ignore the contents of the message, should maybe check for 'MQTT' identifier at least
print(f"Client {addr} connected")
await self.send_msg(writer, MQTT_CONNACK, payload=b'\x00\x00')
elif msg_type == MQTT_PUBLISH:
qos = (msg_flags >> 1) & 0x3
topic, packid, content = self.parse_publish(message)
print(f"{topic}: {content}")
if qos > 0:
await self.send_msg(writer, MQTT_PUBACK, packet_ident=packid)
elif msg_type == MQTT_SUBSCRIBE:
qos = (msg_flags >> 1) & 0x3
packid = message[0] << 8 | message[1]
message = message[2:]
topic = self.parse_subscribe(message)
print(f"Client {addr} subscribed to topic '{topic}', QoS {qos}")
subscribed_topics[topic] = qos
await self.send_msg(writer, MQTT_SUBACK, packet_ident=packid, payload=bytes([qos]))
elif msg_type == MQTT_DISCONNECT:
print(f"Client {addr} disconnected")
writer.close()
await writer.wait_closed()
return
async def send_msg(self, writer, msg_type, flags=0, packet_ident=0, payload=b''):
head = bytes([msg_type << 4 | flags])
payload_length = len(payload)
if packet_ident > 0:
payload_length += 2
head += self.encode_length(payload_length)
if packet_ident > 0:
head += bytes([packet_ident >> 8, packet_ident & 0xff])
data = head + payload
print(f" writing {len(data)} bytes: {data}")
writer.write(data)
await writer.drain()
def encode_length(self, length):
encoded = bytearray()
while True:
digit = length % 128
length //= 128
if length > 0:
digit |= 0x80
encoded.append(digit)
if length == 0:
break
return encoded
def decode_length(self, data):
multiplier = 1
value = 0
bytes_read = 0
for byte in data:
bytes_read += 1
value += (byte & 0x7f) * multiplier
if byte & 0x80 == 0:
break
multiplier *= 128
if multiplier > 2097152:
raise ValueError("Malformed Remaining Length")
return value, bytes_read
def parse_publish(self, data):
topic_len = struct.unpack("!H", data[0:2])[0]
topic = data[2:2 + topic_len].decode("utf-8")
packid = struct.unpack("!H", data[2 + topic_len:4 + topic_len])[0]
message_start = 4 + topic_len
message = data[message_start:].decode("utf-8")
return topic, packid, message
def parse_subscribe(self, data):
topic_len = struct.unpack("!H", data[0:2])[0]
topic = data[2:2 + topic_len].decode("utf-8")
return topic
def encode_publish(self, topic, message, packid=0):
topic_len = len(topic)
topic = topic.encode("utf-8")
packid = struct.pack("!H", packid)
message = message.encode("utf-8")
return struct.pack("!H", topic_len) + topic + packid + message
def next_pack_id(self):
pack_id = self.next_pack_id_value
self.next_pack_id_value += 1
return pack_id
if __name__ == "__main__":
server = SimpleMQTTServer()
asyncio.run(server.run())