mx-sanebot: refactor event handling

now we handle invites using the same sync idioms as with normal messages
This commit is contained in:
Colin 2023-04-27 23:35:42 +00:00
parent fd82256bbc
commit 7b141f6f58
6 changed files with 156 additions and 73 deletions

View File

@ -52,6 +52,11 @@
sopsFile = ../../secrets/universal.yaml;
owner = config.users.users.colin.name;
};
sops.secrets."mx-sanebot-env" = {
sopsFile = ../../secrets/universal/mx-sanebot-env.bin;
format = "binary";
owner = config.users.users.colin.name;
};
sops.secrets."router_passwd" = {
sopsFile = ../../secrets/universal.yaml;
};

View File

@ -1358,6 +1358,8 @@ dependencies = [
"anyhow",
"futures",
"matrix-sdk",
"ruma",
"ruma-client-api",
"tokio",
]

View File

@ -9,4 +9,6 @@ edition = "2021"
anyhow = "1.0"
futures = "0.3"
matrix-sdk = "0.6.2"
ruma = "*" # matrix-sdk dep
ruma-client-api = "*" # ruma dep
tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread"] }

View File

@ -15,6 +15,6 @@
packages.mx-sanebot = pkgs.callPackage ./default.nix { };
defaultPackage = packages.mx-sanebot;
devShells.default = ./shell.nix { inherit pkgs; };
devShells.default = import ./shell.nix { inherit pkgs; };
});
}

View File

