Migrate checkers from linmobapps repo

This commit is contained in:
Dark Dragon 2023-03-15 23:35:13 +01:00
parent 2a40994e83
commit ec9a46ccf5
8 changed files with 715 additions and 0 deletions

.gitignore vendored
View File

@ -6,9 +6,16 @@
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
# Python cache
# Files generated by git
# These are backup files generated by rustfmt
# Files downloaded in CI

View File

@ -3,6 +3,10 @@ variables:
APPS_REF: master
- .cache/pip
stage: build
image: python:3
@ -23,6 +27,50 @@ build:
- content/games
- download
stage: test
- build
image: python
- cd ${CI_PROJECT_DIR}/checkers
- python3 -m pip install -r requirements.txt
- python3 ./check_links.py check ${CI_PROJECT_DIR}/content/apps
when: manual
stage: test
- build
image: python
- cd ${CI_PROJECT_DIR}/checkers
- python3 -m pip install -r requirements.txt
- python3 ./check_via_appstream.py check ${CI_PROJECT_DIR}/content/apps
when: manual
stage: test
- build
image: python
- cd ${CI_PROJECT_DIR}/checkers
- python3 -m pip install -r requirements.txt
- python3 ./check_via_git.py check ${CI_PROJECT_DIR}/content/apps
when: manual
stage: test
- build
image: python
- cd ${CI_PROJECT_DIR}/checkers
- python3 -m pip install -r requirements.txt
- python3 ./check_via_heuristics.py check ${CI_PROJECT_DIR}/content/apps
when: manual
stage: deploy

checkers/check_links.py Executable file
View File

@ -0,0 +1,158 @@
from itertools import chain
import asyncio
import datetime
import io
import pathlib
import sys
import urllib.parse
import aiofiles
import frontmatter
import httpx
from PIL import Image
import utils
def is_link(url):
    """Return whether *url* looks like an HTTP(S) link.

    Metadata values are not always strings (dates, numbers, booleans), so
    anything that is not a ``str`` is reported as "not a link" instead of
    raising ``AttributeError`` on ``.strip()``.

    Args:
        url: Candidate value taken from an item's metadata.

    Returns:
        ``True`` if *url* is a string starting with ``"http"`` once leading
        whitespace is ignored, ``False`` otherwise.
    """
    return isinstance(url, str) and url.strip().startswith("http")
async def check_link_reachable(client, url):
response = await client.head(url, follow_redirects=False)
code = response.status_code
reason = response.reason_phrase.capitalize()
if code == 200:
return None
elif code == 301 or code == 308:
location = this_location = urllib.parse.urljoin(url, response.headers["Location"])
message = f"Permanent redirect to {location}"
# recursive resolution
if (result := await check_link_reachable(client, location)) is not None: # reachable
if result[0] is not None: # redirect url
location, message = result
message += f" via {this_location}"
return (location, message)
elif code == 302 or code == 307:
location = urllib.parse.urljoin(url, response.headers["Location"])
return (None, f"Temporary redirect ({code}) to {location}")
return (None, f"{reason} ({code})")
except Exception as e:
return (None, f"Error occurred checking {url}: {e}")
async def check_link_https(client, url):
    """Report links that do not use HTTPS and suggest an HTTPS replacement.

    Args:
        client: HTTP client passed through to ``check_link_reachable``.
        url: The link to inspect.

    Returns:
        ``None`` when *url* already starts with ``"https"``. Otherwise a
        ``(suggestion, message)`` tuple; *suggestion* is the HTTPS form of
        *url* when ``check_link_reachable`` has nothing to report for it,
        else ``None``.
    """
    if url.startswith("https"):
        return None
    # Replace only the first occurrence (the scheme). An unbounded replace
    # would also rewrite any later "http" in the path or query string.
    suggestion = url.replace("http", "https", 1)
    if await check_link_reachable(client, suggestion) is not None:  # not reachable
        suggestion = None
    return (suggestion, "Not HTTPS!")
async def check_field(client, field):
    """Run all link checkers over one metadata field.

    *field* may be a single value or a list of values. Only string entries
    accepted by ``is_link`` are checked; every other entry is passed through
    untouched so that the caller's ``field != item[key]`` comparison only
    fires when a link was actually replaced.

    Args:
        client: HTTP client handed to the individual checkers.
        field: A scalar value or a list of values from an item's metadata.

    Returns:
        A value of the same shape as *field* (a list for a list, a scalar for
        a scalar) in which links are replaced by the URL a checker suggested.
    """
    is_list = isinstance(field, list)
    # Work on a copy so the caller's list is never mutated behind its back.
    values = list(field) if is_list else [field]
    checkers = (check_link_reachable, check_link_https)
    for i, value in enumerate(values):
        if not (isinstance(value, str) and is_link(value)):
            continue
        for checker in checkers:
            # Use values[i], not `value`: an earlier checker may already have
            # swapped in a better URL that the next checker should look at.
            result = await checker(client, values[i])
            if not result:  # nothing to report
                continue
            found_url, result_message = result
            print(f"{values[i]}: {result_message}", file=sys.stderr)
            if found_url:
                values[i] = found_url
    return values if is_list else values[0]
