import json
import re
import subprocess
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urljoin, urlparse

from .error import NurError
from .manifest import LockedVersion, Repo, RepoType


def fetch_commit_from_feed(url: str) -> str:
    """Return the latest commit id advertised by an Atom commit feed.

    The commit id is taken from the last path component of the first
    entry's link href.

    Raises:
        NurError: if the feed returns HTTP 404 or contains no entries.
    """
    try:
        # BUG FIX: urlopen() itself raises HTTPError on a 404 response, so it
        # must sit inside the try block — previously the 404 handler was
        # unreachable. The with-block also closes the response (was leaked).
        with urllib.request.urlopen(url) as resp:
            root = ET.fromstring(resp.read())
    except urllib.error.HTTPError as e:
        if e.code == 404:
            raise NurError(f"Repository feed {url} not found")
        raise
    ns = "{http://www.w3.org/2005/Atom}"
    commit_link = root.find(f"./{ns}entry/{ns}link")
    if commit_link is None:
        raise NurError(f"No commits found in repository feed {url}")
    return Path(urlparse(commit_link.attrib["href"]).path).parts[-1]


def nix_prefetch_zip(url: str) -> Tuple[str, Path]:
    """Prefetch *url* as an unpacked tarball into the nix store.

    Returns:
        The (sha256, store_path) pair reported by nix-prefetch-url.
    """
    data = subprocess.check_output(
        ["nix-prefetch-url", "--name", "source", "--unpack", "--print-path", url]
    )
    sha256, path = data.decode().strip().split("\n")
    return sha256, Path(path)


class GithubRepo:
    """Helper for resolving and prefetching a repository hosted on github.com."""

    def __init__(self, owner: str, name: str) -> None:
        self.owner = owner
        self.name = name

    def url(self, path: str) -> str:
        """Join *path* onto the repository's base URL."""
        return urljoin(f"https://github.com/{self.owner}/{self.name}/", path)

    def latest_commit(self) -> str:
        """Return the newest commit on master via the Atom feed."""
        return fetch_commit_from_feed(self.url("commits/master.atom"))

    def prefetch(self, ref: str) -> Tuple[str, Path]:
        """Prefetch the tarball for *ref*; returns (sha256, store_path)."""
        return nix_prefetch_zip(self.url(f"archive/{ref}.tar.gz"))


class GitlabRepo:
    """Helper for resolving and prefetching a repository on a Gitlab instance.

    NOTE(review): owner/name are URL-encoded as a single path segment in the
    API call, so nested Gitlab subgroups are not supported here.
    """

    def __init__(self, domain: str, owner: str, name: str) -> None:
        self.domain = domain
        self.owner = owner
        self.name = name

    def latest_commit(self) -> str:
        """Return the newest commit on master via the Atom feed."""
        url = (
            f"https://{self.domain}/{self.owner}/{self.name}/commits/master?format=atom"
        )
        return fetch_commit_from_feed(url)

    def prefetch(self, ref: str) -> Tuple[str, Path]:
        """Prefetch the archive for *ref* via the Gitlab v4 API."""
        url = f"https://{self.domain}/api/v4/projects/{self.owner}%2F{self.name}/repository/archive.tar.gz?sha={ref}"
        return nix_prefetch_zip(url)


def prefetch_git(repo: Repo) -> Tuple[LockedVersion, Path]:
    """Prefetch an arbitrary git repository with nix-prefetch-git.

    Returns:
        The locked version (url, rev, sha256, submodules) and the local
        checkout path that nix-prefetch-git reports on stderr.

    Raises:
        NurError: if nix-prefetch-git fails or its output cannot be parsed.
    """
    cmd = ["nix-prefetch-git"]
    if repo.submodules:
        cmd += ["--fetch-submodules"]
    cmd += [repo.url.geturl()]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise NurError(
            f"Failed to prefetch git repository {repo.url.geturl()}: {result.stderr}"
        )
    metadata = json.loads(result.stdout)
    # ROBUSTNESS: search the whole stderr instead of a hard-coded line offset
    # (lines[-5]) — the exact stderr layout of nix-prefetch-git is not stable.
    # Also raise instead of a bare assert (asserts vanish under `python -O`).
    repo_path = re.search(r"path is (.+)", result.stderr.decode("utf-8"))
    if repo_path is None:
        raise NurError(
            f"Failed to prefetch git repository {repo.url.geturl()}: {result.stderr}"
        )
    path = Path(repo_path.group(1))
    rev = metadata["rev"]
    sha256 = metadata["sha256"]
    return LockedVersion(repo.url, rev, sha256, repo.submodules), path


def prefetch_github(repo: Repo) -> Tuple[LockedVersion, Optional[Path]]:
    """Prefetch a github repository, reusing the lock when it is current.

    Returns:
        (locked_version, path) where path is None when the existing lock
        already points at the latest commit (nothing was downloaded).
    """
    github_path = Path(repo.url.path)
    # url.path is "/owner/name", so parts == ("/", owner, name)
    gh_repo = GithubRepo(github_path.parts[1], github_path.parts[2])

    commit = gh_repo.latest_commit()
    locked_version = repo.locked_version
    if locked_version is not None:
        if locked_version.rev == commit:
            return locked_version, None
    sha256, path = gh_repo.prefetch(commit)

    return LockedVersion(repo.url, commit, sha256), path


def prefetch_gitlab(repo: Repo) -> Tuple[LockedVersion, Optional[Path]]:
    """Prefetch a gitlab repository, reusing the lock when it is current.

    Returns:
        (locked_version, path) where path is None when the existing lock
        already points at the latest commit (nothing was downloaded).
    """
    gitlab_path = Path(repo.url.path)
    gl_repo = GitlabRepo(
        repo.url.hostname, gitlab_path.parts[-2], gitlab_path.parts[-1]
    )

    commit = gl_repo.latest_commit()
    locked_version = repo.locked_version
    if locked_version is not None:
        if locked_version.rev == commit:
            return locked_version, None
    sha256, path = gl_repo.prefetch(commit)

    return LockedVersion(repo.url, commit, sha256), path


def prefetch(repo: Repo) -> Tuple[Repo, LockedVersion, Optional[Path]]:
    """Dispatch prefetching on the repository type (github/gitlab/plain git)."""
    if repo.type == RepoType.GITHUB:
        locked_version, path = prefetch_github(repo)
    elif repo.type == RepoType.GITLAB:
        locked_version, path = prefetch_gitlab(repo)
    else:
        locked_version, path = prefetch_git(repo)

    return repo, locked_version, path