Merge pull request #267129 from adisbladis/fetchpypilegacy-take-2

fetchPypiLegacy: init PyPi legacy API fetcher
This commit is contained in:
adisbladis 2024-02-17 21:52:25 +13:00 committed by GitHub
commit 4e62dd9ade
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 219 additions and 0 deletions

View File

@ -0,0 +1,45 @@
# Fetch from PyPi legacy API as documented in https://warehouse.pypa.io/api-reference/legacy.html
#
# Produces a flat fixed-output derivation by running ./fetch-legacy.py with
# every configured index URL, the package name and the wanted file name.
{ runCommand
, lib
, python3
}:
{
  # package name
  pname,
  # Package index
  url ? null,
  # Multiple package indices to consider
  urls ? [ ],
  # filename including extension
  file,
  # SRI hash
  hash,
  # allow overriding the derivation name
  name ? null,
}:
let
  # Every index to pass to the script: `urls` first, then `url` when it is set.
  urls' = urls ++ lib.optional (url != null) url;

  # Entries of builtins.nixPath whose prefix is exactly "NETRC".
  pathParts = lib.filter ({ prefix, path }: "NETRC" == prefix) builtins.nixPath;

  # Path of the first such entry, or the empty string when there is none.
  # It is handed to the builder through the NETRC environment variable below.
  netrc_file =
    if (pathParts != [ ])
    then (lib.head pathParts).path
    else "";