async def check_screenshot(client, image_url):
    """Download a screenshot and check that it is a portrait image.

    Args:
        client: HTTP client providing ``stream()``.
        image_url: URL of the screenshot.

    Returns:
        ``None`` if the image is acceptable, otherwise an error message
        string prefixed with *image_url*. Any exception raised while
        fetching or decoding is turned into such a message as well.
    """
    try:
        async with client.stream("GET", image_url) as response:
            # A missing header is reported as an invalid (empty) type rather
            # than surfacing as a bare KeyError text.
            content_type = response.headers.get("content-type", "")
            if not content_type.startswith("image/"):
                return f'{image_url}: Invalid image type "{content_type}"'
            with Image.open(io.BytesIO(await response.aread())) as image:
                width, height = image.size
            if width > height:
                return f"{image_url}: Landscape image with dimensions {width}x{height} detected but expecting mobile images to be portrait ones"
            return None
    except Exception as e:
        return f"{image_url}: Error occurred: {e}"
# check links for error
async def check(client, item, update=False, item_root=None):
    """Check every link found in *item* and, optionally, apply suggestions.

    Nested dicts are walked recursively. Every other value is passed to
    ``check_field``; a differing result counts as a finding. Screenshots
    listed under ``extra.screenshots`` and ``extra.screenshots_img`` are
    validated with ``check_screenshot`` and problems are printed to stderr.

    Args:
        client: HTTP client handed to the field and screenshot checkers.
        item: Metadata dict (or nested sub-dict) to inspect.
        update: When true, write suggested values back into *item* and stamp
            ``updated`` / ``extra.updated_by`` on *item_root*.
        item_root: Top-level metadata dict; defaults to *item*.

    Returns:
        ``True`` if at least one field differs from what ``check_field``
        returned, else ``False``.
    """
    if item_root is None:
        item_root = item
    found = False
    # Iterate over a snapshot of the keys: the update path below may insert
    # "updated" / "updated_by" into the dict that is currently being walked.
    for key in list(item):
        value = item[key]
        if isinstance(value, dict):
            found |= await check(client, value, update, item_root)
            continue
        field = await check_field(client, value)
        if field != value:  # found
            found = True
            if update:
                item[key] = field
                utils.set_recursive(item_root, "updated", str(datetime.date.today()))
                utils.set_recursive(item_root, "extra.updated_by", "script")
    screenshots = chain(
        filter(None, utils.get_recursive(item, "extra.screenshots", [])),
        filter(None, utils.get_recursive(item, "extra.screenshots_img", [])),
    )
    for screenshot in screenshots:
        screenshot_error = await check_screenshot(client, screenshot)
        if screenshot_error is not None:
            print(screenshot_error, file=sys.stderr)
    return found
async def check_file(client, filename, update=False):
    """Check one front-matter file and optionally write fixes back.

    Args:
        client: HTTP client handed to ``check``.
        filename: Path of the Markdown file to load.
        update: When true and something was found, rewrite the file.

    Returns:
        The result of ``check`` for the file's metadata.
    """
    async with aiofiles.open(filename, mode="r", encoding="utf-8") as f:
        doc = frontmatter.loads(await f.read())
    found = await check(client, doc.metadata, update)
    if found and update:
        print(f"Writing changes to {filename}")
        # Serialize before opening for writing: mode "w" truncates the file,
        # so a serialization error must happen while the old content exists.
        content = frontmatter.dumps(doc, handler=frontmatter.default_handlers.TOMLHandler())
        async with aiofiles.open(filename, mode="w", encoding="utf-8") as f:
            await f.write(content)
    return found
