Start on mqtt implementation
Why are there no good embeddable mqtt libs for rust..
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3182,6 +3182,8 @@ name = "remote_send"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"bitflags 2.5.0",
|
||||||
|
"common",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
@@ -76,4 +76,8 @@ impl<'a> Deserializer<'a> {
|
|||||||
pub fn read_sized_string<const SIZE: usize>(&mut self) -> SizedString<SIZE> {
|
pub fn read_sized_string<const SIZE: usize>(&mut self) -> SizedString<SIZE> {
|
||||||
SizedString::new(self.read_bytes(SIZE))
|
SizedString::new(self.read_bytes(SIZE))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.offset == self.buffer.len()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -7,3 +7,6 @@ edition = "2021"
|
|||||||
anyhow = "1.0.86"
|
anyhow = "1.0.86"
|
||||||
serde = { version = "1.0.203", features = ["derive"] }
|
serde = { version = "1.0.203", features = ["derive"] }
|
||||||
serde_json = "1.0.117"
|
serde_json = "1.0.117"
|
||||||
|
|
||||||
|
common = { path = "../common" }
|
||||||
|
bitflags = "2.5.0"
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use serde::{Deserialize, Deserializer};
|
use serde::{Deserialize, Deserializer};
|
||||||
|
|
||||||
|
pub mod mqtt;
|
||||||
pub mod status;
|
pub mod status;
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
|
@@ -1,13 +1,18 @@
|
|||||||
use std::net::UdpSocket;
|
use std::{net::UdpSocket, thread};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
||||||
use remote_send::{status::StatusData, Response};
|
use remote_send::{mqtt, status::StatusData, Response};
|
||||||
|
|
||||||
fn main() -> Result<()> {
|
fn main() -> Result<()> {
|
||||||
|
thread::spawn(|| {
|
||||||
|
mqtt::start().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
let socket = UdpSocket::bind("0.0.0.0:3000")?;
|
let socket = UdpSocket::bind("0.0.0.0:3000")?;
|
||||||
|
|
||||||
let msg = b"M99999";
|
// let msg = b"M99999";
|
||||||
|
let msg = b"M66666 1883";
|
||||||
socket.send_to(msg, "192.168.1.233:3000")?;
|
socket.send_to(msg, "192.168.1.233:3000")?;
|
||||||
|
|
||||||
let mut buffer = [0; 1024];
|
let mut buffer = [0; 1024];
|
||||||
|
128
remote_send/src/mqtt/mod.rs
Normal file
128
remote_send/src/mqtt/mod.rs
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
use std::{
|
||||||
|
borrow::Cow,
|
||||||
|
io::{Read, Write},
|
||||||
|
net::{TcpListener, TcpStream},
|
||||||
|
thread,
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use common::serde::Deserializer;
|
||||||
|
use packets::{
|
||||||
|
connect::ConnectPacket,
|
||||||
|
connect_ack::{ConnectAckFlags, ConnectAckPacket, ConnectReturnCode},
|
||||||
|
subscribe::SubscribePacket,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub mod packets;
|
||||||
|
|
||||||
|
pub fn start() -> Result<()> {
|
||||||
|
let socket = TcpListener::bind("0.0.0.0:1883")?;
|
||||||
|
|
||||||
|
for stream in socket.incoming() {
|
||||||
|
let stream = stream?;
|
||||||
|
println!("Connection established: {:?}", stream);
|
||||||
|
thread::spawn(|| {
|
||||||
|
if let Err(e) = handle_client(stream) {
|
||||||
|
eprintln!("Error handling client: {:?}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_client(mut stream: TcpStream) -> Result<()> {
|
||||||
|
loop {
|
||||||
|
let packet = Packet::read(&mut stream)?;
|
||||||
|
|
||||||
|
match packet.packet_type {
|
||||||
|
ConnectPacket::PACKET_TYPE => {
|
||||||
|
let packet = ConnectPacket::from_bytes(&packet.remaining_bytes)?;
|
||||||
|
println!("Connect packet: {:?}", packet);
|
||||||
|
|
||||||
|
ConnectAckPacket {
|
||||||
|
flags: ConnectAckFlags::empty(),
|
||||||
|
return_code: ConnectReturnCode::Accepted,
|
||||||
|
}
|
||||||
|
.to_packet()
|
||||||
|
.write(&mut stream)?;
|
||||||
|
}
|
||||||
|
SubscribePacket::PACKET_TYPE => {
|
||||||
|
let packet = SubscribePacket::from_bytes(&packet.remaining_bytes)?;
|
||||||
|
println!("Subscribe packet: {:?}", packet);
|
||||||
|
}
|
||||||
|
ty => eprintln!("Unsupported packet type: 0x{ty:x}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Packet {
|
||||||
|
packet_type: u8,
|
||||||
|
flags: u8,
|
||||||
|
remaining_length: u32,
|
||||||
|
remaining_bytes: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Packet {
|
||||||
|
fn write<Stream: Write>(&self, stream: &mut Stream) -> Result<()> {
|
||||||
|
let mut bytes = vec![self.packet_type << 4 | self.flags];
|
||||||
|
let mut remaining_length = self.remaining_length;
|
||||||
|
loop {
|
||||||
|
let mut byte = (remaining_length % 128) as u8;
|
||||||
|
remaining_length /= 128;
|
||||||
|
if remaining_length > 0 {
|
||||||
|
byte |= 0x80;
|
||||||
|
}
|
||||||
|
bytes.push(byte);
|
||||||
|
if remaining_length == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bytes.extend(self.remaining_bytes.iter());
|
||||||
|
|
||||||
|
stream.write_all(&bytes)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read<Stream: Read>(stream: &mut Stream) -> Result<Self> {
|
||||||
|
let mut header = [0; 2];
|
||||||
|
stream.read_exact(&mut header)?;
|
||||||
|
|
||||||
|
let (packet_type, flags) = (header[0] >> 4, header[0] & 0xF);
|
||||||
|
let mut multiplier = 1;
|
||||||
|
let mut remaining_length = 0;
|
||||||
|
let mut pos = 1;
|
||||||
|
loop {
|
||||||
|
let byte = header[pos];
|
||||||
|
remaining_length += (byte & 0x7F) as u32 * multiplier;
|
||||||
|
multiplier *= 128;
|
||||||
|
pos += 1;
|
||||||
|
if byte & 0x80 == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut remaining_bytes = vec![0; remaining_length as usize];
|
||||||
|
stream.read_exact(&mut remaining_bytes)?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
packet_type,
|
||||||
|
flags,
|
||||||
|
remaining_length,
|
||||||
|
remaining_bytes,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait MqttDeserialize<'a> {
|
||||||
|
fn read_string(&mut self) -> Cow<'a, str>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> MqttDeserialize<'a> for Deserializer<'a> {
|
||||||
|
fn read_string(&mut self) -> Cow<'a, str> {
|
||||||
|
let len = self.read_u16();
|
||||||
|
let buf = self.read_bytes(len as usize);
|
||||||
|
String::from_utf8_lossy(buf)
|
||||||
|
}
|
||||||
|
}
|
72
remote_send/src/mqtt/packets/connect.rs
Normal file
72
remote_send/src/mqtt/packets/connect.rs
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
use bitflags::bitflags;
|
||||||
|
|
||||||
|
use common::serde::Deserializer;
|
||||||
|
|
||||||
|
use crate::mqtt::MqttDeserialize;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ConnectPacket {
|
||||||
|
pub protocol_name: String,
|
||||||
|
pub protocol_level: u8,
|
||||||
|
pub connect_flags: ConnectFlags,
|
||||||
|
pub keep_alive: u16,
|
||||||
|
|
||||||
|
pub client_id: String,
|
||||||
|
pub will_topic: Option<String>,
|
||||||
|
pub will_message: Option<String>,
|
||||||
|
pub username: Option<String>,
|
||||||
|
pub password: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
bitflags! {
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ConnectFlags: u8 {
|
||||||
|
const USERNAME = 0b10000000;
|
||||||
|
const PASSWORD = 0b01000000;
|
||||||
|
const WILL_RETAIN = 0b00100000;
|
||||||
|
const WILL_QOS = 0b00011000;
|
||||||
|
const WILL_FLAG = 0b00000100;
|
||||||
|
const CLEAN_SESSION = 0b00000010;
|
||||||
|
const RESERVED = 0b00000001;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectPacket {
|
||||||
|
pub const PACKET_TYPE: u8 = 0x01;
|
||||||
|
|
||||||
|
pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
|
||||||
|
let mut des = Deserializer::new(bytes);
|
||||||
|
|
||||||
|
let protocol_name = des.read_string().into_owned();
|
||||||
|
let protocol_level = des.read_u8();
|
||||||
|
let connect_flags = ConnectFlags::from_bits(des.read_u8()).unwrap();
|
||||||
|
let keep_alive = des.read_u16();
|
||||||
|
|
||||||
|
let client_id = des.read_string().into_owned();
|
||||||
|
let will_topic = connect_flags
|
||||||
|
.contains(ConnectFlags::WILL_FLAG)
|
||||||
|
.then(|| des.read_string().into_owned());
|
||||||
|
let will_message = will_topic
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|_| des.read_string().into_owned().into());
|
||||||
|
let username = connect_flags
|
||||||
|
.contains(ConnectFlags::USERNAME)
|
||||||
|
.then(|| des.read_string().into_owned());
|
||||||
|
let password = connect_flags
|
||||||
|
.contains(ConnectFlags::PASSWORD)
|
||||||
|
.then(|| des.read_string().into_owned());
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
protocol_name,
|
||||||
|
protocol_level,
|
||||||
|
connect_flags,
|
||||||
|
keep_alive,
|
||||||
|
client_id,
|
||||||
|
will_topic,
|
||||||
|
will_message,
|
||||||
|
username,
|
||||||
|
password,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
57
remote_send/src/mqtt/packets/connect_ack.rs
Normal file
57
remote_send/src/mqtt/packets/connect_ack.rs
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
use bitflags::bitflags;
|
||||||
|
|
||||||
|
use crate::mqtt::Packet;
|
||||||
|
|
||||||
|
pub struct ConnectAckPacket {
|
||||||
|
pub flags: ConnectAckFlags,
|
||||||
|
pub return_code: ConnectReturnCode,
|
||||||
|
}
|
||||||
|
|
||||||
|
bitflags! {
|
||||||
|
pub struct ConnectAckFlags: u8 {
|
||||||
|
const SESSION_PRESENT = 0b00000001;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum ConnectReturnCode {
|
||||||
|
Accepted,
|
||||||
|
Refused(ConnectRefusedReason),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum ConnectRefusedReason {
|
||||||
|
UnacceptableProtocolVersion,
|
||||||
|
IdentifierRejected,
|
||||||
|
ServerUnavailable,
|
||||||
|
BadUsernameOrPassword,
|
||||||
|
NotAuthorized,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectAckPacket {
|
||||||
|
const PACKET_TYPE: u8 = 0x02;
|
||||||
|
|
||||||
|
pub fn to_packet(&self) -> Packet {
|
||||||
|
let body = vec![self.flags.bits(), self.return_code.as_u8()];
|
||||||
|
|
||||||
|
Packet {
|
||||||
|
packet_type: Self::PACKET_TYPE,
|
||||||
|
flags: 0,
|
||||||
|
remaining_length: body.len() as u32,
|
||||||
|
remaining_bytes: body,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectReturnCode {
|
||||||
|
fn as_u8(&self) -> u8 {
|
||||||
|
match self {
|
||||||
|
ConnectReturnCode::Accepted => 0,
|
||||||
|
ConnectReturnCode::Refused(reason) => match reason {
|
||||||
|
ConnectRefusedReason::UnacceptableProtocolVersion => 1,
|
||||||
|
ConnectRefusedReason::IdentifierRejected => 2,
|
||||||
|
ConnectRefusedReason::ServerUnavailable => 3,
|
||||||
|
ConnectRefusedReason::BadUsernameOrPassword => 4,
|
||||||
|
ConnectRefusedReason::NotAuthorized => 5,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
3
remote_send/src/mqtt/packets/mod.rs
Normal file
3
remote_send/src/mqtt/packets/mod.rs
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
pub mod connect;
|
||||||
|
pub mod connect_ack;
|
||||||
|
pub mod subscribe;
|
32
remote_send/src/mqtt/packets/subscribe.rs
Normal file
32
remote_send/src/mqtt/packets/subscribe.rs
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
use anyhow::Result;
|
||||||
|
|
||||||
|
use common::serde::Deserializer;
|
||||||
|
|
||||||
|
use crate::mqtt::MqttDeserialize;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SubscribePacket {
|
||||||
|
pub packet_id: u16,
|
||||||
|
pub filters: Vec<(String, QoS)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct QoS(pub u8);
|
||||||
|
|
||||||
|
impl SubscribePacket {
|
||||||
|
pub const PACKET_TYPE: u8 = 0x08;
|
||||||
|
|
||||||
|
pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
|
||||||
|
let mut des = Deserializer::new(bytes);
|
||||||
|
|
||||||
|
let packet_id = des.read_u16();
|
||||||
|
let mut filters = Vec::new();
|
||||||
|
while !des.is_empty() {
|
||||||
|
let topic = des.read_string().into_owned();
|
||||||
|
let qos = des.read_u8();
|
||||||
|
filters.push((topic, QoS(qos)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self { packet_id, filters })
|
||||||
|
}
|
||||||
|
}
|
@@ -50,7 +50,7 @@ pub fn ui(app: &mut App, ctx: &Context, _frame: &mut Frame) {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
egui::Frame::default().show(ui, |ui| {
|
egui::Frame::canvas(&ui.style()).show(ui, |ui| {
|
||||||
let available_size = ui.available_size();
|
let available_size = ui.available_size();
|
||||||
let (rect, _response) = ui.allocate_exact_size(
|
let (rect, _response) = ui.allocate_exact_size(
|
||||||
Vec2::new(
|
Vec2::new(
|
||||||
|
Reference in New Issue
Block a user