mx-sanebot: split out a cleaner API between the Matrix events and the bot's event-handling logic

This commit is contained in:
Colin 2023-04-28 01:33:47 +00:00
parent 7b141f6f58
commit afc916c9f8

View File

@ -9,7 +9,8 @@ use matrix_sdk::{
deserialized_responses::SyncResponse, deserialized_responses::SyncResponse,
}; };
use ruma::{ use ruma::{
RoomId, OwnedRoomId,
OwnedUserId,
events::{ events::{
AnySyncMessageLikeEvent, AnySyncMessageLikeEvent,
AnySyncTimelineEvent, AnySyncTimelineEvent,
@ -34,6 +35,22 @@ struct Runner {
client: Client, client: Client,
} }
/// this encodes the specific set of Matrix events i might ever care about.
#[derive(Debug)]
enum Event {
/// i was invited to the given room
Invitation(OwnedRoomId),
/// i see a message in the given room, from the given user
Message(OwnedRoomId, OwnedUserId, String),
}
/// some action i might do, usually in response to an Event.
#[derive(Debug)]
enum Action {
AcceptInvite(OwnedRoomId),
SendMessage(OwnedRoomId, String),
}
impl Runner { impl Runner {
async fn login( async fn login(
homeserver: &str, homeserver: &str,
@ -91,68 +108,53 @@ impl Runner {
// doing so do not need to worry about double responses. // doing so do not need to worry about double responses.
let settings = build_sync_settings(None); let settings = build_sync_settings(None);
let response = self.client.sync_once(settings).await.unwrap(); let response = self.client.sync_once(settings).await.unwrap();
self.handle_sync_response(&response).await; let next_token = response.next_batch.clone();
self.act_on_sync_response(response).await;
println!("sync'd"); println!("sync'd");
let settings = build_sync_settings(Some(response.next_batch)); let settings = build_sync_settings(Some(next_token));
let mut sync_stream = Box::pin( let mut sync_stream = Box::pin(
self.client.sync_stream(settings) self.client.sync_stream(settings)
.await .await
); );
while let Some(Ok(response)) = sync_stream.next().await { while let Some(Ok(response)) = sync_stream.next().await {
// println!("handling sync responses"); // println!("handling sync responses");
self.handle_sync_response(&response).await; self.act_on_sync_response(response).await;
} }
Ok(()) Ok(())
} }
async fn handle_sync_response(&self, response: &SyncResponse) { fn parse_sync_response<'a>(&'a self, response: SyncResponse) -> impl Iterator<Item=Event> + 'a {
for (room_id, _room) in &response.rooms.invite { let events_from_invited_rooms = response.rooms.invite
self.handle_room_invite(room_id).await; .into_iter()
} .map(|(room_id, _room)| {
for (room_id, room) in &response.rooms.join { self.parse_room_invite(room_id)
for e in &room.timeline.events { });
if let Ok(event) = e.event.deserialize() { let events_from_joined_rooms = response.rooms.join
self.handle_event(room_id, event).await; .into_iter()
} .flat_map(move |(room_id, room)| {
} room.timeline.events.into_iter().flat_map(move |e| match e.event.deserialize() {
Ok(event) => self.parse_timeline_event(room_id.clone(), event),
Err(_) => None,
})
});
events_from_invited_rooms.chain(events_from_joined_rooms)
}
async fn act_on_sync_response(&self, response: SyncResponse) {
for event in self.parse_sync_response(response) {
self.act_on_event(event).await;
} }
} }
async fn handle_room_invite(&self, room_id: &RoomId) { fn parse_room_invite(&self, room_id: OwnedRoomId) -> Event {
println!("Received invite {:?}", room_id); println!("Received invite {:?}", room_id);
// matrix example claims: Event::Invitation(room_id)
// """
// The event handlers are called before the next sync begins, but
// methods that change the state of a room (joining, leaving a room)
// wait for the sync to return the new room state so we need to spawn
// a new task for them.
// """
let room = self.client.get_invited_room(room_id).unwrap();
tokio::spawn(async move {
let mut delay = 2;
while let Err(err) = room.accept_invitation().await {
// retry autojoin due to synapse sending invites, before the
// invited user can join for more information see
// https://github.com/matrix-org/synapse/issues/4345
eprintln!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());
sleep(Duration::from_secs(delay)).await;
delay *= 2;
if delay > 3600 {
eprintln!("Can't join room {} ({err:?})", room.room_id());
break;
}
}
println!("Successfully joined room {}", room.room_id());
});
} }
async fn handle_event(&self, room_id: &RoomId, event: AnySyncTimelineEvent) { fn parse_timeline_event(&self, room_id: OwnedRoomId, event: AnySyncTimelineEvent) -> Option<Event> {
println!("Considering event {:?}", event); println!("Considering timeline event {:?}", event);
let sender = event.sender(); let sender = event.sender();
// protect against a bad sync filter that would cause me to see my own events // protect against a bad sync filter that would cause me to see my own events
assert_ne!(Some(sender), self.client.user_id()); assert_ne!(Some(sender), self.client.user_id());
@ -160,27 +162,70 @@ impl Runner {
match event { match event {
AnySyncTimelineEvent::MessageLike(ref msg_like) => match msg_like { AnySyncTimelineEvent::MessageLike(ref msg_like) => match msg_like {
AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(room_msg)) => match room_msg.content.msgtype { AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(room_msg)) => match room_msg.content.msgtype {
MessageType::Text(ref text_msg) => { MessageType::Text(ref text_msg) => Some(
let body = &text_msg.body; Event::Message(room_id, sender.to_owned(), text_msg.body.clone())
println!("message from {sender}: {body}\n"); ),
let resp = MessageHandler.on_msg(&body); _ => None,
println!("response: {resp}"); },
_ => None,
},
_ => None,
}
}
let room = self.client.get_joined_room(room_id).unwrap(); async fn act_on_event(&self, event: Event) {
let resp_content = RoomMessageEventContent::text_plain(&resp); self.perform_action(self.map_event_to_action(event)).await;
room.send(resp_content, None).await.unwrap(); }
},
ref other => { fn map_event_to_action(&self, event: Event) -> Action {
println!("dropping RoomMessage event {other:?}"); println!("processing event {event:?}");
}, match event {
}, Event::Invitation(room_id) => Action::AcceptInvite(room_id),
other => { Event::Message(room_id, _sender_id, body) => {
println!("dropping MessageLike event {other:?}"); let resp = MessageHandler.on_msg(&body);
}, Action::SendMessage(room_id, resp)
}, }
AnySyncTimelineEvent::State(state) => { }
println!("dropping State event {state:?}"); }
},
async fn perform_action(&self, action: Action) {
println!("performing action: {action:?}");
match action {
Action::AcceptInvite(room_id) => {
// matrix example claims:
// """
// The event handlers are called before the next sync begins, but
// methods that change the state of a room (joining, leaving a room)
// wait for the sync to return the new room state so we need to spawn
// a new task for them.
// """
let room = self.client.get_invited_room(&room_id).unwrap();
tokio::spawn(async move {
let mut delay = 2;
while let Err(err) = room.accept_invitation().await {
// retry autojoin due to synapse sending invites, before the
// invited user can join for more information see
// https://github.com/matrix-org/synapse/issues/4345
eprintln!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());
sleep(Duration::from_secs(delay)).await;
delay *= 2;
if delay > 3600 {
eprintln!("Can't join room {} ({err:?})", room.room_id());
break;
}
}
println!("Successfully joined room {}", room.room_id());
});
}
Action::SendMessage(room_id, msg) => {
let room = self.client.get_joined_room(&room_id).unwrap();
let resp_content = RoomMessageEventContent::text_plain(&msg);
room.send(resp_content, None).await.unwrap();
}
} }
} }
} }