async def run(folder, update=False):
    """Check all Markdown files below *folder* concurrently.

    Files named ``_index.md`` are skipped; every other ``*.md`` file found
    recursively is handed to ``check_file``.

    Args:
        folder: ``pathlib.Path`` to search.
        update: Passed through to ``check_file``.

    Returns:
        ``True`` if ``check_file`` reported a finding for any file.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [
            asyncio.ensure_future(check_file(client, filename, update))
            for filename in folder.glob("**/*.md")
            if filename.name != "_index.md"
        ]
        found = any(await asyncio.gather(*tasks))
    return found
async def main():
    """Command line entry point: ``check|fix FOLDER``.

    Exits with status 1 when too few arguments are given, or when running in
    check mode and ``run`` reported findings.
    """
    if len(sys.argv) < 3:
        print(f"Syntax: {sys.argv[0]} check|fix FOLDER")
        # Stop here; reading sys.argv[1] / sys.argv[2] below would otherwise
        # fail with an IndexError.
        sys.exit(1)
    update = sys.argv[1] == "fix"
    apps_folder = pathlib.Path(sys.argv[2])
    found = await run(apps_folder, update)
    if found and not update:
        print(f'Errors found! Run "{sys.argv[0]} fix {apps_folder}" to apply suggested changes.', file=sys.stderr)
        # Non-zero status so a calling job can detect the findings.
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

checkers/check_via_appstream.py Executable file
View File

@ -0,0 +1,171 @@
import asyncio
import datetime
import pathlib
import sys
import traceback
import aiofiles
import appstream_python
import frontmatter
import httpx
import markdownify
import utils
async def load_appstream(client, url):
    """Download and parse the AppStream file at *url*.

    Args:
        client: HTTP client providing ``get()``.
        url: Location of the AppStream XML; a falsy value short-circuits.

    Returns:
        The loaded ``appstream_python.AppstreamComponent``, or ``None`` when
        no URL was given, the response status is not OK, or loading raised.
        Failures are reported on stderr.
    """
    if not url:
        return None
    app = appstream_python.AppstreamComponent()
    try:
        response = await client.get(url)
        if response.status_code != httpx.codes.OK:
            print(f"Error loading {url}", file=sys.stderr)
            return None
        app.load_bytes(response.content, encoding=response.encoding)
    except Exception as e:
        print(f"Error loading {url}:", file=sys.stderr)
        traceback.print_exception(e, file=sys.stderr)
        return None
    return app
def get_appstream_app_id(app):
    """Return the ``id`` attribute of the AppStream component *app*."""
    component_id = app.id
    return component_id
def get_appstream_name(app):
    """Return the default text of the component's ``name``."""
    name = app.name
    return name.get_default_text()
def get_appstream_categories(app):
    """Return the component's ``categories`` attribute unchanged."""
    categories = app.categories
    return categories
def get_appstream_app_author(app):
    """Return the component's developer names as a list.

    The default text of ``developer_name`` is split on commas. Each name is
    stripped of surrounding whitespace and empty entries are dropped, so
    ``"Alice, Bob"`` yields ``["Alice", "Bob"]`` rather than
    ``["Alice", " Bob"]``.
    """
    raw = app.developer_name.get_default_text()
    return [name.strip() for name in raw.split(",") if name.strip()]
def get_appstream_metadata_licenses(app):
    """Return the licenses joined by ``AND`` in ``metadata_license``.

    The expression is split on the standalone word ``AND`` only, and each
    part is stripped. Splitting on the bare substring left padding on every
    entry and cut identifiers that merely contain the letters "AND".
    """
    normalized = " ".join(app.metadata_license.split())
    return [part.strip() for part in normalized.split(" AND ") if part.strip()]
def get_appstream_project_licenses(app):
    """Return the licenses joined by ``AND`` in ``project_license``.

    The expression is split on the standalone word ``AND`` only, and each
    part is stripped. Splitting on the bare substring left padding on every
    entry and cut identifiers that merely contain the letters "AND".
    """
    normalized = " ".join(app.project_license.split())
    return [part.strip() for part in normalized.split(" AND ") if part.strip()]
def get_appstream_summary(app):
    """Return the default text of the component's ``summary``."""
    summary = app.summary
    return summary.get_default_text()
def get_appstream_description(app):
    """Return the component's description converted from HTML to Markdown.

    The description is rendered with ``to_html(lang=None)`` and converted by
    ``markdownify`` using ATX-style headings.
    """
    html = app.description.to_html(lang=None)
    return markdownify.markdownify(html, heading_style="ATX")
def get_appstream_screenshots(app):
    """Return the stripped source-image URLs of the component's screenshots.

    Screenshots whose ``get_source_image()`` returns ``None`` are skipped.
    The source image is looked up once per screenshot instead of twice.
    """
    urls = []
    for screenshot in app.screenshots:
        image = screenshot.get_source_image()
        if image is not None:
            urls.append(image.url.strip())
    return urls
