diff --git a/Cargo.lock b/Cargo.lock index 086e1ae..623a9ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3182,6 +3182,8 @@ name = "remote_send" version = "0.1.0" dependencies = [ "anyhow", + "bitflags 2.5.0", + "common", "serde", "serde_json", ] diff --git a/common/src/serde/deserializer.rs b/common/src/serde/deserializer.rs index 3afd8f2..909bd03 100644 --- a/common/src/serde/deserializer.rs +++ b/common/src/serde/deserializer.rs @@ -76,4 +76,8 @@ impl<'a> Deserializer<'a> { pub fn read_sized_string(&mut self) -> SizedString { SizedString::new(self.read_bytes(SIZE)) } + + pub fn is_empty(&self) -> bool { + self.offset == self.buffer.len() + } } diff --git a/remote_send/Cargo.toml b/remote_send/Cargo.toml index 1ea09be..7927edd 100644 --- a/remote_send/Cargo.toml +++ b/remote_send/Cargo.toml @@ -7,3 +7,6 @@ edition = "2021" anyhow = "1.0.86" serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" + +common = { path = "../common" } +bitflags = "2.5.0" diff --git a/remote_send/src/lib.rs b/remote_send/src/lib.rs index f2b0bb9..abcc987 100644 --- a/remote_send/src/lib.rs +++ b/remote_send/src/lib.rs @@ -1,6 +1,7 @@ use anyhow::Result; use serde::{Deserialize, Deserializer}; +pub mod mqtt; pub mod status; #[derive(Debug, Deserialize)] diff --git a/remote_send/src/main.rs b/remote_send/src/main.rs index 479da38..5345e8e 100644 --- a/remote_send/src/main.rs +++ b/remote_send/src/main.rs @@ -1,13 +1,18 @@ -use std::net::UdpSocket; +use std::{net::UdpSocket, thread}; use anyhow::Result; -use remote_send::{status::StatusData, Response}; +use remote_send::{mqtt, status::StatusData, Response}; fn main() -> Result<()> { + thread::spawn(|| { + mqtt::start().unwrap(); + }); + let socket = UdpSocket::bind("0.0.0.0:3000")?; - let msg = b"M99999"; + // let msg = b"M99999"; + let msg = b"M66666 1883"; socket.send_to(msg, "192.168.1.233:3000")?; let mut buffer = [0; 1024]; diff --git a/remote_send/src/mqtt/mod.rs b/remote_send/src/mqtt/mod.rs new file mode 100644 index 0000000..47d4910 --- /dev/null +++ b/remote_send/src/mqtt/mod.rs @@ -0,0 +1,128 @@ +use std::{ + borrow::Cow, + io::{Read, Write}, + net::{TcpListener, TcpStream}, + thread, +}; + +use anyhow::Result; +use common::serde::Deserializer; +use packets::{ + connect::ConnectPacket, + connect_ack::{ConnectAckFlags, ConnectAckPacket, ConnectReturnCode}, + subscribe::SubscribePacket, +}; + +pub mod packets; + +pub fn start() -> Result<()> { + let socket = TcpListener::bind("0.0.0.0:1883")?; + + for stream in socket.incoming() { + let stream = stream?; + println!("Connection established: {:?}", stream); + thread::spawn(|| { + if let Err(e) = handle_client(stream) { + eprintln!("Error handling client: {:?}", e); + } + }); + } + + Ok(()) +} + +fn handle_client(mut stream: TcpStream) -> Result<()> { + loop { + let packet = Packet::read(&mut stream)?; + + match packet.packet_type { + ConnectPacket::PACKET_TYPE => { + let packet = ConnectPacket::from_bytes(&packet.remaining_bytes)?; + println!("Connect packet: {:?}", packet); + + ConnectAckPacket { + flags: ConnectAckFlags::empty(), + return_code: ConnectReturnCode::Accepted, + } + .to_packet() + .write(&mut stream)?; + } + SubscribePacket::PACKET_TYPE => { + let packet = SubscribePacket::from_bytes(&packet.remaining_bytes)?; + println!("Subscribe packet: {:?}", packet); + } + ty => eprintln!("Unsupported packet type: 0x{ty:x}"), + } + } + +} + +pub struct Packet { + packet_type: u8, + flags: u8, + remaining_length: u32, + remaining_bytes: Vec, +} + +impl Packet { + fn write(&self, stream: &mut Stream) -> Result<()> { + let mut bytes = vec![self.packet_type << 4 | self.flags]; + let mut remaining_length = self.remaining_length; + loop { + let mut byte = (remaining_length % 128) as u8; + remaining_length /= 128; + if remaining_length > 0 { + byte |= 0x80; + } + bytes.push(byte); + if remaining_length == 0 { + break; + } + } + bytes.extend(self.remaining_bytes.iter()); + + stream.write_all(&bytes)?; + Ok(()) + } + + fn read(stream: &mut Stream) -> Result { + let mut header = [0; 2]; + stream.read_exact(&mut header)?; + + let (packet_type, flags) = (header[0] >> 4, header[0] & 0xF); + let mut multiplier = 1; + let mut remaining_length = 0; + let mut pos = 1; + loop { + let byte = header[pos]; + remaining_length += (byte & 0x7F) as u32 * multiplier; + multiplier *= 128; + pos += 1; + if byte & 0x80 == 0 { + break; + } + } + + let mut remaining_bytes = vec![0; remaining_length as usize]; + stream.read_exact(&mut remaining_bytes)?; + + Ok(Self { + packet_type, + flags, + remaining_length, + remaining_bytes, + }) + } +} + +trait MqttDeserialize<'a> { + fn read_string(&mut self) -> Cow<'a, str>; +} + +impl<'a> MqttDeserialize<'a> for Deserializer<'a> { + fn read_string(&mut self) -> Cow<'a, str> { + let len = self.read_u16(); + let buf = self.read_bytes(len as usize); + String::from_utf8_lossy(buf) + } +} diff --git a/remote_send/src/mqtt/packets/connect.rs b/remote_send/src/mqtt/packets/connect.rs new file mode 100644 index 0000000..5bfddbc --- /dev/null +++ b/remote_send/src/mqtt/packets/connect.rs @@ -0,0 +1,72 @@ +use anyhow::Result; +use bitflags::bitflags; + +use common::serde::Deserializer; + +use crate::mqtt::MqttDeserialize; + +#[derive(Debug)] +pub struct ConnectPacket { + pub protocol_name: String, + pub protocol_level: u8, + pub connect_flags: ConnectFlags, + pub keep_alive: u16, + + pub client_id: String, + pub will_topic: Option, + pub will_message: Option, + pub username: Option, + pub password: Option, +} + +bitflags! { + #[derive(Debug)] + pub struct ConnectFlags: u8 { + const USERNAME = 0b10000000; + const PASSWORD = 0b01000000; + const WILL_RETAIN = 0b00100000; + const WILL_QOS = 0b00011000; + const WILL_FLAG = 0b00000100; + const CLEAN_SESSION = 0b00000010; + const RESERVED = 0b00000001; + } +} + +impl ConnectPacket { + pub const PACKET_TYPE: u8 = 0x01; + + pub fn from_bytes(bytes: &[u8]) -> Result { + let mut des = Deserializer::new(bytes); + + let protocol_name = des.read_string().into_owned(); + let protocol_level = des.read_u8(); + let connect_flags = ConnectFlags::from_bits(des.read_u8()).unwrap(); + let keep_alive = des.read_u16(); + + let client_id = des.read_string().into_owned(); + let will_topic = connect_flags + .contains(ConnectFlags::WILL_FLAG) + .then(|| des.read_string().into_owned()); + let will_message = will_topic + .as_ref() + .and_then(|_| des.read_string().into_owned().into()); + let username = connect_flags + .contains(ConnectFlags::USERNAME) + .then(|| des.read_string().into_owned()); + let password = connect_flags + .contains(ConnectFlags::PASSWORD) + .then(|| des.read_string().into_owned()); + + Ok(Self { + protocol_name, + protocol_level, + connect_flags, + keep_alive, + client_id, + will_topic, + will_message, + username, + password, + }) + } +} diff --git a/remote_send/src/mqtt/packets/connect_ack.rs b/remote_send/src/mqtt/packets/connect_ack.rs new file mode 100644 index 0000000..97c9b78 --- /dev/null +++ b/remote_send/src/mqtt/packets/connect_ack.rs @@ -0,0 +1,57 @@ +use bitflags::bitflags; + +use crate::mqtt::Packet; + +pub struct ConnectAckPacket { + pub flags: ConnectAckFlags, + pub return_code: ConnectReturnCode, +} + +bitflags! { + pub struct ConnectAckFlags: u8 { + const SESSION_PRESENT = 0b00000001; + } +} + +pub enum ConnectReturnCode { + Accepted, + Refused(ConnectRefusedReason), +} + +pub enum ConnectRefusedReason { + UnacceptableProtocolVersion, + IdentifierRejected, + ServerUnavailable, + BadUsernameOrPassword, + NotAuthorized, +} + +impl ConnectAckPacket { + const PACKET_TYPE: u8 = 0x02; + + pub fn to_packet(&self) -> Packet { + let body = vec![self.flags.bits(), self.return_code.as_u8()]; + + Packet { + packet_type: Self::PACKET_TYPE, + flags: 0, + remaining_length: body.len() as u32, + remaining_bytes: body, + } + } +} + +impl ConnectReturnCode { + fn as_u8(&self) -> u8 { + match self { + ConnectReturnCode::Accepted => 0, + ConnectReturnCode::Refused(reason) => match reason { + ConnectRefusedReason::UnacceptableProtocolVersion => 1, + ConnectRefusedReason::IdentifierRejected => 2, + ConnectRefusedReason::ServerUnavailable => 3, + ConnectRefusedReason::BadUsernameOrPassword => 4, + ConnectRefusedReason::NotAuthorized => 5, + }, + } + } +} diff --git a/remote_send/src/mqtt/packets/mod.rs b/remote_send/src/mqtt/packets/mod.rs new file mode 100644 index 0000000..d310ea6 --- /dev/null +++ b/remote_send/src/mqtt/packets/mod.rs @@ -0,0 +1,3 @@ +pub mod connect; +pub mod connect_ack; +pub mod subscribe; diff --git a/remote_send/src/mqtt/packets/subscribe.rs b/remote_send/src/mqtt/packets/subscribe.rs new file mode 100644 index 0000000..2e22155 --- /dev/null +++ b/remote_send/src/mqtt/packets/subscribe.rs @@ -0,0 +1,32 @@ +use anyhow::Result; + +use common::serde::Deserializer; + +use crate::mqtt::MqttDeserialize; + +#[derive(Debug)] +pub struct SubscribePacket { + pub packet_id: u16, + pub filters: Vec<(String, QoS)>, +} + +#[derive(Debug)] +pub struct QoS(pub u8); + +impl SubscribePacket { + pub const PACKET_TYPE: u8 = 0x08; + + pub fn from_bytes(bytes: &[u8]) -> Result { + let mut des = Deserializer::new(bytes); + + let packet_id = des.read_u16(); + let mut filters = Vec::new(); + while !des.is_empty() { + let topic = des.read_string().into_owned(); + let qos = des.read_u8(); + filters.push((topic, QoS(qos))); + } + + Ok(Self { packet_id, filters }) + } +} diff --git a/ui/src/windows/slice_preview.rs b/ui/src/windows/slice_preview.rs index 852b54b..8d4b0bb 100644 --- a/ui/src/windows/slice_preview.rs +++ b/ui/src/windows/slice_preview.rs @@ -50,7 +50,7 @@ pub fn ui(app: &mut App, ctx: &Context, _frame: &mut Frame) { None }; - egui::Frame::default().show(ui, |ui| { + egui::Frame::canvas(&ui.style()).show(ui, |ui| { let available_size = ui.available_size(); let (rect, _response) = ui.allocate_exact_size( Vec2::new(