#!/usr/bin/env nix-shell
#!nix-shell update-octave-shell.nix -i python3
Update a Octave package expression by passing in the `.nix` file, or the directory containing it.
You can pass in multiple files or paths.
You'll likely want to use
$ ./update-octave-libraries ../../pkgs/development/octave-modules/**/default.nix
to update all non-pinned libraries in that folder.
import argparse
import os
import pathlib
import re
import requests
import yaml
from concurrent.futures import ThreadPoolExecutor as Pool
from packaging.version import Version as _Version
from packaging.version import InvalidVersion
from packaging.specifiers import SpecifierSet
import collections
import subprocess
import tempfile
INDEX = ""
"""url of Octave packages' source on GitHub"""
EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip']
"""Permitted file extensions. These are evaluated from left to right and the first occurance is returned."""
GIT = "git"
NIXPGKS_ROOT = subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode('utf-8').strip()
import logging
class Version(_Version,
def __init__(self, version):
# We cannot use `str(Version(0.04.21))` because that becomes `0.4.21`
self.raw_version = version
def __getitem__(self, i):
return self._version.release[i]
def __len__(self):
return len(self._version.release)
def __iter__(self):
yield from self._version.release
def _get_values(attribute, text):
"""Match attribute in text and return all matches.
:returns: List of matches.
regex = '{}\s+=\s+"(.*)";'.format(attribute)
regex = re.compile(regex)
values = regex.findall(text)
return values
def _get_unique_value(attribute, text):
"""Match attribute in text and return unique match.
:returns: Single match.
values = _get_values(attribute, text)
n = len(values)
if n > 1:
raise ValueError("found too many values for {}".format(attribute))
elif n == 1:
return values[0]
raise ValueError("no value found for {}".format(attribute))
def _get_line_and_value(attribute, text):
"""Match attribute in text. Return the line and the value of the attribute."""
regex = '({}\s+=\s+"(.*)";)'.format(attribute)
regex = re.compile(regex)
value = regex.findall(text)
n = len(value)
if n > 1:
raise ValueError("found too many values for {}".format(attribute))
elif n == 1:
return value[0]
raise ValueError("no value found for {}".format(attribute))
def _replace_value(attribute, value, text):
"""Search and replace value of attribute in text."""
old_line, old_value = _get_line_and_value(attribute, text)
new_line = old_line.replace(old_value, value)
new_text = text.replace(old_line, new_line)
return new_text
def _fetch_page(url):
r = requests.get(url)
if r.status_code ==
return list(yaml.safe_load_all(r.content))[0]
raise ValueError("request for {} failed".format(url))
def _fetch_github(url):
headers = {}
token = os.environ.get('GITHUB_API_TOKEN')
if token:
headers["Authorization"] = f"token {token}"
r = requests.get(url, headers=headers)
if r.status_code ==
return r.json()
raise ValueError("request for {} failed".format(url))
'major' : 0,
'minor' : 1,
'patch' : 2,
def _determine_latest_version(current_version, target, versions):
"""Determine latest version, given `target`, returning the more recent version.
current_version = Version(current_version)
def _parse_versions(versions):
for v in versions:
yield Version(v)
except InvalidVersion:
versions = _parse_versions(versions)
index = SEMVER[target]
ceiling = list(current_version[0:index])
if len(ceiling) == 0:
ceiling = None
ceiling = Version(".".join(map(str, ceiling)))
# We do not want prereleases
versions = SpecifierSet(prereleases=PRERELEASES).filter(versions)
if ceiling is not None:
versions = SpecifierSet(f"<{ceiling}").filter(versions)
return (max(sorted(versions))).raw_version
def _get_latest_version_octave_packages(package, extension, current_version, target):
"""Get latest version and hash from Octave Packages."""
url = "{}/{}.yaml".format(INDEX, package)
yaml = _fetch_page(url)
versions = list(map(lambda pv: pv['id'], yaml['versions']))
version = _determine_latest_version(current_version, target, versions)
releases = [v if v['id'] == version else None for v in yaml['versions']]
except KeyError as e:
raise KeyError('Could not find version {} for {}'.format(version, package)) from e
for release in releases:
if release['url'].endswith(extension):
sha256 = release['sha256']
sha256 = None
return version, sha256, None
def _get_latest_version_github(package, extension, current_version, target):
def strip_prefix(tag):
return re.sub("^[^0-9]*", "", tag)
def get_prefix(string):
matches = re.findall(r"^([^0-9]*)", string)
return next(iter(matches), "")
# when invoked as an updateScript, UPDATE_NIX_ATTR_PATH will be set
# this allows us to work with packages which live outside of octave-modules
attr_path = os.environ.get("UPDATE_NIX_ATTR_PATH", f"octavePackages.{package}")
homepage = subprocess.check_output(
["nix", "eval", "-f", f"{NIXPGKS_ROOT}/default.nix", "--raw", f"{attr_path}.src.meta.homepage"])\
except Exception as e:
raise ValueError(f"Unable to determine homepage: {e}")
owner_repo = homepage[len(""):] # remove prefix
owner, repo = owner_repo.split("/")
url = f"{owner}/{repo}/releases"
all_releases = _fetch_github(url)
releases = list(filter(lambda x: not x['prerelease'], all_releases))
if len(releases) == 0:
raise ValueError(f"{homepage} does not contain any stable releases")
versions = map(lambda x: strip_prefix(x['tag_name']), releases)
version = _determine_latest_version(current_version, target, versions)
release = next(filter(lambda x: strip_prefix(x['tag_name']) == version, releases))
prefix = get_prefix(release['tag_name'])
sha256 = subprocess.check_output(["nix-prefetch-url", "--type", "sha256", "--unpack", f"{release['tarball_url']}"], stderr=subprocess.DEVNULL)\
# this may fail if they have both a branch and a tag of the same name, attempt tag name
tag_url = str(release['tarball_url']).replace("tarball","tarball/refs/tags")
sha256 = subprocess.check_output(["nix-prefetch-url", "--type", "sha256", "--unpack", tag_url], stderr=subprocess.DEVNULL)\
return version, sha256, prefix
def _get_latest_version_git(package, extension, current_version, target):
"""NOTE: Unimplemented!"""
# attr_path = os.environ.get("UPDATE_NIX_ATTR_PATH", f"octavePackages.{package}")
# try:
# download_url = subprocess.check_output(
# ["nix", "--extra-experimental-features", "nix-command", "eval", "-f", f"{NIXPGKS_ROOT}/default.nix", "--raw", f"{attr_path}.src.url"])\
# .decode('utf-8')
# except Exception as e:
# raise ValueError(f"Unable to determine download link: {e}")
# with tempfile.TemporaryDirectory(prefix=attr_path) as new_clone_location:
#["git", "clone", download_url, new_clone_location])
# newest_commit = subprocess.check_output(
# ["git" "rev-parse" "$(git branch -r)" "|" "tail" "-n" "1"]).decode('utf-8')
'fetchFromGitHub' : _get_latest_version_github,
'fetchurl' : _get_latest_version_octave_packages,
'fetchgit' : _get_latest_version_git,
def _determine_fetcher(text):
# Count occurrences of fetchers.
nfetchers = sum(text.count('src = {}'.format(fetcher)) for fetcher in FETCHERS.keys())
if nfetchers == 0:
raise ValueError("no fetcher.")
elif nfetchers > 1:
raise ValueError("multiple fetchers.")
# Then we check which fetcher to use.
for fetcher in FETCHERS.keys():
if 'src = {}'.format(fetcher) in text:
return fetcher
def _determine_extension(text, fetcher):
"""Determine what extension is used in the expression.
If we use:
- fetchPypi, we check if format is specified.
- fetchurl, we determine the extension from the url.
- fetchFromGitHub we simply use `.tar.gz`.
if fetcher == 'fetchurl':
url = _get_unique_value('url', text)
extension = os.path.splitext(url)[1]
elif fetcher == 'fetchFromGitHub' or fetcher == 'fetchgit':
if "fetchSubmodules" in text:
raise ValueError("fetchFromGitHub fetcher doesn't support submodules")
extension = "tar.gz"
return extension
def _update_package(path, target):
# Read the expression
with open(path, 'r') as f:
text =
# Determine pname. Many files have more than one pname
pnames = _get_values('pname', text)
# Determine version.
version = _get_unique_value('version', text)
# First we check how many fetchers are mentioned.
fetcher = _determine_fetcher(text)
extension = _determine_extension(text, fetcher)
# Attempt a fetch using each pname, e.g. backports-zoneinfo vs backports.zoneinfo
successful_fetch = False
for pname in pnames:
if fetcher == "fetchgit":
logging.warning(f"You must update {pname} MANUALLY!")
return { 'path': path, 'target': target, 'pname': pname,
'old_version': version, 'new_version': version }
new_version, new_sha256, prefix = FETCHERS[fetcher](pname, extension, version, target)
successful_fetch = True
except ValueError:
if not successful_fetch:
raise ValueError(f"Unable to find correct package using these pnames: {pnames}")
if new_version == version:"Path {}: no update available for {}.".format(path, pname))
return False
elif Version(new_version) <= Version(version):
raise ValueError("downgrade for {}.".format(pname))
if not new_sha256:
raise ValueError("no file available for {}.".format(pname))
text = _replace_value('version', new_version, text)
# hashes from pypi are 16-bit encoded sha256's, normalize it to sri to avoid merge conflicts
# sri hashes have been the default format since nix 2.4+
sri_hash = subprocess.check_output(["nix", "--extra-experimental-features", "nix-command", "hash", "to-sri", "--type", "sha256", new_sha256]).decode('utf-8').strip()
# fetchers can specify a sha256, or a sri hash
text = _replace_value('sha256', sri_hash, text)
except ValueError:
text = _replace_value('hash', sri_hash, text)
if fetcher == 'fetchFromGitHub':
# in the case of fetchFromGitHub, it's common to see `rev = version;` or `rev = "v${version}";`
# in which no string value is meant to be substituted. However, we can just overwrite the previous value.
regex = '(rev\s+=\s+[^;]*;)'
regex = re.compile(regex)
matches = regex.findall(text)
n = len(matches)
if n == 0:
raise ValueError("Unable to find rev value for {}.".format(pname))
# forcefully rewrite rev, incase tagging conventions changed for a release
match = matches[0]
text = text.replace(match, f'rev = "refs/tags/{prefix}${{version}}";')
# incase there's no prefix, just rewrite without interpolation
text = text.replace('"${version}";', 'version;')
with open(path, 'w') as f:
f.write(text)"Path {}: updated {} from {} to {}".format(path, pname, version, new_version))
result = {
'path' : path,
'target': target,
'pname': pname,
'old_version' : version,
'new_version' : new_version,
#'fetcher' : fetcher,
return result
def _update(path, target):
# We need to read and modify a Nix expression.
if os.path.isdir(path):
path = os.path.join(path, 'default.nix')
# If a default.nix does not exist, we quit.
if not os.path.isfile(path):"Path {}: does not exist.".format(path))
return False
# If file is not a Nix expression, we quit.
if not path.endswith(".nix"):"Path {}: does not end with `.nix`.".format(path))
return False
return _update_package(path, target)
except ValueError as e:
logging.warning("Path {}: {}".format(path, e))
return False
def _commit(path, pname, old_version, new_version, pkgs_prefix="octave: ", **kwargs):
"""Commit result.
msg = f'{pkgs_prefix}{pname}: {old_version} -> {new_version}'
subprocess.check_call([GIT, 'add', path])
subprocess.check_call([GIT, 'commit', '-m', msg])
except subprocess.CalledProcessError as e:
subprocess.check_call([GIT, 'checkout', path])
raise subprocess.CalledProcessError(f'Could not commit {path}') from e
return True
def main():
epilog = """
environment variables:
GITHUB_API_TOKEN\tGitHub API token used when updating github packages
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, epilog=epilog)
parser.add_argument('package', type=str, nargs='+')
parser.add_argument('--target', type=str, choices=SEMVER.keys(), default='major')
parser.add_argument('--commit', action='store_true', help='Create a commit for each package update')
parser.add_argument('--use-pkgs-prefix', action='store_true', help='Use octavePackages.${pname}: instead of octave: ${pname}: when making commits')
args = parser.parse_args()
target =
packages = list(map(os.path.abspath, args.package))"Updating packages...")
# Use threads to update packages concurrently
with Pool() as p:
results = list(filter(bool, pkg: _update(pkg, target), packages)))"Finished updating packages.")
commit_options = {}
if args.use_pkgs_prefix:"Using octavePackages. prefix for commits")
commit_options["pkgs_prefix"] = "octavePackages."
# Commits are created sequentially.
if args.commit:"Committing updates...")
# list forces evaluation
list(map(lambda x: _commit(**x, **commit_options), results))"Finished committing updates")
count = len(results)"{} package(s) updated".format(count))
if __name__ == '__main__':