# possible URL types: https://www.freedesktop.org/software/appstream/docs/chap-Metadata.html#tag-url
def get_appstream_url(app, url_type):
    """Return the URL of kind *url_type* from ``app.urls``.

    Returns an empty string when the type is absent or its stored value is
    falsy (for example an empty ``<url type="TYPE"></url>`` element).
    """
    url = app.urls.get(url_type, "")
    if not url:
        return ""
    return url
async def check(client, item, update=False):
    """Compare an item's metadata with its upstream AppStream file.

    The AppStream file referenced by ``extra.appstream_xml_url`` is loaded
    and a fixed set of properties is extracted from it. Each extracted value
    that is non-empty and differs from the item's current value is reported
    on stderr and counts as a finding.

    Args:
        client: HTTP client handed to ``load_appstream``.
        item: Metadata dict of one app.
        update: When true, write extracted values into *item*, refresh an
            existing ``<key>_source`` entry and stamp ``updated`` /
            ``extra.updated_by``.

    Returns:
        ``True`` if at least one property differs from upstream, ``False``
        otherwise (including when no AppStream file could be loaded).
    """
    item_name = utils.get_recursive(item, "extra.app_id") or utils.get_recursive(item, "title", "")
    appstream_url = utils.get_recursive(item, "extra.appstream_xml_url")

    app = await load_appstream(client, appstream_url)
    if not app:
        return False

    properties = [
        {"apps_key": "extra.homepage", "handler": lambda app: get_appstream_url(app, "homepage")},
        {"apps_key": "extra.bugtracker", "handler": lambda app: get_appstream_url(app, "bugtracker")},
        {"apps_key": "extra.donations", "handler": lambda app: get_appstream_url(app, "donation")},
        {"apps_key": "extra.translations", "handler": lambda app: get_appstream_url(app, "translate")},
        # "vcs-browser" is the AppStream URL type for the source repository;
        # the former "vcs-bitemser" never matched any <url/> element.
        {"apps_key": "extra.repository", "handler": lambda app: get_appstream_url(app, "vcs-browser")},
        {"apps_key": "extra.app_id", "handler": get_appstream_app_id},
        {"apps_key": "title", "handler": get_appstream_name},
        {"apps_key": "taxonomies.categories", "handler": get_appstream_categories},
        {"apps_key": "taxonomies.app_author", "handler": get_appstream_app_author},
        {"apps_key": "taxonomies.metadata_licenses", "handler": get_appstream_metadata_licenses},
        {"apps_key": "taxonomies.project_licenses", "handler": get_appstream_project_licenses},
        {"apps_key": "description", "handler": get_appstream_summary},
        # {"apps_key": "description", "handler": get_appstream_description},
        {"apps_key": "extra.screenshots", "handler": get_appstream_screenshots},
    ]

    found = False
    for prop in properties:
        apps_key = prop["apps_key"]
        current = utils.get_recursive(item, apps_key)
        try:
            found_entry = prop["handler"](app)
        except Exception as e:
            print(f"{item_name}: Error handling {apps_key}:", file=sys.stderr)
            traceback.print_exception(e, file=sys.stderr)
            continue

        if current and not found_entry:
            print(f"{item_name}: {apps_key} missing in upstream AppStream file. Consider contributing it upstream: {current}", file=sys.stderr)

        if not found_entry or found_entry == current:
            continue  # already up to date

        message = f"{item_name}: {apps_key} "
        if not current:
            message += "new: "
        else:
            message += f'outdated "{current}" -> '
        message += f'"{found_entry}"'
        print(message, file=sys.stderr)

        found = True
        if update:
            utils.set_recursive(item, apps_key, found_entry)
            # Only refresh a source column that already exists for this key.
            source_column = apps_key + "_source"
            current_source = utils.get_recursive(item, source_column)
            if current_source and current_source != appstream_url:
                print(f"{item_name}: {source_column} {current_source} -> {appstream_url}", file=sys.stderr)
                utils.set_recursive(item, source_column, appstream_url)
            utils.set_recursive(item, "updated", str(datetime.date.today()))
            utils.set_recursive(item, "extra.updated_by", "script")
    return found
async def check_file(client, filename, update=False):
    """Check one front-matter file and optionally write fixes back.

    Args:
        client: HTTP client handed to ``check``.
        filename: Path of the Markdown file to load.
        update: When true and something was found, rewrite the file.

    Returns:
        The result of ``check`` for the file's metadata.
    """
    async with aiofiles.open(filename, mode="r", encoding="utf-8") as f:
        doc = frontmatter.loads(await f.read())
    found = await check(client, doc.metadata, update)
    if found and update:
        print(f"Writing changes to {filename}")
        # Serialize before opening for writing: mode "w" truncates the file,
        # so a serialization error must happen while the old content exists.
        content = frontmatter.dumps(doc, handler=frontmatter.default_handlers.TOMLHandler())
        async with aiofiles.open(filename, mode="w", encoding="utf-8") as f:
            await f.write(content)
    return found
