diff --git a/pkgs/mx-sanebot/src/main.rs b/pkgs/mx-sanebot/src/main.rs index 8bda435a..6e0e4edc 100644 --- a/pkgs/mx-sanebot/src/main.rs +++ b/pkgs/mx-sanebot/src/main.rs @@ -9,7 +9,8 @@ use matrix_sdk::{ deserialized_responses::SyncResponse, }; use ruma::{ - RoomId, + OwnedRoomId, + OwnedUserId, events::{ AnySyncMessageLikeEvent, AnySyncTimelineEvent, @@ -34,6 +35,22 @@ struct Runner { client: Client, } +/// this encodes the specific set of Matrix events i might ever care about. +#[derive(Debug)] +enum Event { + /// i was invited to the given room + Invitation(OwnedRoomId), + /// i see a message in the given room, from the given user + Message(OwnedRoomId, OwnedUserId, String), +} + +/// some action i might do, usually in response to an Event. +#[derive(Debug)] +enum Action { + AcceptInvite(OwnedRoomId), + SendMessage(OwnedRoomId, String), +} + impl Runner { async fn login( homeserver: &str, @@ -91,68 +108,53 @@ impl Runner { // doing so do not need to worry about double responses. let settings = build_sync_settings(None); let response = self.client.sync_once(settings).await.unwrap(); - self.handle_sync_response(&response).await; + let next_token = response.next_batch.clone(); + self.act_on_sync_response(response).await; println!("sync'd"); - let settings = build_sync_settings(Some(response.next_batch)); + let settings = build_sync_settings(Some(next_token)); let mut sync_stream = Box::pin( self.client.sync_stream(settings) .await ); while let Some(Ok(response)) = sync_stream.next().await { // println!("handling sync responses"); - self.handle_sync_response(&response).await; + self.act_on_sync_response(response).await; } Ok(()) } - async fn handle_sync_response(&self, response: &SyncResponse) { - for (room_id, _room) in &response.rooms.invite { - self.handle_room_invite(room_id).await; - } - for (room_id, room) in &response.rooms.join { - for e in &room.timeline.events { - if let Ok(event) = e.event.deserialize() { - self.handle_event(room_id, event).await; - } - } + fn parse_sync_response<'a>(&'a self, response: SyncResponse) -> impl Iterator + 'a { + let events_from_invited_rooms = response.rooms.invite + .into_iter() + .map(|(room_id, _room)| { + self.parse_room_invite(room_id) + }); + let events_from_joined_rooms = response.rooms.join + .into_iter() + .flat_map(move |(room_id, room)| { + room.timeline.events.into_iter().flat_map(move |e| match e.event.deserialize() { + Ok(event) => self.parse_timeline_event(room_id.clone(), event), + Err(_) => None, + }) + }); + events_from_invited_rooms.chain(events_from_joined_rooms) + } + + async fn act_on_sync_response(&self, response: SyncResponse) { + for event in self.parse_sync_response(response) { + self.act_on_event(event).await; } } - async fn handle_room_invite(&self, room_id: &RoomId) { + fn parse_room_invite(&self, room_id: OwnedRoomId) -> Event { println!("Received invite {:?}", room_id); - // matrix example claims: - // """ - // The event handlers are called before the next sync begins, but - // methods that change the state of a room (joining, leaving a room) - // wait for the sync to return the new room state so we need to spawn - // a new task for them. - // """ - let room = self.client.get_invited_room(room_id).unwrap(); - tokio::spawn(async move { - let mut delay = 2; - - while let Err(err) = room.accept_invitation().await { - // retry autojoin due to synapse sending invites, before the - // invited user can join for more information see - // https://github.com/matrix-org/synapse/issues/4345 - eprintln!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id()); - - sleep(Duration::from_secs(delay)).await; - delay *= 2; - - if delay > 3600 { - eprintln!("Can't join room {} ({err:?})", room.room_id()); - break; - } - } - println!("Successfully joined room {}", room.room_id()); - }); + Event::Invitation(room_id) } - async fn handle_event(&self, room_id: &RoomId, event: AnySyncTimelineEvent) { - println!("Considering event {:?}", event); + fn parse_timeline_event(&self, room_id: OwnedRoomId, event: AnySyncTimelineEvent) -> Option { + println!("Considering timeline event {:?}", event); let sender = event.sender(); // protect against a bad sync filter that would cause me to see my own events assert_ne!(Some(sender), self.client.user_id()); @@ -160,27 +162,70 @@ impl Runner { match event { AnySyncTimelineEvent::MessageLike(ref msg_like) => match msg_like { AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(room_msg)) => match room_msg.content.msgtype { - MessageType::Text(ref text_msg) => { - let body = &text_msg.body; - println!("message from {sender}: {body}\n"); - let resp = MessageHandler.on_msg(&body); - println!("response: {resp}"); + MessageType::Text(ref text_msg) => Some( + Event::Message(room_id, sender.to_owned(), text_msg.body.clone()) + ), + _ => None, + }, + _ => None, + }, + _ => None, + } + } - let room = self.client.get_joined_room(room_id).unwrap(); - let resp_content = RoomMessageEventContent::text_plain(&resp); - room.send(resp_content, None).await.unwrap(); - }, - ref other => { - println!("dropping RoomMessage event {other:?}"); - }, - }, - other => { - println!("dropping MessageLike event {other:?}"); - }, - }, - AnySyncTimelineEvent::State(state) => { - println!("dropping State event {state:?}"); - }, + async fn act_on_event(&self, event: Event) { + self.perform_action(self.map_event_to_action(event)).await; + } + + fn map_event_to_action(&self, event: Event) -> Action { + println!("processing event {event:?}"); + match event { + Event::Invitation(room_id) => Action::AcceptInvite(room_id), + Event::Message(room_id, _sender_id, body) => { + let resp = MessageHandler.on_msg(&body); + Action::SendMessage(room_id, resp) + } + } + } + + async fn perform_action(&self, action: Action) { + println!("performing action: {action:?}"); + match action { + Action::AcceptInvite(room_id) => { + // matrix example claims: + // """ + // The event handlers are called before the next sync begins, but + // methods that change the state of a room (joining, leaving a room) + // wait for the sync to return the new room state so we need to spawn + // a new task for them. + // """ + let room = self.client.get_invited_room(&room_id).unwrap(); + tokio::spawn(async move { + let mut delay = 2; + + while let Err(err) = room.accept_invitation().await { + // retry autojoin due to synapse sending invites, before the + // invited user can join for more information see + // https://github.com/matrix-org/synapse/issues/4345 + eprintln!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id()); + + sleep(Duration::from_secs(delay)).await; + delay *= 2; + + if delay > 3600 { + eprintln!("Can't join room {} ({err:?})", room.room_id()); + break; + } + } + println!("Successfully joined room {}", room.room_id()); + }); + } + Action::SendMessage(room_id, msg) => { + + let room = self.client.get_joined_room(&room_id).unwrap(); + let resp_content = RoomMessageEventContent::text_plain(&msg); + room.send(resp_content, None).await.unwrap(); + } } } }