202 lines
7.2 KiB
Python
202 lines
7.2 KiB
Python
import smtplib
|
|
import imaplib
|
|
import imap_tools
|
|
import time
|
|
import ssl
|
|
import argparse
|
|
import uuid
|
|
import requests
|
|
from typing import NamedTuple
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("host", type=str)
|
|
parser.add_argument("--mailfrom", default="foo@example.com")
|
|
parser.add_argument("--rcptto", default="awesome@vacu.store")
|
|
parser.add_argument("--subject", default="Some test message")
|
|
parser.add_argument("--header", action="append", default=[])
|
|
parser.add_argument("--submission", default=False, action="store_true")
|
|
parser.add_argument("--smtp-starttls", default=None, action="store_true")
|
|
parser.add_argument("--imap-insecure", default=False, action="store_true")
|
|
parser.add_argument("--imap-move-to")
|
|
parser.add_argument("--imap-dir", default=None)
|
|
parser.add_argument("--username")
|
|
parser.add_argument("--password")
|
|
parser.add_argument(
|
|
"--expect-refused",
|
|
dest="expect",
|
|
action="store_const",
|
|
const="refused",
|
|
default="received",
|
|
)
|
|
parser.add_argument("--expect-flag", action="append", default=[])
|
|
parser.add_argument("--expect-sent", dest="expect", action="store_const", const="sent")
|
|
parser.add_argument(
|
|
"--expect-imap-error", dest="expect", action="store_const", const="imap_error"
|
|
)
|
|
parser.add_argument(
|
|
"--expect-mailpit-received",
|
|
dest="expect",
|
|
action="store_const",
|
|
const="mailpit_received",
|
|
)
|
|
parser.add_argument(
|
|
"--expect-mailpit-not-received",
|
|
dest="expect",
|
|
action="store_const",
|
|
const="mailpit_not_received",
|
|
)
|
|
parser.add_argument("--mailpit-url")
|
|
|
|
args = parser.parse_args()
|
|
|
|
print(f"got args {args!r}")
|
|
|
|
args.header.append(f"Subject: {args.subject}")
|
|
|
|
# smtp_starttls = args.smtp_starttls
|
|
# if smtp_starttls is None:
|
|
# smtp_starttls = args.submission
|
|
|
|
username = args.username
|
|
password = args.password
|
|
if password is None:
|
|
password = username
|
|
|
|
if (username is None or password is None) and (
|
|
args.submission or args.expect == "received"
|
|
):
|
|
assert False, "Bad args"
|
|
|
|
if args.expect.startswith("mailpit_") and args.mailpit_url is None:
|
|
assert False, "Bad args"
|
|
|
|
msg_magic = str(uuid.uuid4())
|
|
|
|
|
|
def mk_ctx():
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
return ctx
|
|
|
|
|
|
try:
|
|
smtp = None
|
|
if args.submission:
|
|
smtp = smtplib.SMTP_SSL(args.host, port=465, context=mk_ctx())
|
|
else:
|
|
smtp = smtplib.SMTP(args.host, port=25)
|
|
smtp.ehlo()
|
|
if args.smtp_starttls:
|
|
smtp.starttls(context=mk_ctx())
|
|
smtp.ehlo()
|
|
if args.submission:
|
|
smtp.login(username, password)
|
|
headers = "\n".join(args.header)
|
|
smtp.sendmail(args.mailfrom, args.rcptto, f"{headers}\n\n{msg_magic}")
|
|
smtp.close()
|
|
except smtplib.SMTPRecipientsRefused:
|
|
assert args.expect == "refused", "Server rejected message as recipients refused"
|
|
except smtplib.SMTPSenderRefused:
|
|
assert args.expect == "refused", "Server rejected message as recipients refused"
|
|
else:
|
|
assert (
|
|
not args.expect == "refused"
|
|
), "Server was supposed to reject this message, but it didn't"
|
|
|
|
if args.mailpit_url is not None:
|
|
time.sleep(3)
|
|
mails = requests.get(args.mailpit_url + "/api/v1/messages").json()
|
|
found_message = False
|
|
for message_data in mails["messages"]:
|
|
if msg_magic in message_data["Snippet"]:
|
|
found_message = True
|
|
break
|
|
if args.expect == "mailpit_received":
|
|
assert found_message, "Message not received by mailpit server"
|
|
else:
|
|
assert (
|
|
not found_message
|
|
), "Message was received by the mailpit server when it wasn't supposed to be"
|
|
|
|
MessageInFolder = NamedTuple(
|
|
"MessageInFolder", [("message", imap_tools.message.MailMessage), ("folder", str)]
|
|
)
|
|
|
|
if args.expect == "received" or args.expect == "imap_error":
|
|
time.sleep(5)
|
|
print(f"looking for {msg_magic}")
|
|
try:
|
|
|
|
def connection() -> imap_tools.MailBox:
|
|
return imap_tools.MailBox(args.host, ssl_context=mk_ctx()).login(
|
|
username, password
|
|
)
|
|
|
|
def find_messages(mailbox: imap_tools.MailBox) -> list[MessageInFolder]:
|
|
matching_messages = []
|
|
directories = []
|
|
for d in mailbox.folder.list():
|
|
if "\\Noselect" not in d.flags:
|
|
directories.append(d.name)
|
|
# print(f"directories is {directories!r}")
|
|
for imap_dir in directories:
|
|
print(f"checking in {imap_dir!r}")
|
|
mailbox.folder.set(imap_dir)
|
|
# except imap_tools.errors.MailboxFolderSelectError as e:
|
|
# # print(f"failed to select folder {e!r}")
|
|
# continue
|
|
for msg in mailbox.fetch(mark_seen=False):
|
|
if "\\Deleted" in msg.flags:
|
|
continue
|
|
# print(f"found message {msg.uid!r} with text {msg.text!r} in folder {imap_dir!r}")
|
|
msg_str = msg.obj.as_string()
|
|
print(f"flags: {msg.flags!r}")
|
|
print(f"{msg_str}")
|
|
if msg_magic == msg.text.strip():
|
|
in_folder = MessageInFolder(message=msg, folder=imap_dir)
|
|
matching_messages.append(in_folder)
|
|
return matching_messages
|
|
|
|
if args.imap_move_to is not None:
|
|
with connection() as mailbox:
|
|
print("prefind")
|
|
prefind = find_messages(mailbox)
|
|
assert len(prefind) > 0, "Could not find message to move anywhere"
|
|
assert len(prefind) == 1, "Found duplicate messages"
|
|
mailbox.folder.set(prefind[0].folder)
|
|
msg = prefind[0].message
|
|
print(f"about to move {msg.uid} to {args.imap_move_to}")
|
|
res = mailbox.move(msg.uid, args.imap_move_to)
|
|
assert res[1][1][1] is not None, "failed to move"
|
|
print(f"done moving, res {res!r}")
|
|
with connection() as mailbox:
|
|
matching_messages = find_messages(mailbox)
|
|
if args.expect == "received":
|
|
# print(f"{matching_messages!r}")
|
|
assert (
|
|
len(matching_messages) > 0
|
|
), "Could not find the message in the mailbox"
|
|
assert (
|
|
len(matching_messages) == 1
|
|
), f"Multiple messages matching message magic {msg_magic}"
|
|
matching_mif = matching_messages[0]
|
|
if args.imap_dir is not None:
|
|
expected_dir = args.imap_dir
|
|
actual_dir = matching_mif.folder
|
|
assert (
|
|
expected_dir == actual_dir
|
|
), f"Expected to find message in {expected_dir}, found it in {actual_dir} instead"
|
|
matching_message = matching_mif.message
|
|
for expected_flag in args.expect_flag:
|
|
assert (
|
|
expected_flag in matching_message.flags
|
|
), f"Flag {expected_flag} not found, message flags: {matching_message.flags!r}"
|
|
|
|
except imaplib.IMAP4.error as e:
|
|
assert args.expect == "imap_error", f"IMAP error: {e}"
|
|
else:
|
|
assert (
|
|
not args.expect == "imap_error"
|
|
), "Expected an IMAP error, but didn't get one"
|