async def run(folder, update=False):
    """Check all Markdown files below *folder* concurrently.

    Files named ``_index.md`` are skipped; every other ``*.md`` file found
    recursively is handed to ``check_file``.

    Args:
        folder: ``pathlib.Path`` to search.
        update: Passed through to ``check_file``.

    Returns:
        ``True`` if ``check_file`` reported a finding for any file.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [
            asyncio.ensure_future(check_file(client, filename, update))
            for filename in folder.glob("**/*.md")
            if filename.name != "_index.md"
        ]
        found = any(await asyncio.gather(*tasks))
    return found
async def main():
    """Command line entry point: ``check|fix FOLDER``.

    Exits with status 1 when too few arguments are given, or when running in
    check mode and ``run`` reported findings.
    """
    if len(sys.argv) < 3:
        print(f"Syntax: {sys.argv[0]} check|fix FOLDER")
        # Stop here; reading sys.argv[1] / sys.argv[2] below would otherwise
        # fail with an IndexError.
        sys.exit(1)
    update = sys.argv[1] == "fix"
    apps_folder = pathlib.Path(sys.argv[2])
    found = await run(apps_folder, update)
    if found and not update:
        print(f'Errors found! Run "{sys.argv[0]} fix {apps_folder}" to apply suggested changes.', file=sys.stderr)
        # Non-zero status so a calling job can detect the findings.
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

checkers/check_via_git.py Executable file
View File

@ -0,0 +1,140 @@
import asyncio
import pathlib
import re
import sys
import tempfile
import aiofiles
import appstream_python
import frontmatter
import git
import httpx
import utils
def is_sourcehut(url):
    """Return whether *url* points at sr.ht or one of its subdomains.

    The dot in ``sr.ht`` is escaped so that it only matches a literal dot;
    unescaped it matched any character (e.g. ``srxht``).
    """
    return re.match(r"^https?://(?:[^.]+\.)?sr\.ht/", url) is not None
def is_qt(url):
    """Return whether *url* points at ``code.qt.io``.

    The dots in the host name are escaped so they only match literal dots;
    unescaped they matched any character.
    """
    return re.match(r"^https?://code\.qt\.io/", url) is not None
def get_repository_url(repo):
    """Derive a clonable git URL (and optional branch) from a repository link.

    Args:
        repo: Repository URL as stored in an item's metadata; may be empty.

    Returns:
        A ``(url, branch)`` tuple. For sourcehut and code.qt.io links the URL
        is rewritten to the host's git endpoint and *branch* is ``None``.
        For other links a ``/tree/<branch>`` (optionally ``/-/tree/``) suffix
        is split off into *branch*, one trailing slash is removed, and the
        URL is cut at the first ``.git`` and given a ``.git`` suffix. An
        empty *repo* yields ``("", None)`` so callers can test the URL for
        truthiness.
    """
    # Callers pass "" when no repository is recorded; indexing repo[-1] on
    # that used to raise IndexError before they could report it.
    if not repo:
        return "", None
    if is_sourcehut(repo):
        return re.sub(r"^https?://(?:[^.]+\.)?sr\.ht/(.*)$", r"https://git.sr.ht/\g<1>", repo).lower(), None
    if is_qt(repo):
        return re.sub(r"^https?://code\.qt\.io/(?:cgit/)?(.*\.git).*$", r"https://code.qt.io/\g<1>", repo), None
    branch = None
    if (m := re.search(r"(?:/-)?/tree/(?P<branch>.*)$", repo)) is not None:  # Github, Gitlab
        repo = repo[: m.start()]
        branch = m.groupdict().get("branch")
    if repo.endswith("/"):
        repo = repo[:-1]
    return re.match(r"^(?:[^.]*(?:\.(?!git))?)*", repo)[0] + ".git", branch
async def find_appstream_xml(repo_url, branch=None, repo_path="repo"):
    """Clone a repository and list files that look like AppStream metadata.

    Args:
        repo_url: Git URL to clone.
        branch: Optional branch passed to the clone.
        repo_path: Directory to clone into.

    Returns:
        A list of repository-relative paths of blobs whose file name contains
        ``metainfo.xml`` or ``appdata.xml``, or ``None`` when cloning or
        traversing raised (the error is printed to stderr).
    """
    try:
        kwargs = {}
        if branch is not None:
            kwargs["branch"] = branch
        repo = git.Repo.clone_from(repo_url, repo_path, multi_options=["--depth 1"], **kwargs)

        def pred(x, y):
            # Keep only files (blobs) with an AppStream-like file name.
            if x.type != "blob":
                return False
            name = pathlib.Path(x.path).name
            return "metainfo.xml" in name or "appdata.xml" in name

        return [x.path for x in repo.head.commit.tree.traverse(pred)]
    except Exception as e:
        print(f"Error checking git repository {repo_url} for AppStream file: {e}", file=sys.stderr)
        return None
async def load_appstream(file):
    """Load an AppStream component from a local file.

    Args:
        file: Path of the XML file; a falsy value short-circuits.

    Returns:
        The loaded ``appstream_python.AppstreamComponent``, or ``None`` when
        no file was given or reading/parsing raised (reported on stderr).
    """
    if not file:
        return None
    app = appstream_python.AppstreamComponent()
    try:
        async with aiofiles.open(file, mode="rb") as f:
            app.load_bytes(await f.read())
    except Exception as e:
        print(f"Error loading {file}: {e}", file=sys.stderr)
        return None
    return app
async def check(client, item, update=False):
    """Look for AppStream files in the repository of an item that lacks one.

    Items that already have a non-blank ``extra.appstream_xml_url`` are
    skipped. Otherwise the item's repository is cloned into a temporary
    directory and searched; candidates that ``load_appstream`` cannot load
    are discarded.

    Args:
        client: Unused here; kept for a uniform checker signature.
        item: Metadata dict of one app.
        update: Unused here; nothing is written back.

    Returns:
        ``True`` if at least one loadable AppStream file was found.
    """
    item_name = utils.get_recursive(item, "extra.app_id") or utils.get_recursive(item, "title", "")
    known_url = utils.get_recursive(item, "extra.appstream_xml_url", "").strip()
    if known_url:
        return False

    repo, branch = get_repository_url(utils.get_recursive(item, "extra.repository", ""))
    if not repo:
        print(f"No repository specified for {item_name}", file=sys.stderr)
        return False

    print(f"No AppStream url found for {item_name}, checking {repo}, {branch=}", file=sys.stderr)
    async with aiofiles.tempfile.TemporaryDirectory() as tmpdir:
        repo_path = pathlib.Path(tmpdir) / "repo"
        candidates = await find_appstream_xml(repo, branch, repo_path)
        loadable = []
        for candidate in candidates or []:
            if await load_appstream(repo_path / candidate):
                loadable.append(candidate)
        if loadable:
            print(f"Found AppStream metainfo files for {item_name} with {repo} at paths {', '.join(loadable)}")
            return True
    return False
async def check_file(client, filename, update=False):
    """Check one front-matter file and optionally write fixes back.

    Args:
        client: HTTP client handed to ``check``.
        filename: Path of the Markdown file to load.
        update: When true and something was found, rewrite the file.

    Returns:
        The result of ``check`` for the file's metadata.
    """
    async with aiofiles.open(filename, mode="r", encoding="utf-8") as f:
        doc = frontmatter.loads(await f.read())
    found = await check(client, doc.metadata, update)
    if found and update:
        print(f"Writing changes to {filename}")
        # Serialize before opening for writing: mode "w" truncates the file,
        # so a serialization error must happen while the old content exists.
        content = frontmatter.dumps(doc, handler=frontmatter.default_handlers.TOMLHandler())
        async with aiofiles.open(filename, mode="w", encoding="utf-8") as f:
            await f.write(content)
    return found
async def run(folder, update=False):
    """Check all Markdown files below *folder* concurrently.

    Files named ``_index.md`` are skipped; every other ``*.md`` file found
    recursively is handed to ``check_file``.

    Args:
        folder: ``pathlib.Path`` to search.
        update: Passed through to ``check_file``.

    Returns:
        ``True`` if ``check_file`` reported a finding for any file.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [
            asyncio.ensure_future(check_file(client, filename, update))
            for filename in folder.glob("**/*.md")
            if filename.name != "_index.md"
        ]
        found = any(await asyncio.gather(*tasks))
    return found