in
# Assert that we have at least one URL
assert urls' != [ ]; runCommand file
  ({
    nativeBuildInputs = [ python3 ];
    impureEnvVars = lib.fetchers.proxyImpureEnvVars;
    outputHashMode = "flat";
    # if hash is empty select a default algo to let nix propose the actual hash.
    outputHashAlgo = if hash == "" then "sha256" else null;
    outputHash = hash;
    NETRC = netrc_file;
  }
  // (lib.optionalAttrs (name != null) { inherit name; }))
  # `pname` and `file` are shell-escaped just like the URLs so that unusual
  # characters in them cannot be re-interpreted by the builder shell.
  ''
    python ${./fetch-legacy.py} ${lib.concatStringsSep " " (map (url: "--url ${lib.escapeShellArg url}") urls')} --pname ${lib.escapeShellArg pname} --filename ${lib.escapeShellArg file}
    mv ${lib.escapeShellArg file} "$out"
  ''

View File

@ -0,0 +1,162 @@
# Some repositories (such as Devpi) expose the PyPI legacy API
# (https://warehouse.pypa.io/api-reference/legacy.html).
#
# Note: it is not possible to use pip for this, see
# https://discuss.python.org/t/pip-download-just-the-source-packages-no-building-no-metadata-etc/4651/12
import base64
import argparse
import netrc
import os
import shutil
import ssl
import sys
import urllib.request
from html.parser import HTMLParser
from os.path import normpath
from typing import Optional
from urllib.parse import urlparse, urlunparse
# Parse the legacy index page to extract the href and package names
class Pep503(HTMLParser):
    """Collect the links of a legacy index page.

    After ``feed()``, ``sources`` maps the text of every ``<a href=...>``
    element to the value of its ``href`` attribute.

    Raises:
        ValueError: from ``feed()`` when an end tag is reached while an anchor
            with an ``href`` is open but no text has been seen for it.
    """

    def __init__(self) -> None:
        super().__init__()
        # Link text -> href of every anchor recorded so far.
        self.sources: dict[str, str] = {}
        # href of the anchor currently being parsed, None outside of one.
        self.url: Optional[str] = None
        # Text seen so far for the anchor currently being parsed.
        self.name: Optional[str] = None

    def handle_data(self, data: str) -> None:
        """Remember text as the link name while inside an anchor."""
        if self.url is not None:
            self.name = data

    def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]) -> None:
        """Start tracking a new anchor when it carries an ``href``."""
        if tag == "a":
            for name, value in attrs:
                if name == "href":
                    self.url = value
                    # A new anchor must never inherit the text of an earlier one.
                    self.name = None

    def handle_endtag(self, tag: str) -> None:
        """Record the pending anchor on the first end tag that follows it."""
        if self.url is not None:
            if not self.name:
                raise ValueError("Name not set")
            self.sources[self.name] = self.url
            self.url = None
            # Without this reset an empty follow-up anchor would silently
            # overwrite the entry of the previous file with its own href.
            self.name = None
def try_fetch(url: str, package_name: str, package_filename: str) -> None:
    """Download one file of a package from a single legacy index.

    The index page ``<url>/<package_name>/`` is read and parsed with
    ``Pep503``; the link whose text equals ``package_filename`` is then
    downloaded into a file of that name in the current working directory.

    Credentials are taken from the netrc file named by the ``NETRC``
    environment variable (if set and non-empty) and are overridden by
    ``user:password@`` embedded in ``url``. When no credentials are available
    TLS verification is disabled.

    Args:
        url: Base URL of the package index, without trailing slash.
        package_name: Name of the package, used as the index sub-path.
        package_filename: Exact link text of the file to download.

    Raises:
        urllib.error.URLError: (including ``HTTPError``) when a request fails.
        SystemExit: with status 1 when the index does not list the file.
    """
    index_url = url + "/" + package_name + "/"
    parsed_index = urlparse(index_url)

    # Parse username and password for this host from the netrc file if given.
    username: Optional[str] = None
    password: Optional[str] = None
    netrc_path = os.environ.get("NETRC", "")
    if netrc_path != "":
        netrc_obj = netrc.netrc(netrc_path)
        # Drop a leading "user:password@" first; otherwise the port split
        # below would return the user name instead of the host.
        host = parsed_index.netloc.rpartition("@")[-1]
        # Strip port number if present
        if ":" in host:
            host = host.split(":")[0]
        authenticators = netrc_obj.authenticators(host)
        if authenticators:
            username, _, password = authenticators

    # Extract out username/password from index_url, if present.
    username = parsed_index.username or username
    password = parsed_index.password or password
    index_url = parsed_index._replace(netloc=parsed_index.netloc.rpartition("@")[-1]).geturl()

    # Log only after the credentials were stripped so they never reach the build log.
    print("Reading index %s" % index_url)

    context = ssl.create_default_context()
    auth_header: Optional[str] = None
    if username and password:  # Add authentication
        credentials_b64 = base64.b64encode(":".join((username, password)).encode()).decode("utf-8")
        auth_header = "Basic {}".format(credentials_b64)
    else:  # If we are not using authentication disable TLS verification for long term reproducibility
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    req = urllib.request.Request(index_url)
    if auth_header is not None:
        req.add_header("Authorization", auth_header)
    with urllib.request.urlopen(req, context=context) as response:
        index = response.read()

    parser = Pep503()
    parser.feed(str(index, "utf-8"))
    if package_filename not in parser.sources:
        print("The file %s has not be found in the index %s" % (package_filename, index_url))
        sys.exit(1)
    href = parser.sources[package_filename]

    # Sometimes the href is a relative or absolute path within the index's domain.
    indicated_url = urlparse(href)
    if indicated_url.netloc == "":
        parsed_url = urlparse(index_url)
        if indicated_url.path.startswith("/"):
            # An absolute path within the index's domain.
            path = href
        else:
            # A relative path.
            path = parsed_url.path + "/" + href
        package_url = urlunparse(
            (
                parsed_url.scheme,
                parsed_url.netloc,
                path,
                None,
                None,
                None,
            )
        )
    else:
        package_url = href

    # Handle urls containing "../"
    parsed_url = urlparse(package_url)
    real_package_url = urlunparse(
        (
            parsed_url.scheme,
            parsed_url.netloc,
            normpath(parsed_url.path),
            parsed_url.params,
            parsed_url.query,
            parsed_url.fragment,
        )
    )

    print("Downloading %s" % real_package_url)
    req = urllib.request.Request(real_package_url)
    if auth_header is not None:
        # Unredirected: the header is not re-sent when the server redirects.
        req.add_unredirected_header("Authorization", auth_header)
    # Both the HTTP response and the output file are closed on every path.
    with urllib.request.urlopen(req, context=context) as response, open(package_filename, "wb") as package_file:
        shutil.copyfileobj(response, package_file)
# Command line interface: --url may be repeated, one occurrence per index to try.
argparser = argparse.ArgumentParser(description="Fetch file from legacy pypi API")
argparser.add_argument("--url", action="append", required=True)
argparser.add_argument("--pname", action="store", required=True)
argparser.add_argument("--filename", action="store", required=True)


def main() -> None:
    """Try every index given with --url in order until one fetch succeeds.

    An index that fails with a ``urllib.error.URLError`` (this includes
    ``HTTPError`` as well as unreachable hosts) is reported on stderr and the
    next one is tried. Exits with status 1 when every index failed.
    """
    args = argparser.parse_args()
    for url in args.url:
        try:
            try_fetch(url, args.pname, args.filename)
        except urllib.error.URLError as e:
            print("Got exception'", e, "', trying next package index", file=sys.stderr)
            continue
        else:
            break
    else:
        # Only reached when the loop ran out of indices without a `break`.
        print(
            f"Could not fetch package '{args.pname}' file '{args.filename}' from any mirrors: {args.url}",
            file=sys.stderr,
        )
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,9 @@
{ testers, fetchPypiLegacy, ... }: {
  # Tests that a file of a package can be fetched by name from the
  # https://pypi.org/simple index and matches the pinned hash.
  fetchSimple = testers.invalidateFetcherByDrvHash fetchPypiLegacy {
    pname = "requests";
    file = "requests-2.31.0.tar.gz";
    url = "https://pypi.org/simple";
    hash = "sha256-lCxadY+Y15Dq7Ropy27vx/+w0c968Fw9J5Flbb1q0eE=";
  };
}

View File

@ -111,6 +111,7 @@ with pkgs;
fetchzip = callPackages ../build-support/fetchzip/tests.nix { };
fetchgit = callPackages ../build-support/fetchgit/tests.nix { };
fetchFirefoxAddon = callPackages ../build-support/fetchfirefoxaddon/tests.nix { };
fetchPypiLegacy = callPackages ../build-support/fetchpypilegacy/tests.nix { };
install-shell-files = callPackage ./install-shell-files {};

View File

@ -1248,6 +1248,8 @@ with pkgs;
fetchPypi = callPackage ../build-support/fetchpypi { };
fetchPypiLegacy = callPackage ../build-support/fetchpypilegacy { };
resolveMirrorURLs = {url}: fetchurl {
showURLs = true;
inherit url;