mx-sanebot: stream the sync events to avoid an inversion of control
the handler callback API is a poor fit for Rust lifetimes, so avoid it when possible
This commit is contained in:
parent
66c42916c8
commit
d385845dd5
15
pkgs/mx-sanebot/Cargo.lock
generated
15
pkgs/mx-sanebot/Cargo.lock
generated
|
@ -1356,6 +1356,7 @@ name = "mx-sanebot"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"futures",
|
||||||
"matrix-sdk",
|
"matrix-sdk",
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
@ -1402,9 +1403,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "openssl"
|
name = "openssl"
|
||||||
version = "0.10.51"
|
version = "0.10.52"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "97ea2d98598bf9ada7ea6ee8a30fb74f9156b63bbe495d64ec2b87c269d2dda3"
|
checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
|
@ -1434,9 +1435,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "openssl-sys"
|
name = "openssl-sys"
|
||||||
version = "0.9.86"
|
version = "0.9.87"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "992bac49bdbab4423199c654a5515bd2a6c6a23bf03f2dd3bdb7e5ae6259bc69"
|
checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cc",
|
"cc",
|
||||||
"libc",
|
"libc",
|
||||||
|
@ -2383,13 +2384,13 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing-attributes"
|
name = "tracing-attributes"
|
||||||
version = "0.1.23"
|
version = "0.1.24"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
|
checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2 1.0.56",
|
"proc-macro2 1.0.56",
|
||||||
"quote 1.0.26",
|
"quote 1.0.26",
|
||||||
"syn 1.0.109",
|
"syn 2.0.15",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -7,5 +7,6 @@ edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
|
futures = "0.3"
|
||||||
matrix-sdk = "0.6.2"
|
matrix-sdk = "0.6.2"
|
||||||
tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread"] }
|
tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread"] }
|
||||||
|
|
|
@ -2,20 +2,30 @@ mod msg_handler;
|
||||||
|
|
||||||
use std::env;
|
use std::env;
|
||||||
|
|
||||||
|
use futures::StreamExt as _;
|
||||||
use matrix_sdk::{
|
use matrix_sdk::{
|
||||||
config::SyncSettings,
|
config::SyncSettings,
|
||||||
room::Room,
|
room::Room,
|
||||||
ruma::events::room::member::StrippedRoomMemberEvent,
|
|
||||||
ruma::events::room::message::{
|
|
||||||
MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent,
|
|
||||||
},
|
|
||||||
Client,
|
Client,
|
||||||
};
|
};
|
||||||
|
use matrix_sdk::ruma::RoomId;
|
||||||
|
use matrix_sdk::ruma::events::{
|
||||||
|
AnySyncMessageLikeEvent,
|
||||||
|
AnySyncTimelineEvent,
|
||||||
|
SyncMessageLikeEvent,
|
||||||
|
};
|
||||||
|
use matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent;
|
||||||
|
use matrix_sdk::ruma::events::room::message::{
|
||||||
|
MessageType,
|
||||||
|
RoomMessageEventContent,
|
||||||
|
};
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
|
|
||||||
use msg_handler::MessageHandler;
|
use msg_handler::MessageHandler;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
struct Runner {
|
struct Runner {
|
||||||
|
// this is actually a *handle* to the client (Arc).
|
||||||
client: Client,
|
client: Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,48 +64,68 @@ impl Runner {
|
||||||
// initial sync will be skipped in favor of loading state from the store
|
// initial sync will be skipped in favor of loading state from the store
|
||||||
let response = self.client.sync_once(SyncSettings::default()).await.unwrap();
|
let response = self.client.sync_once(SyncSettings::default()).await.unwrap();
|
||||||
println!("sync'd");
|
println!("sync'd");
|
||||||
|
|
||||||
|
let settings = SyncSettings::default().token(response.next_batch);
|
||||||
|
let mut sync_stream = Box::pin(
|
||||||
|
self.client.sync_stream(settings)
|
||||||
|
.await
|
||||||
|
);
|
||||||
|
while let Some(Ok(response)) = sync_stream.next().await {
|
||||||
|
for (room_id, room) in &response.rooms.join {
|
||||||
|
for e in &room.timeline.events {
|
||||||
|
if let Ok(event) = e.event.deserialize() {
|
||||||
|
self.handle_event(room_id, event).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// add our CommandBot to be notified of incoming messages, we do this after the
|
// add our CommandBot to be notified of incoming messages, we do this after the
|
||||||
// initial sync to avoid responding to messages before the bot was running.
|
// initial sync to avoid responding to messages before the bot was running.
|
||||||
self.client.add_event_handler(on_room_message);
|
// self.client.add_event_handler(on_room_message);
|
||||||
|
|
||||||
// since we called `sync_once` before we entered our sync loop we must pass
|
// // since we called `sync_once` before we entered our sync loop we must pass
|
||||||
// that sync token to `sync`
|
// // that sync token to `sync`
|
||||||
let settings = SyncSettings::default().token(response.next_batch);
|
// let settings = SyncSettings::default().token(response.next_batch);
|
||||||
// this keeps state from the server streaming in to CommandBot via the
|
// // this keeps state from the server streaming in to CommandBot via the
|
||||||
// EventHandler trait
|
// // EventHandler trait
|
||||||
self.client.sync(settings).await?;
|
// self.client.sync(settings).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
async fn on_room_message(event: OriginalSyncRoomMessageEvent, client: Client, room: Room) {
|
async fn handle_event(&self, room_id: &RoomId, event: AnySyncTimelineEvent) {
|
||||||
println!("received event");
|
println!("Considering event {:?}", event);
|
||||||
if let Room::Joined(room) = room {
|
let sender = event.sender();
|
||||||
let text_content = match event.content.msgtype {
|
if Some(sender) == self.client.user_id() {
|
||||||
MessageType::Text(t) => t,
|
return; // don't react to self
|
||||||
_ => return, // not of interest
|
|
||||||
};
|
|
||||||
|
|
||||||
let sender = event.sender;
|
|
||||||
let msg = text_content.body;
|
|
||||||
println!("message from {sender}: {msg}\n");
|
|
||||||
|
|
||||||
if client.user_id() == Some(sender.as_ref()) {
|
|
||||||
return; // don't respond to myself!
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let resp = MessageHandler.on_msg(&msg);
|
match event {
|
||||||
println!("response: {}", resp);
|
AnySyncTimelineEvent::MessageLike(ref msg_like) => match msg_like {
|
||||||
|
AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(room_msg)) => match room_msg.content.msgtype {
|
||||||
|
MessageType::Text(ref text_msg) => {
|
||||||
|
let body = &text_msg.body;
|
||||||
|
println!("message from {sender}: {body}\n");
|
||||||
|
let resp = MessageHandler.on_msg(&body);
|
||||||
|
println!("response: {resp}");
|
||||||
|
|
||||||
let resp_content = RoomMessageEventContent::text_plain(&resp);
|
let room = self.client.get_joined_room(room_id).unwrap();
|
||||||
|
let resp_content = RoomMessageEventContent::text_plain(&resp);
|
||||||
// send our message to the room we found the "!ping" command in
|
room.send(resp_content, None).await.unwrap();
|
||||||
// the last parameter is an optional transaction id which we don't
|
},
|
||||||
// care about.
|
ref other => {
|
||||||
room.send(resp_content, None).await.unwrap();
|
println!("dropping RoomMessage event {other:?}");
|
||||||
|
},
|
||||||
println!("response sent");
|
},
|
||||||
|
other => {
|
||||||
|
println!("dropping MessageLike event {other:?}");
|
||||||
|
},
|
||||||
|
},
|
||||||
|
AnySyncTimelineEvent::State(state) => {
|
||||||
|
println!("dropping State event {state:?}");
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user