async def main():
    """Command line entry point: ``check|fix FOLDER``.

    Exits with status 1 when too few arguments are given, or when running in
    check mode and ``run`` reported findings.
    """
    if len(sys.argv) < 3:
        print(f"Syntax: {sys.argv[0]} check|fix FOLDER")
        # Stop here; reading sys.argv[1] / sys.argv[2] below would otherwise
        # fail with an IndexError.
        sys.exit(1)
    update = sys.argv[1] == "fix"
    apps_folder = pathlib.Path(sys.argv[2])
    found = await run(apps_folder, update)
    if found and not update:
        print(f'Errors found! Run "{sys.argv[0]} fix {apps_folder}" to apply suggested changes.', file=sys.stderr)
        # Non-zero status so a calling job can detect the findings.
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

checkers/check_via_heuristics.py Executable file
View File

@ -0,0 +1,155 @@
import asyncio
import datetime
import pathlib
import re
import sys
import traceback
import aiofiles
import frontmatter
import httpx
import utils
def get_app_author(item):
    """Guess the app author from the item's repository URL.

    Well-known hosting prefixes map to a fixed author name. For any other
    URL the second-to-last path segment (the owner/namespace) is used,
    lower-cased, with ``~`` removed and ``_`` replaced by ``-``.

    Args:
        item: Metadata dict; ``extra.repository`` is read from it.

    Returns:
        The guessed author name.

    Raises:
        IndexError: If the repository URL has fewer than two ``/``-separated
            segments (for example when it is empty).
    """
    repo_url = utils.get_recursive(item, "extra.repository", "")
    # Order matters: the plasma-mobile prefix must win over plain invent.kde.org.
    known_prefixes = (
        ("https://code.qt.io/", "Qt Company"),
        ("https://invent.kde.org/plasma-mobile/", "Plasma Mobile Developers"),
        ("https://invent.kde.org/", "KDE Community"),
        ("https://gitlab.com/postmarketOS/", "postmarketOS Developers"),
        ("https://gitlab.gnome.org/", "GNOME Developers"),
    )
    for prefix, author in known_prefixes:
        if repo_url.startswith(prefix):
            return author
    # Drop trailing slashes first; otherwise ".../owner/project/" would yield
    # the project name instead of the owner.
    segments = repo_url.rstrip("/").split("/")
    return segments[-2].lower().replace("~", "").replace("_", "-")