@ -4,20 +4,25 @@ use std::env;
use futures::StreamExt as _;
use matrix_sdk::{
config::SyncSettings,
room::Room,
Client,
config::SyncSettings,
deserialized_responses::SyncResponse,
};
use matrix_sdk::ruma::RoomId;
use matrix_sdk::ruma::events::{
AnySyncMessageLikeEvent,
AnySyncTimelineEvent,
SyncMessageLikeEvent,
use ruma::{
RoomId,
events::{
AnySyncMessageLikeEvent,
AnySyncTimelineEvent,
SyncMessageLikeEvent,
room::message::{
MessageType,
RoomMessageEventContent,
},
},
};
use matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent;
use matrix_sdk::ruma::events::room::message::{
MessageType,
RoomMessageEventContent,
use ruma_client_api::{
filter::FilterDefinition,
sync::sync_events::v3::Filter,
};
use tokio::time::{sleep, Duration};
@ -53,42 +58,104 @@ impl Runner {
}
async fn event_loop(&self) -> anyhow::Result<()> {
// Now, we want our client to react to invites. Invites sent us stripped member
// state events so we want to react to them. We add the event handler before
// the sync, so this happens also for older messages. All rooms we've
// already entered won't have stripped states anymore and thus won't fire
self.client.add_event_handler(on_stripped_state_member);
// event types if i only care about monitoring invites
let types_for_invites = [
"m.room.member".to_owned(), // StrippedRoomMemberEvent
];
// event types i care about during normal operation
let types_for_all = [
"m.room.member".to_owned(), // StrippedRoomMemberEvent
"m.room.message".to_owned(), // RoomMessageEvent
];
// always ignore messages from self
let not_senders = [ self.client.user_id().unwrap().to_owned() ];
// An initial sync to set up state and so our bot doesn't respond to old
// messages. If the `StateStore` finds saved state in the location given the
// initial sync will be skipped in favor of loading state from the store
let response = self.client.sync_once(SyncSettings::default()).await.unwrap();
let build_sync_settings = |token| {
let mut filter = FilterDefinition::default();
filter.room.timeline.not_senders = &not_senders;
filter.room.timeline.types = Some(match token {
None => &types_for_invites,
Some(_) => &types_for_all,
});
let mut settings = SyncSettings::default().filter(
Filter::FilterDefinition(filter)
);
if let Some(t) = token {
settings = settings.token(t);
}
settings
};
// initial sync during which i handle only room invites, but ignore any messages
// from before now. this means i ignore messages from when i was offline, but in
// doing so do not need to worry about double responses.
let settings = build_sync_settings(None);
let response = self.client.sync_once(settings).await.unwrap();
self.handle_sync_response(&response).await;
println!("sync'd");
let settings = SyncSettings::default().token(response.next_batch);
let settings = build_sync_settings(Some(response.next_batch));
let mut sync_stream = Box::pin(
self.client.sync_stream(settings)
.await
);
while let Some(Ok(response)) = sync_stream.next().await {
for (room_id, room) in &response.rooms.join {
for e in &room.timeline.events {
if let Ok(event) = e.event.deserialize() {
self.handle_event(room_id, event).await;
}
}
}
// println!("handling sync responses");
self.handle_sync_response(&response).await;
}
Ok(())
}
async fn handle_sync_response(&self, response: &SyncResponse) {
for (room_id, _room) in &response.rooms.invite {
self.handle_room_invite(room_id).await;
}
for (room_id, room) in &response.rooms.join {
for e in &room.timeline.events {
if let Ok(event) = e.event.deserialize() {
self.handle_event(room_id, event).await;
}
}
}
}
async fn handle_room_invite(&self, room_id: &RoomId) {
println!("Received invite {:?}", room_id);
// matrix example claims:
// """
// The event handlers are called before the next sync begins, but
// methods that change the state of a room (joining, leaving a room)
// wait for the sync to return the new room state so we need to spawn
// a new task for them.
// """
let room = self.client.get_invited_room(room_id).unwrap();
tokio::spawn(async move {
let mut delay = 2;
while let Err(err) = room.accept_invitation().await {
// retry autojoin due to synapse sending invites, before the
// invited user can join for more information see
// https://github.com/matrix-org/synapse/issues/4345
eprintln!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());
sleep(Duration::from_secs(delay)).await;
delay *= 2;
if delay > 3600 {
eprintln!("Can't join room {} ({err:?})", room.room_id());
break;
}
}
println!("Successfully joined room {}", room.room_id());
});
}
async fn handle_event(&self, room_id: &RoomId, event: AnySyncTimelineEvent) {
println!("Considering event {:?}", event);
let sender = event.sender();
if Some(sender) == self.client.user_id() {
return; // don't react to self
}
// protect against a bad sync filter that would cause me to see my own events
assert_ne!(Some(sender), self.client.user_id());
match event {
AnySyncTimelineEvent::MessageLike(ref msg_like) => match msg_like {
@ -118,47 +185,6 @@ impl Runner {
}
}
// Whenever we see a new stripped room member event, we've asked our client to
// call this function. So what exactly are we doing then?
async fn on_stripped_state_member(
room_member: StrippedRoomMemberEvent,
client: Client,
room: Room,
) {
if room_member.state_key != client.user_id().unwrap() {
// the invite we've seen isn't for us, but for someone else. ignore
return;
}
// looks like the room is an invited room, let's attempt to join then
if let Room::Invited(room) = room {
// The event handlers are called before the next sync begins, but
// methods that change the state of a room (joining, leaving a room)
// wait for the sync to return the new room state so we need to spawn
// a new task for them.
tokio::spawn(async move {
println!("Autojoining room {}", room.room_id());
let mut delay = 2;
while let Err(err) = room.accept_invitation().await {
// retry autojoin due to synapse sending invites, before the
// invited user can join for more information see
// https://github.com/matrix-org/synapse/issues/4345
eprintln!("Failed to join room {} ({err:?}), retrying in {delay}s", room.room_id());
sleep(Duration::from_secs(delay)).await;
delay *= 2;
if delay > 3600 {
eprintln!("Can't join room {} ({err:?})", room.room_id());
break;
}
}
println!("Successfully joined room {}", room.room_id());
});
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {

View File

@ -0,0 +1,48 @@
{
"data": "ENC[AES256_GCM,data:h0hENUiU3GkK9INz/A/ouklTJcWlHQFWgs9mdmzqgr06su5Jgw==,iv:BafM5OHIkj07mDh8ukhsrdK2k0r2EJf14c00cggBDEs=,tag:CJW2k0Dnawru1Kyve2pMrQ==,type:str]",
"sops": {
"kms": null,
"gcp_kms": null,
"azure_kv": null,
"hc_vault": null,
"age": [
{
"recipient": "age1tnl4jfgacwkargzeqnhzernw29xx8mkv73xh6ufdyde6q7859slsnzf24x",
"enc": "-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBNcmNwbU14bnFkTjMzYTZp\nblRGR0R0cDlrMDl6VDRRZmFGanM4NHY5Q1MwCjg3U2dtSGhHUmgrRzZVREZsL0ZH\nQTRyRTMxc05NaC85eW5Pc0ZJdHdFYzQKLS0tIDZFdGI0cTUxSlEwN1ZpMzFzMHlk\nWlJKNzZGN3N5bVk5cUI4T3Fya2NaMm8K3R99wPFdFiEtPCen2Wa9YzwxXnEcJobc\nuG8xvQUfUY/n0C2Ch4NJ5m9r+76WrnY3uG8f5o3OgWE9AiY1PUSZfQ==\n-----END AGE ENCRYPTED FILE-----\n"
},
{
"recipient": "age1j2pqnl8j0krdzk6npe93s4nnqrzwx978qrc0u570gzlamqpnje9sc8le2g",
"enc": "-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSAzQnBHM0JJc0FJRUtnK1dV\ndmVwS3dnN2pKOHd1dU5SZ2hDT0grZEFHRFQ0ClRxcWw3NlM1Q0ZNVnY4N2JlUXpK\nb2l4cWVDNzViU1lYaStyK3pUM1Z3R28KLS0tIHhoVGtNUHVUUFJ3bFUraTJDdGY2\nMEc5ekh1a3hkTHZCNTRUVytieDhtNVkK7MR1CCPii79Yk3xJ4A+6oYnI54ykEXQM\nquEYdontLyX5s061p3PmHutEHOIGpQDQDDunbAnfIdSKngZuX/kY/A==\n-----END AGE ENCRYPTED FILE-----\n"
},
{
"recipient": "age1z8fauff34cdecr6sjkre260luzxcca05kpcwvhx988d306tpcejsp63znu",
"enc": "-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBUUTJLbEFsUW9LWEJrRjIz\nQ1dhZ2dGZFF1TDFSYUFmcERMemhaT1RXYUZzCmhCN2dSM0RJSDQ4WEtDcVN3RGhK\nUldnWHpnc1FWV2FOOGVINjZydCtBZ2MKLS0tIEJpaHB1KzBnQWg4ajA1dE9zU1FL\naHUrMnJQYk1BM1NpbEsrNWVBa1hXOGMK2gLe3Ddj9ErlzkR0RjlsmnEiY84VBMj8\nKV0NklDpxRdhYgrEKcJi6b+RZEKDLu1vlwGj70AzJUv3OK5uZoLCSw==\n-----END AGE ENCRYPTED FILE-----\n"
},
{
"recipient": "age1zsrsvd7j6l62fjxpfd2qnhqlk8wk4p8r0dtxpe4sdgnh2474095qdu7xj9",
"enc": "-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSB0WkhZZVlFaHFqeFp6aVdB\nMGRJSGVPRmJJeCttRjIva1pKM3EwMGVkdDFZCnFGTDdMc2ZURXR6NmRDTkRVYWg5\nbmd2V0FhdjNmQUx4b1NCRXEyU0lxL0UKLS0tIHJQRy82V0RkS2VNUGg0cnFFdHA4\nYUdMUStqM3NmRnVHQys2dzJ1cmhWYXcKxWRun88b2lClrI5zKl/XC8glXbE7nUFH\ncPdqSz4mYsPO8f5KZ8GZ8UZ68HBjrlS1vW6Zt1WmkUfXK3wOh2oKVg==\n-----END AGE ENCRYPTED FILE-----\n"
},
{
"recipient": "age1vnw7lnfpdpjn62l3u5nyv5xt2c965k96p98kc43mcnyzpetrts9q54mc9v",
"enc": "-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBIWXZXM29MWVlzQ2dEdi9L\nSkNlTXhUMkpwUC9UdXNwZTFsWjF1M0pZNzN3CkFKcHVyaHRzbVVNK3ZvRXBrTHNQ\naEhLL0puRklJbkgxK1VoaHZkRFF2ZzAKLS0tIFh2NVFJbTBmWnFueFlDWnkyVXBE\nZGdGTkhMQzdUQmpCdGFyRjVXeWZoMU0Km9gc4GdQFFL3+O54TT50YtqkDtFYQggF\nqbVBXNjX8saxODceotz5+Kb2M4nkNl9k94K0dPIdbckLZasTfFHuWw==\n-----END AGE ENCRYPTED FILE-----\n"
},
{
"recipient": "age1w7mectcjku6x3sd8plm8wkn2qfrhv9n6zhzlf329e2r2uycgke8qkf9dyn",
"enc": "-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBJSDQ5TFRXOTkvZmZLS1JP\nYkh5M2pXNjRoakt1cnd1R1hpZnREUEtJVUNJCldCTEhMaUVDY3pHOU9IUFh5Yml4\nN1VLd2w3Z2dFZFdqNjIxa2pWTFBKLzQKLS0tIFpXMDl3bitkaDJIYWxFaWdibSth\nRDI2SGpxVFdKSmIveFIvLzQrMEVySVEKUOwds3HEfKN1KtmkU1z3Q8c9RUkYH8my\n3+1vjanX2pgYhdEd4X0N6Yfn0JohFQWUbc2k426D0OSQQal3V38zqg==\n-----END AGE ENCRYPTED FILE-----\n"
},
{
"recipient": "age1tzlyex2z6t88tg9h82943e39shxhmqeyr7ywhlwpdjmyqsndv3qq27x0rf",
"enc": "-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBhN3BzeFU0d1lOSTVLWWw0\nZ21POE53bXRtVVcxbEZyY1dVMUxrNDdvd1FZClZmdk1MQVhXMDk4Zm8ya01lWURx\nSGRBVEp3RmQ4MldVOFE3OWlnbXkzQmMKLS0tIG82MGg4TjhGV2cvbEJtak1QdkpG\nUnMwM215WWF2UU5aNlhsUHpOcGQ4RkkK5V2ldURjODktz57kq/Sw95HM5xt+qsZb\nkt51MqxZvVfOqBTtBljwv//7HofLj2i7igz8L2NC2xOMJfNLT+SCrw==\n-----END AGE ENCRYPTED FILE-----\n"
},
{
"recipient": "age18vq5ktwgeaysucvw9t67drqmg5zd5c5k3le34yqxckkfj7wqdqgsd4ejmt",
"enc": "-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSB2bjdDQXJYRythVTc0bDhD\ncWtWUmpBQXZrSW1PRThKS0VmK3A4V1FPa1JRCnhLVVFRT040bVdyZWVXRi80cS9j\nK2RvRjNWZ0ZieTh0WlA3aGxHZUdYWUkKLS0tIFI4TG84TVJnaXJEcDdYS29SZTRy\ndjYya1BmT0JhWVZQZ01FbTJsNGVvcnMK6yi1gBnKlXgEk/EzJUXMeXmsocDHF0ZM\n+yxXP/8YnBwpvArxO2hSNjjOrbU2NFfug4S1UAlYcBlK3Z7/tRAF9w==\n-----END AGE ENCRYPTED FILE-----\n"
}
],
"lastmodified": "2023-04-27T21:10:14Z",
"mac": "ENC[AES256_GCM,data:BDYk9zCsGrY5cAvJftQRrDBwG6fyZQ7Hz2BIjayUmLGVQvzPTQ3HPEbxAYBcgjlRLNNugFhmdrBtdQde7qLJ/0cNeyH5WLVqFFodq9gXCJ35u1XEhR4DOEIQTEaPUMNcjJ1reX5x+z5xBRsqBw1/ZfCneuxDWRii6HBZjMZXlPw=,iv:na/QcL2Ehpohr1qLR4Qtxw9lrY6UqxqV9Y823/TvhyI=,tag:ij8/74O2XmLtW8YnfWRb+A==,type:str]",
"pgp": null,
"unencrypted_suffix": "_unencrypted",
"version": "3.7.3"
}
}