def is_github_or_gitea(s):
github_urls = [
return any(s.startswith(url) for url in github_urls)
def is_gitlab(s):
gitlab_urls = [
return any(s.startswith(url) for url in gitlab_urls)
def is_sourcehut(s):
    """Return whether *s* points at sr.ht or one of its subdomains.

    The dot in ``sr.ht`` is escaped so that it only matches a literal dot;
    unescaped it matched any character (e.g. ``srxht``).
    """
    return re.match(r"^https?://(?:[^.]+\.)?sr\.ht/", s) is not None
def is_flathub(s):
    """Return whether *s* starts with the Flathub base URL."""
    flathub_prefix = "https://flathub.org"
    return s[: len(flathub_prefix)] == flathub_prefix
def get_bugtracker(item):
    """Guess the bug tracker URL from the item's repository URL.

    Args:
        item: Metadata dict; ``extra.repository`` is read from it.

    Returns:
        ``<repo>/issues/`` for GitHub/Gitea-style hosts, ``<repo>/-/issues/``
        for GitLab hosts, or the matching ``todo.sr.ht`` URL (lower-cased)
        for sourcehut.

    Raises:
        Exception: If the host of the repository is not recognised. This
            includes an empty repository URL.
    """
    repo_url = utils.get_recursive(item, "extra.repository", "")
    # endswith() is safe on an empty string; indexing [-1] raised IndexError
    # and hid the descriptive error raised at the end of this function.
    if repo_url.endswith("/"):
        repo_url = repo_url[:-1]
    if is_github_or_gitea(repo_url):
        return repo_url + "/issues/"
    elif is_gitlab(repo_url):
        return repo_url + "/-/issues/"
    elif is_sourcehut(repo_url):
        return re.sub(r"^https?://(?:[^.]+\.)?sr\.ht/(.*)$", r"https://todo.sr.ht/\g<1>", repo_url).lower()
    raise Exception(f"Could not determine bugtracker based on repository {repo_url}")
def get_flathub(item):
    """Return the item's ``extra.flatpak`` URL if it is a Flathub link.

    Returns an empty string when the value is missing or ``is_flathub``
    rejects it.
    """
    flatpak = utils.get_recursive(item, "extra.flatpak", "")
    if is_flathub(flatpak):
        return flatpak
    return ""
async def check(client, item, update=False):
    """Fill empty metadata fields with values guessed by heuristics.

    For each supported key that is currently empty, the matching handler is
    called with *item*. A non-empty, stripped result is reported on stderr
    and counts as a finding. Handler errors are printed and skipped.

    Args:
        client: Unused here; kept for a uniform checker signature.
        item: Metadata dict of one app.
        update: When true, store the guessed values in *item* and stamp
            ``updated`` / ``extra.updated_by``.

    Returns:
        ``True`` if a value could be guessed for at least one empty field.
    """
    item_name = utils.get_recursive(item, "extra.app_id") or utils.get_recursive(item, "title", "")

    properties = [
        {"apps_key": "taxonomies.app_author", "handler": get_app_author},
        {"apps_key": "extra.bugtracker", "handler": get_bugtracker},
        {"apps_key": "extra.flathub", "handler": get_flathub},
    ]

    found = False
    for prop in properties:
        apps_key = prop["apps_key"]
        if utils.get_recursive(item, apps_key):
            continue  # ignore non-empty fields

        try:
            found_entry = prop["handler"](item).strip()
        except Exception as e:
            print(f"{item_name}: Error handling {apps_key}: {e}", file=sys.stderr)
            continue

        if not found_entry:
            continue  # ignore empty result

        found = True
        print(f"{item_name}: {apps_key}: {found_entry}", file=sys.stderr)
        if update:
            utils.set_recursive(item, apps_key, found_entry)
            utils.set_recursive(item, "updated", str(datetime.date.today()))
            utils.set_recursive(item, "extra.updated_by", "script")
    return found
async def check_file(client, filename, update=False):
    """Check one front-matter file and optionally write fixes back.

    Args:
        client: HTTP client handed to ``check``.
        filename: Path of the Markdown file to load.
        update: When true and something was found, rewrite the file.

    Returns:
        The result of ``check`` for the file's metadata.
    """
    async with aiofiles.open(filename, mode="r", encoding="utf-8") as f:
        doc = frontmatter.loads(await f.read())
    found = await check(client, doc.metadata, update)
    if found and update:
        print(f"Writing changes to {filename}")
        # Serialize before opening for writing: mode "w" truncates the file,
        # so a serialization error must happen while the old content exists.
        content = frontmatter.dumps(doc, handler=frontmatter.default_handlers.TOMLHandler())
        async with aiofiles.open(filename, mode="w", encoding="utf-8") as f:
            await f.write(content)
    return found
async def run(folder, update=False):
    """Check all Markdown files below *folder* concurrently.

    Files named ``_index.md`` are skipped; every other ``*.md`` file found
    recursively is handed to ``check_file``.

    Args:
        folder: ``pathlib.Path`` to search.
        update: Passed through to ``check_file``.

    Returns:
        ``True`` if ``check_file`` reported a finding for any file.
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        tasks = [
            asyncio.ensure_future(check_file(client, filename, update))
            for filename in folder.glob("**/*.md")
            if filename.name != "_index.md"
        ]
        found = any(await asyncio.gather(*tasks))
    return found
async def main():
    """Command line entry point: ``check|fix FOLDER``.

    Exits with status 1 when too few arguments are given, or when running in
    check mode and ``run`` reported findings.
    """
    if len(sys.argv) < 3:
        print(f"Syntax: {sys.argv[0]} check|fix FOLDER")
        # Stop here; reading sys.argv[1] / sys.argv[2] below would otherwise
        # fail with an IndexError.
        sys.exit(1)
    update = sys.argv[1] == "fix"
    apps_folder = pathlib.Path(sys.argv[2])
    found = await run(apps_folder, update)
    if found and not update:
        print(f'Errors found! Run "{sys.argv[0]} fix {apps_folder}" to apply suggested changes.', file=sys.stderr)
        # Non-zero status so a calling job can detect the findings.
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())

View File

@ -0,0 +1,8 @@

checkers/utils.py Normal file
View File

@ -0,0 +1,28 @@
def get_recursive(obj, path, default=None):
    """Look up a nested value by dotted path.

    Args:
        obj: The (nested) dict to read from.
        path: Either a dotted string such as ``"extra.app_id"`` or a list of
            keys. An empty list returns *obj* itself.
        default: Value returned when the path cannot be followed.

    Returns:
        The value stored at *path*, or *default* if a key is missing or an
        intermediate value is not a dict. The latter used to do substring
        matching on strings or raise ``TypeError`` on numbers.
    """
    keys = path.split(".") if isinstance(path, str) else list(path)
    current = obj
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current
def set_recursive(obj, path, value):
    """Store *value* at a nested dotted path, creating dicts as needed.

    Args:
        obj: The dict to modify in place.
        path: Either a dotted string such as ``"extra.updated_by"`` or a
            list of keys.
        value: The value to store under the last key of *path*.

    Raises:
        KeyError: If *path* is an empty list.
    """
    if isinstance(path, str):
        path = path.split(".")
    if len(path) == 0:
        raise KeyError("Empty path")
    key, *rest = path
    if rest:
        if key not in obj:
            obj[key] = {}
        set_recursive(obj[key], rest, value)
    else:
        # Only the last key receives the value; assigning unconditionally
        # would replace the nested dict that was just filled in.
        obj[key] = value